Compare commits
6 Commits
server
...
e7f4de9ec1
| Author | SHA1 | Date | |
|---|---|---|---|
| e7f4de9ec1 | |||
| 9cf9e3233f | |||
| e17d8f67aa | |||
| b1980de6d1 | |||
| 85e0641440 | |||
| 834b89b682 |
@@ -0,0 +1,114 @@
|
|||||||
|
"""Audio similarity scanning — MFCC-based profile matching."""
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import librosa
|
||||||
|
|
||||||
|
from .paths import _log
|
||||||
|
|
||||||
|
_N_MFCC = 20
|
||||||
|
_SR = 22050
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_mfcc(path: str, sr: int = _SR) -> np.ndarray:
|
||||||
|
"""Load audio from a file and return a mean MFCC vector (20-dim)."""
|
||||||
|
y, _ = librosa.load(path, sr=sr, mono=True)
|
||||||
|
mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=_N_MFCC)
|
||||||
|
return mfcc.mean(axis=1) # average over time → (20,)
|
||||||
|
|
||||||
|
|
||||||
|
def build_profile(clip_paths: list[str]) -> dict | None:
|
||||||
|
"""Extract MFCCs from reference clips.
|
||||||
|
|
||||||
|
Returns dict with:
|
||||||
|
- mean_vector: averaged MFCC across all clips (20,)
|
||||||
|
- clip_vectors: list of individual MFCC vectors
|
||||||
|
Returns None if no clips could be loaded.
|
||||||
|
"""
|
||||||
|
vectors = []
|
||||||
|
for p in clip_paths:
|
||||||
|
try:
|
||||||
|
vec = _extract_mfcc(p)
|
||||||
|
vectors.append(vec)
|
||||||
|
except Exception as e:
|
||||||
|
_log(f"audio_scan: skip {p}: {e}")
|
||||||
|
if not vectors:
|
||||||
|
return None
|
||||||
|
arr = np.stack(vectors)
|
||||||
|
return {
|
||||||
|
"mean_vector": arr.mean(axis=0),
|
||||||
|
"clip_vectors": vectors,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
|
||||||
|
"""Cosine similarity between two vectors.
|
||||||
|
|
||||||
|
Returns value in [-1, 1]. Negative means anti-correlated (very
|
||||||
|
dissimilar). For threshold filtering this is fine — negative scores
|
||||||
|
never exceed the threshold. Scores near 0 may be uncorrelated or
|
||||||
|
weakly anti-correlated.
|
||||||
|
"""
|
||||||
|
na = np.linalg.norm(a)
|
||||||
|
nb = np.linalg.norm(b)
|
||||||
|
if na == 0 or nb == 0:
|
||||||
|
return 0.0
|
||||||
|
return float(np.dot(a, b) / (na * nb))
|
||||||
|
|
||||||
|
|
||||||
|
def scan_video(
|
||||||
|
video_path: str,
|
||||||
|
profile: dict,
|
||||||
|
mode: str = "average",
|
||||||
|
threshold: float = 0.7,
|
||||||
|
hop: float = 1.0,
|
||||||
|
window: float = 8.0,
|
||||||
|
cancel_flag: object = None,
|
||||||
|
) -> list[tuple[float, float, float]]:
|
||||||
|
"""Slide a window across the video audio and score against the profile.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
video_path: path to video/audio file
|
||||||
|
profile: dict from build_profile()
|
||||||
|
mode: "average" (compare to mean) or "nearest" (max over all clips)
|
||||||
|
threshold: minimum cosine similarity to include
|
||||||
|
hop: step size in seconds
|
||||||
|
window: window size in seconds (default 8s)
|
||||||
|
cancel_flag: object with _cancel bool attribute; checked each iteration
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list of (start_time, end_time, score) for regions above threshold
|
||||||
|
"""
|
||||||
|
_log(f"audio_scan: loading {video_path}")
|
||||||
|
y, sr = librosa.load(video_path, sr=_SR, mono=True)
|
||||||
|
duration = len(y) / sr
|
||||||
|
_log(f"audio_scan: {duration:.1f}s loaded, scanning with hop={hop}s")
|
||||||
|
|
||||||
|
win_samples = int(window * sr)
|
||||||
|
hop_samples = int(hop * sr)
|
||||||
|
|
||||||
|
results = []
|
||||||
|
pos = 0
|
||||||
|
while pos + win_samples <= len(y):
|
||||||
|
if cancel_flag and getattr(cancel_flag, '_cancel', False):
|
||||||
|
_log("audio_scan: cancelled")
|
||||||
|
return results
|
||||||
|
|
||||||
|
chunk = y[pos : pos + win_samples]
|
||||||
|
mfcc = librosa.feature.mfcc(y=chunk, sr=sr, n_mfcc=_N_MFCC)
|
||||||
|
vec = mfcc.mean(axis=1)
|
||||||
|
|
||||||
|
if mode == "nearest":
|
||||||
|
score = max(
|
||||||
|
_cosine_similarity(vec, cv) for cv in profile["clip_vectors"]
|
||||||
|
)
|
||||||
|
else: # average
|
||||||
|
score = _cosine_similarity(vec, profile["mean_vector"])
|
||||||
|
|
||||||
|
if score >= threshold:
|
||||||
|
start_t = pos / sr
|
||||||
|
results.append((start_t, start_t + window, score))
|
||||||
|
|
||||||
|
pos += hop_samples
|
||||||
|
|
||||||
|
_log(f"audio_scan: {len(results)} regions above threshold {threshold}")
|
||||||
|
return results
|
||||||
@@ -0,0 +1,97 @@
|
|||||||
|
# Audio Similarity Scanning — Design
|
||||||
|
|
||||||
|
**Goal:** Scan a video's audio track and highlight segments that match the sound profile of existing reference clips, so the user can quickly find similar moments without scrubbing manually.
|
||||||
|
|
||||||
|
**Runs in:** Python/Qt client (`main.py`), not the server.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Core Module: `core/audio_scan.py`
|
||||||
|
|
||||||
|
New module alongside `core/tracking.py`. Two main functions:
|
||||||
|
|
||||||
|
- `build_profile(clip_paths: list[str]) -> dict` — extracts MFCCs (20 coefficients) from each clip using `librosa`, returns a profile containing both the averaged vector and individual clip vectors.
|
||||||
|
- `scan_video(video_path: str, profile: dict, mode: str, threshold: float, hop: float) -> list[tuple[float, float, float]]` — slides an 8s window across the video's audio, returns `(start_time, end_time, score)` tuples for segments above threshold.
|
||||||
|
|
||||||
|
### Feature Extraction
|
||||||
|
|
||||||
|
- Audio loaded via `librosa.load()` (handles video files directly, mono, 22050Hz).
|
||||||
|
- MFCCs: `librosa.feature.mfcc(n_mfcc=20)`, averaged over time axis to produce a single vector per window/clip.
|
||||||
|
- Similarity: cosine similarity (`numpy` dot product on L2-normalized vectors).
|
||||||
|
|
||||||
|
### Matching Modes
|
||||||
|
|
||||||
|
- **Average mode:** Compare each window to the mean of all reference MFCC vectors. Fast, good when references are homogeneous.
|
||||||
|
- **Nearest mode:** Compare each window to every reference vector, take the max score. Better when references have variety within the style.
|
||||||
|
|
||||||
|
### Parameters
|
||||||
|
|
||||||
|
- `threshold` (float, 0.0–1.0): minimum cosine similarity to include a segment. Default 0.7.
|
||||||
|
- `hop` (float, seconds): step size for the sliding window. Default 1.0s.
|
||||||
|
- Window size fixed at 8s to match reference clip length.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## UI Integration in `main.py`
|
||||||
|
|
||||||
|
### Controls
|
||||||
|
|
||||||
|
Added near the existing tracking checkbox area:
|
||||||
|
|
||||||
|
- **"Scan" button** — triggers audio scan on current video.
|
||||||
|
- **Threshold slider** (0.0–1.0, step 0.05) — controls match strictness.
|
||||||
|
- **Mode combobox** — "Average" / "Nearest".
|
||||||
|
- **Reference source combobox** — "Current Profile" / "Custom Folder" (shows folder picker when "Custom Folder" selected).
|
||||||
|
|
||||||
|
### Scan Workflow
|
||||||
|
|
||||||
|
1. User clicks Scan.
|
||||||
|
2. Reference clips collected: either all export `output_path` values from the current profile (via DB) or all audio/video files in a custom folder.
|
||||||
|
3. Scan runs in a `QThread` so UI stays responsive.
|
||||||
|
4. On completion, results sent to Timeline widget via signal.
|
||||||
|
|
||||||
|
### Timeline Display
|
||||||
|
|
||||||
|
- New `set_scan_regions(regions: list[tuple[float, float, float]])` method on Timeline.
|
||||||
|
- Drawn as semi-transparent colored rectangles behind existing markers.
|
||||||
|
- Color intensity proportional to score (brighter = higher match).
|
||||||
|
- Cleared on file change or re-scan.
|
||||||
|
|
||||||
|
### Keyboard Shortcut
|
||||||
|
|
||||||
|
- `S` — jump cursor to the next scan region (similar to `M` for next marker).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Data Flow
|
||||||
|
|
||||||
|
```
|
||||||
|
Reference clips (DB export paths or folder)
|
||||||
|
|
|
||||||
|
librosa.load() each -> MFCC vectors (20-dim)
|
||||||
|
|
|
||||||
|
Profile: { mean_vector, clip_vectors[] }
|
||||||
|
|
|
||||||
|
Current video -> librosa.load() full audio (mono 22050Hz)
|
||||||
|
|
|
||||||
|
Sliding 8s window (hop=1s) -> MFCC per window
|
||||||
|
|
|
||||||
|
Cosine similarity vs profile -> score per position
|
||||||
|
|
|
||||||
|
Threshold filter -> [(start, end, score), ...]
|
||||||
|
|
|
||||||
|
Timeline: semi-transparent highlight regions
|
||||||
|
```
|
||||||
|
|
||||||
|
## Performance
|
||||||
|
|
||||||
|
- 2-hour video at 22050Hz mono ~ 380MB memory.
|
||||||
|
- MFCC extraction + sliding window: ~10-30s.
|
||||||
|
- QThread keeps UI responsive.
|
||||||
|
|
||||||
|
## What This Does NOT Do
|
||||||
|
|
||||||
|
- No DB schema changes — scan results are ephemeral (visual only).
|
||||||
|
- No auto-export — user decides what to cut.
|
||||||
|
- No server integration — runs entirely in the Python client.
|
||||||
|
- No GPU/ML model dependency — just librosa + numpy.
|
||||||
@@ -0,0 +1,739 @@
|
|||||||
|
# Audio Similarity Scanning — Implementation Plan
|
||||||
|
|
||||||
|
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
|
||||||
|
|
||||||
|
**Goal:** Scan a video's audio track to find segments matching a reference sound profile, displayed as highlighted regions on the timeline.
|
||||||
|
|
||||||
|
**Architecture:** New `core/audio_scan.py` module extracts MFCC features from reference clips and slides an 8s window across the target video's audio, scoring each position via cosine similarity. A `ScanWorker` QThread runs the scan in the background, and results are drawn as semi-transparent rectangles on the existing Timeline widget.
|
||||||
|
|
||||||
|
**Tech Stack:** Python 3, librosa 0.11, numpy, PyQt6
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 1: Core audio_scan module — build_profile
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Create: `core/audio_scan.py`
|
||||||
|
- Create: `tests/test_audio_scan.py`
|
||||||
|
|
||||||
|
**Step 1: Write the tests**
|
||||||
|
|
||||||
|
```python
|
||||||
|
# tests/test_audio_scan.py
|
||||||
|
import tempfile, os
|
||||||
|
import numpy as np
|
||||||
|
from core.audio_scan import build_profile, _extract_mfcc
|
||||||
|
|
||||||
|
|
||||||
|
def _make_wav(path: str, duration: float = 8.0, sr: int = 22050):
|
||||||
|
"""Create a short sine-wave WAV file for testing."""
|
||||||
|
import soundfile as sf
|
||||||
|
t = np.linspace(0, duration, int(sr * duration), endpoint=False)
|
||||||
|
audio = 0.5 * np.sin(2 * np.pi * 440 * t)
|
||||||
|
sf.write(path, audio, sr)
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_mfcc_returns_1d_vector():
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
|
||||||
|
_make_wav(f.name)
|
||||||
|
try:
|
||||||
|
vec = _extract_mfcc(f.name)
|
||||||
|
assert vec.shape == (20,)
|
||||||
|
assert not np.isnan(vec).any()
|
||||||
|
finally:
|
||||||
|
os.unlink(f.name)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_profile_single_clip():
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
|
||||||
|
_make_wav(f.name)
|
||||||
|
try:
|
||||||
|
profile = build_profile([f.name])
|
||||||
|
assert "mean_vector" in profile
|
||||||
|
assert "clip_vectors" in profile
|
||||||
|
assert profile["mean_vector"].shape == (20,)
|
||||||
|
assert len(profile["clip_vectors"]) == 1
|
||||||
|
finally:
|
||||||
|
os.unlink(f.name)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_profile_multiple_clips():
|
||||||
|
paths = []
|
||||||
|
try:
|
||||||
|
for i in range(3):
|
||||||
|
f = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
|
||||||
|
freq = 440 + i * 200
|
||||||
|
import soundfile as sf
|
||||||
|
t = np.linspace(0, 8.0, 22050 * 8, endpoint=False)
|
||||||
|
sf.write(f.name, 0.5 * np.sin(2 * np.pi * freq * t), 22050)
|
||||||
|
paths.append(f.name)
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
profile = build_profile(paths)
|
||||||
|
assert len(profile["clip_vectors"]) == 3
|
||||||
|
assert profile["mean_vector"].shape == (20,)
|
||||||
|
finally:
|
||||||
|
for p in paths:
|
||||||
|
os.unlink(p)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_profile_skips_missing_files():
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
|
||||||
|
_make_wav(f.name)
|
||||||
|
try:
|
||||||
|
profile = build_profile([f.name, "/no/such/file.wav"])
|
||||||
|
assert len(profile["clip_vectors"]) == 1
|
||||||
|
finally:
|
||||||
|
os.unlink(f.name)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_profile_empty_returns_none():
|
||||||
|
result = build_profile([])
|
||||||
|
assert result is None
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Run tests to verify they fail**
|
||||||
|
|
||||||
|
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py -v`
|
||||||
|
Expected: FAIL with `ModuleNotFoundError: No module named 'core.audio_scan'`
|
||||||
|
|
||||||
|
**Step 3: Write the implementation**
|
||||||
|
|
||||||
|
```python
|
||||||
|
# core/audio_scan.py
|
||||||
|
"""Audio similarity scanning — MFCC-based profile matching."""
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import librosa
|
||||||
|
|
||||||
|
from .paths import _log
|
||||||
|
|
||||||
|
_N_MFCC = 20
|
||||||
|
_SR = 22050
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_mfcc(path: str, sr: int = _SR) -> np.ndarray:
|
||||||
|
"""Load audio from a file and return a mean MFCC vector (20-dim)."""
|
||||||
|
y, _ = librosa.load(path, sr=sr, mono=True)
|
||||||
|
mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=_N_MFCC)
|
||||||
|
return mfcc.mean(axis=1) # average over time → (20,)
|
||||||
|
|
||||||
|
|
||||||
|
def build_profile(clip_paths: list[str]) -> dict | None:
|
||||||
|
"""Extract MFCCs from reference clips.
|
||||||
|
|
||||||
|
Returns dict with:
|
||||||
|
- mean_vector: averaged MFCC across all clips (20,)
|
||||||
|
- clip_vectors: list of individual MFCC vectors
|
||||||
|
Returns None if no clips could be loaded.
|
||||||
|
"""
|
||||||
|
vectors = []
|
||||||
|
for p in clip_paths:
|
||||||
|
try:
|
||||||
|
vec = _extract_mfcc(p)
|
||||||
|
vectors.append(vec)
|
||||||
|
except Exception as e:
|
||||||
|
_log(f"audio_scan: skip {p}: {e}")
|
||||||
|
if not vectors:
|
||||||
|
return None
|
||||||
|
arr = np.stack(vectors)
|
||||||
|
return {
|
||||||
|
"mean_vector": arr.mean(axis=0),
|
||||||
|
"clip_vectors": vectors,
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 4: Run tests to verify they pass**
|
||||||
|
|
||||||
|
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py -v`
|
||||||
|
Expected: all 5 PASS
|
||||||
|
|
||||||
|
**Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add core/audio_scan.py tests/test_audio_scan.py
|
||||||
|
git commit -m "feat: add audio_scan module with build_profile"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 2: Core audio_scan module — scan_video
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `core/audio_scan.py`
|
||||||
|
- Modify: `tests/test_audio_scan.py`
|
||||||
|
|
||||||
|
**Step 1: Write the tests**
|
||||||
|
|
||||||
|
Add to `tests/test_audio_scan.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from core.audio_scan import scan_video
|
||||||
|
|
||||||
|
|
||||||
|
def test_scan_video_finds_matching_region():
|
||||||
|
"""A video made of the same sine wave as the reference should match."""
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as ref:
|
||||||
|
_make_wav(ref.name, duration=8.0)
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as vid:
|
||||||
|
_make_wav(vid.name, duration=20.0)
|
||||||
|
try:
|
||||||
|
profile = build_profile([ref.name])
|
||||||
|
regions = scan_video(vid.name, profile, mode="average", threshold=0.5, hop=1.0)
|
||||||
|
assert len(regions) > 0
|
||||||
|
for start, end, score in regions:
|
||||||
|
assert abs((end - start) - 8.0) < 1e-9
|
||||||
|
assert score >= 0.5
|
||||||
|
assert score >= 0.5
|
||||||
|
finally:
|
||||||
|
os.unlink(ref.name)
|
||||||
|
os.unlink(vid.name)
|
||||||
|
|
||||||
|
|
||||||
|
def test_scan_video_nearest_mode():
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as ref:
|
||||||
|
_make_wav(ref.name, duration=8.0)
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as vid:
|
||||||
|
_make_wav(vid.name, duration=20.0)
|
||||||
|
try:
|
||||||
|
profile = build_profile([ref.name])
|
||||||
|
regions = scan_video(vid.name, profile, mode="nearest", threshold=0.5, hop=1.0)
|
||||||
|
assert len(regions) > 0
|
||||||
|
finally:
|
||||||
|
os.unlink(ref.name)
|
||||||
|
os.unlink(vid.name)
|
||||||
|
|
||||||
|
|
||||||
|
def test_scan_video_high_threshold_no_match():
|
||||||
|
"""Different frequencies with very high threshold should not match."""
|
||||||
|
import soundfile as sf
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as ref:
|
||||||
|
t = np.linspace(0, 8.0, 22050 * 8, endpoint=False)
|
||||||
|
sf.write(ref.name, 0.5 * np.sin(2 * np.pi * 440 * t), 22050)
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as vid:
|
||||||
|
# White noise — very different from sine wave
|
||||||
|
sf.write(vid.name, np.random.randn(22050 * 20).astype(np.float32) * 0.1, 22050)
|
||||||
|
try:
|
||||||
|
profile = build_profile([ref.name])
|
||||||
|
regions = scan_video(vid.name, profile, mode="average", threshold=0.99, hop=1.0)
|
||||||
|
assert len(regions) == 0
|
||||||
|
finally:
|
||||||
|
os.unlink(ref.name)
|
||||||
|
os.unlink(vid.name)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Run tests to verify they fail**
|
||||||
|
|
||||||
|
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py::test_scan_video_finds_matching_region -v`
|
||||||
|
Expected: FAIL with `ImportError: cannot import name 'scan_video'`
|
||||||
|
|
||||||
|
**Step 3: Write the implementation**
|
||||||
|
|
||||||
|
Add to `core/audio_scan.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
|
||||||
|
"""Cosine similarity between two vectors.
|
||||||
|
|
||||||
|
Returns value in [-1, 1]. Negative means anti-correlated (very
|
||||||
|
dissimilar). For threshold filtering this is fine — negative scores
|
||||||
|
never exceed the threshold. Scores near 0 may be uncorrelated or
|
||||||
|
weakly anti-correlated.
|
||||||
|
"""
|
||||||
|
na = np.linalg.norm(a)
|
||||||
|
nb = np.linalg.norm(b)
|
||||||
|
if na == 0 or nb == 0:
|
||||||
|
return 0.0
|
||||||
|
return float(np.dot(a, b) / (na * nb))
|
||||||
|
|
||||||
|
|
||||||
|
def scan_video(
|
||||||
|
video_path: str,
|
||||||
|
profile: dict,
|
||||||
|
mode: str = "average",
|
||||||
|
threshold: float = 0.7,
|
||||||
|
hop: float = 1.0,
|
||||||
|
window: float = 8.0,
|
||||||
|
cancel_flag: object = None,
|
||||||
|
) -> list[tuple[float, float, float]]:
|
||||||
|
"""Slide a window across the video audio and score against the profile.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
video_path: path to video/audio file
|
||||||
|
profile: dict from build_profile()
|
||||||
|
mode: "average" (compare to mean) or "nearest" (max over all clips)
|
||||||
|
threshold: minimum cosine similarity to include
|
||||||
|
hop: step size in seconds
|
||||||
|
window: window size in seconds (default 8s)
|
||||||
|
cancel_flag: object with _cancel bool attribute; checked each iteration
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list of (start_time, end_time, score) for regions above threshold
|
||||||
|
"""
|
||||||
|
_log(f"audio_scan: loading {video_path}")
|
||||||
|
y, sr = librosa.load(video_path, sr=_SR, mono=True)
|
||||||
|
duration = len(y) / sr
|
||||||
|
_log(f"audio_scan: {duration:.1f}s loaded, scanning with hop={hop}s")
|
||||||
|
|
||||||
|
win_samples = int(window * sr)
|
||||||
|
hop_samples = int(hop * sr)
|
||||||
|
|
||||||
|
results = []
|
||||||
|
pos = 0
|
||||||
|
while pos + win_samples <= len(y):
|
||||||
|
if cancel_flag and getattr(cancel_flag, '_cancel', False):
|
||||||
|
_log("audio_scan: cancelled")
|
||||||
|
return results
|
||||||
|
|
||||||
|
chunk = y[pos : pos + win_samples]
|
||||||
|
mfcc = librosa.feature.mfcc(y=chunk, sr=sr, n_mfcc=_N_MFCC)
|
||||||
|
vec = mfcc.mean(axis=1)
|
||||||
|
|
||||||
|
if mode == "nearest":
|
||||||
|
score = max(
|
||||||
|
_cosine_similarity(vec, cv) for cv in profile["clip_vectors"]
|
||||||
|
)
|
||||||
|
else: # average
|
||||||
|
score = _cosine_similarity(vec, profile["mean_vector"])
|
||||||
|
|
||||||
|
if score >= threshold:
|
||||||
|
start_t = pos / sr
|
||||||
|
results.append((start_t, start_t + window, score))
|
||||||
|
|
||||||
|
pos += hop_samples
|
||||||
|
|
||||||
|
_log(f"audio_scan: {len(results)} regions above threshold {threshold}")
|
||||||
|
return results
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 4: Run tests to verify they pass**
|
||||||
|
|
||||||
|
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py -v`
|
||||||
|
Expected: all 8 PASS
|
||||||
|
|
||||||
|
**Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add core/audio_scan.py tests/test_audio_scan.py
|
||||||
|
git commit -m "feat: add scan_video with average and nearest modes"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 3: Timeline — draw scan regions
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `main.py` (Timeline class, around lines 209-260 and 300-375)
|
||||||
|
|
||||||
|
**Step 1: Add scan region storage to Timeline.__init__**
|
||||||
|
|
||||||
|
In `main.py`, find the Timeline class `__init__` method (around line 198). After `self._markers` initialization (line 209), add:
|
||||||
|
|
||||||
|
```python
|
||||||
|
self._scan_regions: list[tuple[float, float, float]] = [] # (start, end, score)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Add set_scan_regions method**
|
||||||
|
|
||||||
|
After the `set_markers` method (line 249-252), add:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def set_scan_regions(self, regions: list[tuple[float, float, float]]) -> None:
|
||||||
|
"""regions: list of (start_time, end_time, score)"""
|
||||||
|
self._scan_regions = regions
|
||||||
|
self.update()
|
||||||
|
|
||||||
|
def clear_scan_regions(self) -> None:
|
||||||
|
self._scan_regions = []
|
||||||
|
self.update()
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 3: Draw scan regions in paintEvent**
|
||||||
|
|
||||||
|
In `paintEvent` (starts around line 282), find the marker drawing section (line 363, comment `# ── export markers`). BEFORE that section, add:
|
||||||
|
|
||||||
|
```python
|
||||||
|
# ── scan regions ──────────────────────────────────────────────
|
||||||
|
if self._scan_regions and self._duration > 0:
|
||||||
|
for (start, end, score) in self._scan_regions:
|
||||||
|
x1 = int(start / self._duration * w)
|
||||||
|
x2 = int(end / self._duration * w)
|
||||||
|
alpha = int(40 + score * 80) # 40–120 opacity
|
||||||
|
p.fillRect(x1, rh, x2 - x1, h - rh, QColor(100, 200, 255, alpha))
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 4: Verify manually**
|
||||||
|
|
||||||
|
Run: `cd /media/p5/8-cut && python main.py`
|
||||||
|
Expected: app starts without errors. No scan regions visible yet (none set).
|
||||||
|
|
||||||
|
**Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add main.py
|
||||||
|
git commit -m "feat: timeline scan region rendering"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 4: ScanWorker QThread
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `main.py` (add ScanWorker class, after ExportWorker around line 165)
|
||||||
|
|
||||||
|
**Step 1: Add the ScanWorker class**
|
||||||
|
|
||||||
|
After the `ExportWorker` class (ends around line 165), add:
|
||||||
|
|
||||||
|
```python
|
||||||
|
class ScanWorker(QThread):
|
||||||
|
"""Runs audio similarity scan off the main thread."""
|
||||||
|
finished = pyqtSignal(list) # emits list of (start, end, score)
|
||||||
|
error = pyqtSignal(str)
|
||||||
|
progress = pyqtSignal(str) # status message
|
||||||
|
|
||||||
|
def __init__(self, video_path: str, clip_paths: list[str],
|
||||||
|
mode: str = "average", threshold: float = 0.7):
|
||||||
|
super().__init__()
|
||||||
|
self._video_path = video_path
|
||||||
|
self._clip_paths = clip_paths
|
||||||
|
self._mode = mode
|
||||||
|
self._threshold = threshold
|
||||||
|
self._cancel = False
|
||||||
|
|
||||||
|
def cancel(self) -> None:
|
||||||
|
self._cancel = True
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
from core.audio_scan import build_profile, scan_video
|
||||||
|
try:
|
||||||
|
self.progress.emit(f"Building profile from {len(self._clip_paths)} clips...")
|
||||||
|
profile = build_profile(self._clip_paths)
|
||||||
|
if self._cancel:
|
||||||
|
return
|
||||||
|
if profile is None:
|
||||||
|
self.error.emit("No valid reference clips found")
|
||||||
|
return
|
||||||
|
self.progress.emit("Scanning audio...")
|
||||||
|
regions = scan_video(
|
||||||
|
self._video_path, profile,
|
||||||
|
mode=self._mode, threshold=self._threshold,
|
||||||
|
cancel_flag=self,
|
||||||
|
)
|
||||||
|
if not self._cancel:
|
||||||
|
self.finished.emit(regions)
|
||||||
|
except Exception as e:
|
||||||
|
if not self._cancel:
|
||||||
|
self.error.emit(str(e))
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Verify import works**
|
||||||
|
|
||||||
|
Run: `cd /media/p5/8-cut && python -c "from main import ScanWorker; print('ok')"`
|
||||||
|
Expected: `ok`
|
||||||
|
|
||||||
|
**Step 3: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add main.py
|
||||||
|
git commit -m "feat: add ScanWorker QThread for background scanning"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 5: DB helper — get_all_export_paths
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `core/db.py`
|
||||||
|
- Modify: `tests/test_audio_scan.py`
|
||||||
|
|
||||||
|
**Step 1: Write the test**
|
||||||
|
|
||||||
|
Add to `tests/test_audio_scan.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def test_db_get_all_export_paths():
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
|
||||||
|
path = f.name
|
||||||
|
try:
|
||||||
|
from core.db import ProcessedDB
|
||||||
|
db = ProcessedDB(path)
|
||||||
|
db.add("a.mp4", 10.0, "/out/a_001.mp4", profile="test")
|
||||||
|
db.add("b.mp4", 20.0, "/out/b_001.mp4", profile="test")
|
||||||
|
db.add("c.mp4", 30.0, "/out/c_001.mp4", profile="other")
|
||||||
|
paths = db.get_all_export_paths("test")
|
||||||
|
assert set(paths) == {"/out/a_001.mp4", "/out/b_001.mp4"}
|
||||||
|
finally:
|
||||||
|
os.unlink(path)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Run test to verify it fails**
|
||||||
|
|
||||||
|
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py::test_db_get_all_export_paths -v`
|
||||||
|
Expected: FAIL with `AttributeError: 'ProcessedDB' object has no attribute 'get_all_export_paths'`
|
||||||
|
|
||||||
|
**Step 3: Write the implementation**
|
||||||
|
|
||||||
|
Add to `core/db.py`, after the `get_markers` method. Note: no lock needed — follows
|
||||||
|
the codebase convention where read-only methods don't acquire the lock.
|
||||||
|
|
||||||
|
```python
|
||||||
|
def get_all_export_paths(self, profile: str = "default") -> list[str]:
|
||||||
|
"""Return all unique output_path values for a given profile."""
|
||||||
|
if not self._enabled:
|
||||||
|
return []
|
||||||
|
rows = self._con.execute(
|
||||||
|
"SELECT DISTINCT output_path FROM processed WHERE profile = ?",
|
||||||
|
(profile,),
|
||||||
|
).fetchall()
|
||||||
|
return [r[0] for r in rows]
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 4: Run test to verify it passes**
|
||||||
|
|
||||||
|
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py::test_db_get_all_export_paths -v`
|
||||||
|
Expected: PASS
|
||||||
|
|
||||||
|
**Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add core/db.py tests/test_audio_scan.py
|
||||||
|
git commit -m "feat: add get_all_export_paths to ProcessedDB"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 6: UI controls for audio scanning
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `main.py` (MainWindow class — control creation ~1490-1575, layout ~1620-1640)
|
||||||
|
|
||||||
|
**Step 1: Add scan control widgets**
|
||||||
|
|
||||||
|
In the MainWindow `__init__`, find the control creation section. After `self._chk_track` (around line 1501), add:
|
||||||
|
|
||||||
|
```python
|
||||||
|
# ── audio scan controls ──────────────────────────────────────
|
||||||
|
self._btn_scan = QPushButton("Scan")
|
||||||
|
self._btn_scan.setToolTip("Scan current video for audio segments matching reference clips")
|
||||||
|
self._btn_scan.clicked.connect(self._start_scan)
|
||||||
|
|
||||||
|
self._sld_threshold = QDoubleSpinBox()
|
||||||
|
self._sld_threshold.setRange(0.0, 1.0)
|
||||||
|
self._sld_threshold.setSingleStep(0.05)
|
||||||
|
self._sld_threshold.setValue(0.7)
|
||||||
|
self._sld_threshold.setPrefix("Thr: ")
|
||||||
|
self._sld_threshold.setToolTip("Similarity threshold (0=match everything, 1=exact match)")
|
||||||
|
|
||||||
|
self._cmb_scan_mode = QComboBox()
|
||||||
|
self._cmb_scan_mode.addItems(["Average", "Nearest"])
|
||||||
|
self._cmb_scan_mode.setToolTip("Average: compare to mean profile\nNearest: compare to closest clip")
|
||||||
|
|
||||||
|
self._cmb_scan_ref = QComboBox()
|
||||||
|
self._cmb_scan_ref.addItems(["Current Profile", "Custom Folder"])
|
||||||
|
self._cmb_scan_ref.currentIndexChanged.connect(self._on_scan_ref_changed)
|
||||||
|
self._scan_folder: str = ""
|
||||||
|
|
||||||
|
self._scan_worker: ScanWorker | None = None
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Add controls to settings_row layout**
|
||||||
|
|
||||||
|
Find the `settings_row` assembly (around line 1620). Before `settings_row.addStretch()` (around line 1635), add:
|
||||||
|
|
||||||
|
```python
|
||||||
|
settings_row.addWidget(self._btn_scan)
|
||||||
|
settings_row.addWidget(self._sld_threshold)
|
||||||
|
settings_row.addWidget(self._cmb_scan_mode)
|
||||||
|
settings_row.addWidget(self._cmb_scan_ref)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 3: Add handler methods**
|
||||||
|
|
||||||
|
Add these methods to MainWindow (after `_jump_to_next_marker` around line 2410):
|
||||||
|
|
||||||
|
```python
|
||||||
|
def _on_scan_ref_changed(self, index: int) -> None:
|
||||||
|
if index == 1: # Custom Folder
|
||||||
|
folder = QFileDialog.getExistingDirectory(self, "Select reference clip folder")
|
||||||
|
if folder:
|
||||||
|
self._scan_folder = folder
|
||||||
|
else:
|
||||||
|
self._cmb_scan_ref.setCurrentIndex(0)
|
||||||
|
|
||||||
|
def _cleanup_scan_worker(self) -> None:
|
||||||
|
"""Disconnect signals and schedule deletion of old scan worker."""
|
||||||
|
if self._scan_worker is not None:
|
||||||
|
try:
|
||||||
|
self._scan_worker.finished.disconnect()
|
||||||
|
self._scan_worker.error.disconnect()
|
||||||
|
self._scan_worker.progress.disconnect()
|
||||||
|
except TypeError:
|
||||||
|
pass # already disconnected
|
||||||
|
self._scan_worker.deleteLater()
|
||||||
|
self._scan_worker = None
|
||||||
|
|
||||||
|
def _start_scan(self) -> None:
|
||||||
|
if not self._file_path:
|
||||||
|
self._show_status("No video loaded")
|
||||||
|
return
|
||||||
|
if self._scan_worker and self._scan_worker.isRunning():
|
||||||
|
self._show_status("Scan already running")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Clean up previous worker
|
||||||
|
self._cleanup_scan_worker()
|
||||||
|
|
||||||
|
# Collect reference clip paths
|
||||||
|
if self._cmb_scan_ref.currentIndex() == 0:
|
||||||
|
# Current profile — all exports across all files in this profile
|
||||||
|
clip_paths = [p for p in self._db.get_all_export_paths(self._profile)
|
||||||
|
if os.path.exists(p)]
|
||||||
|
else:
|
||||||
|
# Custom folder
|
||||||
|
if not self._scan_folder:
|
||||||
|
self._show_status("No reference folder selected")
|
||||||
|
return
|
||||||
|
exts = (".mp4", ".mkv", ".avi", ".mov", ".wav", ".mp3", ".flac")
|
||||||
|
clip_paths = [
|
||||||
|
os.path.join(self._scan_folder, f)
|
||||||
|
for f in sorted(os.listdir(self._scan_folder))
|
||||||
|
if f.lower().endswith(exts)
|
||||||
|
]
|
||||||
|
|
||||||
|
if not clip_paths:
|
||||||
|
self._show_status("No reference clips found")
|
||||||
|
return
|
||||||
|
|
||||||
|
mode = self._cmb_scan_mode.currentText().lower()
|
||||||
|
threshold = self._sld_threshold.value()
|
||||||
|
|
||||||
|
self._btn_scan.setEnabled(False)
|
||||||
|
self._scan_file_path = self._file_path # remember which file we're scanning
|
||||||
|
self._show_status(f"Scanning with {len(clip_paths)} reference clips...")
|
||||||
|
|
||||||
|
self._scan_worker = ScanWorker(self._file_path, clip_paths, mode, threshold)
|
||||||
|
self._scan_worker.finished.connect(self._on_scan_done)
|
||||||
|
self._scan_worker.error.connect(self._on_scan_error)
|
||||||
|
self._scan_worker.progress.connect(self._show_status)
|
||||||
|
self._scan_worker.start()
|
||||||
|
|
||||||
|
def _on_scan_done(self, regions: list) -> None:
|
||||||
|
self._btn_scan.setEnabled(True)
|
||||||
|
# Ignore stale results if the user switched files during scan
|
||||||
|
if self._file_path != getattr(self, '_scan_file_path', None):
|
||||||
|
return
|
||||||
|
self._timeline.set_scan_regions(regions)
|
||||||
|
self._show_status(f"Scan complete: {len(regions)} matching regions")
|
||||||
|
|
||||||
|
def _on_scan_error(self, msg: str) -> None:
|
||||||
|
self._btn_scan.setEnabled(True)
|
||||||
|
self._show_status(f"Scan error: {msg}")
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 4: Verify manually**
|
||||||
|
|
||||||
|
Run: `cd /media/p5/8-cut && python main.py`
|
||||||
|
Expected: Scan button, threshold spinner, mode dropdown, and reference source dropdown visible in the settings row. Clicking Scan with no file loaded shows "No video loaded" in status.
|
||||||
|
|
||||||
|
**Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add main.py
|
||||||
|
git commit -m "feat: add scan UI controls and start_scan handler"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 7: Keyboard shortcut — jump to next scan region
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `main.py`
|
||||||
|
|
||||||
|
**Step 1: Add the keyboard shortcut**
|
||||||
|
|
||||||
|
Find the shortcut definitions (around line 1728, where `QShortcut(QKeySequence("M"), ...)` is defined). Add after it:
|
||||||
|
|
||||||
|
```python
|
||||||
|
QShortcut(QKeySequence("S"), self, context=ctx).activated.connect(self._jump_to_next_scan_region)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Add the jump method**
|
||||||
|
|
||||||
|
After `_on_scan_error` (or after `_jump_to_next_marker`), add:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def _jump_to_next_scan_region(self) -> None:
|
||||||
|
regions = sorted(self._timeline._scan_regions, key=lambda r: r[0])
|
||||||
|
if not regions:
|
||||||
|
return
|
||||||
|
for (start, _end, _score) in regions:
|
||||||
|
if start > self._cursor + 0.1:
|
||||||
|
self._step_cursor(start - self._cursor)
|
||||||
|
return
|
||||||
|
# Wrap to first region
|
||||||
|
self._step_cursor(regions[0][0] - self._cursor)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 3: Update help text**
|
||||||
|
|
||||||
|
Find the help/shortcuts tooltip (around line 1757). Add a row:
|
||||||
|
|
||||||
|
```python
|
||||||
|
"<tr><td><b>S</b></td><td>Jump to next scan region</td></tr>"
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 4: Clear scan regions and cancel running scan on file change**
|
||||||
|
|
||||||
|
Find `_load_file` method (around line 1931). After the existing marker/state resets, add:
|
||||||
|
|
||||||
|
```python
|
||||||
|
self._timeline.clear_scan_regions()
|
||||||
|
if self._scan_worker and self._scan_worker.isRunning():
|
||||||
|
self._scan_worker.cancel()
|
||||||
|
self._cleanup_scan_worker()
|
||||||
|
self._btn_scan.setEnabled(True)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 5: Verify manually**
|
||||||
|
|
||||||
|
Run: `cd /media/p5/8-cut && python main.py`
|
||||||
|
Expected: S key does nothing when no scan regions exist. After a scan, S jumps through matched regions.
|
||||||
|
|
||||||
|
**Step 6: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add main.py
|
||||||
|
git commit -m "feat: add S shortcut and clear scan on file change"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 8: Final integration test
|
||||||
|
|
||||||
|
**Step 1: End-to-end manual test**
|
||||||
|
|
||||||
|
1. Open the app: `cd /media/p5/8-cut && python main.py`
|
||||||
|
2. Load a video file
|
||||||
|
3. Export a few clips (these become the reference)
|
||||||
|
4. Set reference source to "Current Profile"
|
||||||
|
5. Click "Scan"
|
||||||
|
6. Verify: status shows progress messages, then "Scan complete: N matching regions"
|
||||||
|
7. Verify: cyan-tinted regions appear on the timeline
|
||||||
|
8. Press S to jump through scan regions
|
||||||
|
9. Change threshold and re-scan — verify different number of regions
|
||||||
|
10. Switch mode to "Nearest" and re-scan
|
||||||
|
11. Switch reference to "Custom Folder", pick a folder with clips
|
||||||
|
12. Re-scan and verify results
|
||||||
|
|
||||||
|
**Step 2: Run all tests**
|
||||||
|
|
||||||
|
Run: `cd /media/p5/8-cut && python -m pytest tests/ -v`
|
||||||
|
Expected: all tests PASS
|
||||||
|
|
||||||
|
**Step 3: Final commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add -A
|
||||||
|
git commit -m "feat: audio similarity scanning complete"
|
||||||
|
```
|
||||||
@@ -208,6 +208,7 @@ class TimelineWidget(QWidget):
|
|||||||
self._crop_keyframes: list[tuple[float, float, str | None, bool, bool]] = []
|
self._crop_keyframes: list[tuple[float, float, str | None, bool, bool]] = []
|
||||||
self._markers: list[tuple[float, int, str]] = []
|
self._markers: list[tuple[float, int, str]] = []
|
||||||
self._hover_cache: list[tuple[float, str]] = [] # (t/duration, path)
|
self._hover_cache: list[tuple[float, str]] = [] # (t/duration, path)
|
||||||
|
self._scan_regions: list[tuple[float, float, float]] = [] # (start, end, score)
|
||||||
|
|
||||||
# Cached paint resources — created once, reused every frame
|
# Cached paint resources — created once, reused every frame
|
||||||
self._cursor_pen = QPen(QColor(255, 210, 0))
|
self._cursor_pen = QPen(QColor(255, 210, 0))
|
||||||
@@ -252,6 +253,15 @@ class TimelineWidget(QWidget):
|
|||||||
self._rebuild_hover_cache()
|
self._rebuild_hover_cache()
|
||||||
self.update()
|
self.update()
|
||||||
|
|
||||||
|
def set_scan_regions(self, regions: list[tuple[float, float, float]]) -> None:
|
||||||
|
"""regions: list of (start_time, end_time, score)"""
|
||||||
|
self._scan_regions = regions
|
||||||
|
self.update()
|
||||||
|
|
||||||
|
def clear_scan_regions(self) -> None:
|
||||||
|
self._scan_regions = []
|
||||||
|
self.update()
|
||||||
|
|
||||||
def set_play_position(self, t: float | None) -> None:
|
def set_play_position(self, t: float | None) -> None:
|
||||||
# In lock mode, ignore mpv position updates while the user is dragging
|
# In lock mode, ignore mpv position updates while the user is dragging
|
||||||
# — the async seek hasn't caught up yet, so mpv reports stale values.
|
# — the async seek hasn't caught up yet, so mpv reports stale values.
|
||||||
@@ -360,6 +370,14 @@ class TimelineWidget(QWidget):
|
|||||||
p.drawLine(x_start, rh, x_start, h)
|
p.drawLine(x_start, rh, x_start, h)
|
||||||
p.drawLine(x_end, rh, x_end, h)
|
p.drawLine(x_end, rh, x_end, h)
|
||||||
|
|
||||||
|
# ── scan regions ──────────────────────────────────────────────
|
||||||
|
if self._scan_regions and self._duration > 0:
|
||||||
|
for (start, end, score) in self._scan_regions:
|
||||||
|
x1 = int(start / self._duration * w)
|
||||||
|
x2 = int(end / self._duration * w)
|
||||||
|
alpha = int(40 + score * 80) # 40–120 opacity
|
||||||
|
p.fillRect(x1, rh, x2 - x1, h - rh, QColor(100, 200, 255, alpha))
|
||||||
|
|
||||||
# ── export markers ────────────────────────────────────────────
|
# ── export markers ────────────────────────────────────────────
|
||||||
p.setFont(self._marker_font)
|
p.setFont(self._marker_font)
|
||||||
for (t, num, _path) in self._markers:
|
for (t, num, _path) in self._markers:
|
||||||
|
|||||||
@@ -0,0 +1,120 @@
|
|||||||
|
import tempfile, os
|
||||||
|
import numpy as np
|
||||||
|
from core.audio_scan import build_profile, _extract_mfcc, scan_video
|
||||||
|
|
||||||
|
|
||||||
|
def _make_wav(path: str, duration: float = 8.0, sr: int = 22050):
|
||||||
|
"""Create a short sine-wave WAV file for testing."""
|
||||||
|
import soundfile as sf
|
||||||
|
t = np.linspace(0, duration, int(sr * duration), endpoint=False)
|
||||||
|
audio = 0.5 * np.sin(2 * np.pi * 440 * t)
|
||||||
|
sf.write(path, audio, sr)
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_mfcc_returns_1d_vector():
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
|
||||||
|
_make_wav(f.name)
|
||||||
|
try:
|
||||||
|
vec = _extract_mfcc(f.name)
|
||||||
|
assert vec.shape == (20,)
|
||||||
|
assert not np.isnan(vec).any()
|
||||||
|
finally:
|
||||||
|
os.unlink(f.name)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_profile_single_clip():
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
|
||||||
|
_make_wav(f.name)
|
||||||
|
try:
|
||||||
|
profile = build_profile([f.name])
|
||||||
|
assert "mean_vector" in profile
|
||||||
|
assert "clip_vectors" in profile
|
||||||
|
assert profile["mean_vector"].shape == (20,)
|
||||||
|
assert len(profile["clip_vectors"]) == 1
|
||||||
|
finally:
|
||||||
|
os.unlink(f.name)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_profile_multiple_clips():
|
||||||
|
paths = []
|
||||||
|
try:
|
||||||
|
for i in range(3):
|
||||||
|
f = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
|
||||||
|
freq = 440 + i * 200
|
||||||
|
import soundfile as sf
|
||||||
|
t = np.linspace(0, 8.0, 22050 * 8, endpoint=False)
|
||||||
|
sf.write(f.name, 0.5 * np.sin(2 * np.pi * freq * t), 22050)
|
||||||
|
paths.append(f.name)
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
profile = build_profile(paths)
|
||||||
|
assert len(profile["clip_vectors"]) == 3
|
||||||
|
assert profile["mean_vector"].shape == (20,)
|
||||||
|
finally:
|
||||||
|
for p in paths:
|
||||||
|
os.unlink(p)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_profile_skips_missing_files():
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
|
||||||
|
_make_wav(f.name)
|
||||||
|
try:
|
||||||
|
profile = build_profile([f.name, "/no/such/file.wav"])
|
||||||
|
assert len(profile["clip_vectors"]) == 1
|
||||||
|
finally:
|
||||||
|
os.unlink(f.name)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_profile_empty_returns_none():
|
||||||
|
result = build_profile([])
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_scan_video_finds_matching_region():
|
||||||
|
"""A video made of the same sine wave as the reference should match."""
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as ref:
|
||||||
|
_make_wav(ref.name, duration=8.0)
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as vid:
|
||||||
|
_make_wav(vid.name, duration=20.0)
|
||||||
|
try:
|
||||||
|
profile = build_profile([ref.name])
|
||||||
|
regions = scan_video(vid.name, profile, mode="average", threshold=0.5, hop=1.0)
|
||||||
|
assert len(regions) > 0
|
||||||
|
for start, end, score in regions:
|
||||||
|
assert abs((end - start) - 8.0) < 1e-9
|
||||||
|
assert score >= 0.5
|
||||||
|
finally:
|
||||||
|
os.unlink(ref.name)
|
||||||
|
os.unlink(vid.name)
|
||||||
|
|
||||||
|
|
||||||
|
def test_scan_video_nearest_mode():
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as ref:
|
||||||
|
_make_wav(ref.name, duration=8.0)
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as vid:
|
||||||
|
_make_wav(vid.name, duration=20.0)
|
||||||
|
try:
|
||||||
|
profile = build_profile([ref.name])
|
||||||
|
regions = scan_video(vid.name, profile, mode="nearest", threshold=0.5, hop=1.0)
|
||||||
|
assert len(regions) > 0
|
||||||
|
finally:
|
||||||
|
os.unlink(ref.name)
|
||||||
|
os.unlink(vid.name)
|
||||||
|
|
||||||
|
|
||||||
|
def test_scan_video_high_threshold_no_match():
|
||||||
|
"""Different frequencies with very high threshold should not match."""
|
||||||
|
import soundfile as sf
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as ref:
|
||||||
|
t = np.linspace(0, 8.0, 22050 * 8, endpoint=False)
|
||||||
|
sf.write(ref.name, 0.5 * np.sin(2 * np.pi * 440 * t), 22050)
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as vid:
|
||||||
|
# White noise — very different from sine wave
|
||||||
|
sf.write(vid.name, np.random.randn(22050 * 20).astype(np.float32) * 0.1, 22050)
|
||||||
|
try:
|
||||||
|
profile = build_profile([ref.name])
|
||||||
|
regions = scan_video(vid.name, profile, mode="average", threshold=0.99, hop=1.0)
|
||||||
|
assert len(regions) == 0
|
||||||
|
finally:
|
||||||
|
os.unlink(ref.name)
|
||||||
|
os.unlink(vid.name)
|
||||||
Reference in New Issue
Block a user