diff --git a/docs/plans/2026-04-17-audio-scan-implementation.md b/docs/plans/2026-04-17-audio-scan-implementation.md index 3c4bad5..8f7d433 100644 --- a/docs/plans/2026-04-17-audio-scan-implementation.md +++ b/docs/plans/2026-04-17-audio-scan-implementation.md @@ -182,8 +182,8 @@ def test_scan_video_finds_matching_region(): regions = scan_video(vid.name, profile, mode="average", threshold=0.5, hop=1.0) assert len(regions) > 0 for start, end, score in regions: - assert end - start == 8.0 - assert 0.0 <= score <= 1.0 + assert abs((end - start) - 8.0) < 1e-9 + assert score >= 0.5 assert score >= 0.5 finally: os.unlink(ref.name) @@ -233,12 +233,18 @@ Add to `core/audio_scan.py`: ```python def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float: - """Cosine similarity between two vectors, clamped to [0, 1].""" + """Cosine similarity between two vectors. + + Returns value in [-1, 1]. Negative means anti-correlated (very + dissimilar). For threshold filtering this is fine — negative scores + never exceed the threshold. Scores near 0 may be uncorrelated or + weakly anti-correlated. + """ na = np.linalg.norm(a) nb = np.linalg.norm(b) if na == 0 or nb == 0: return 0.0 - return float(np.clip(np.dot(a, b) / (na * nb), 0.0, 1.0)) + return float(np.dot(a, b) / (na * nb)) def scan_video( @@ -248,6 +254,7 @@ def scan_video( threshold: float = 0.7, hop: float = 1.0, window: float = 8.0, + cancel_flag: object = None, ) -> list[tuple[float, float, float]]: """Slide a window across the video audio and score against the profile. @@ -258,6 +265,7 @@ def scan_video( threshold: minimum cosine similarity to include hop: step size in seconds window: window size in seconds (default 8s) + cancel_flag: object with _cancel bool attribute; checked each iteration Returns: list of (start_time, end_time, score) for regions above threshold @@ -273,6 +281,10 @@ def scan_video( results = [] pos = 0 while pos + win_samples <= len(y): + if cancel_flag and getattr(cancel_flag, '_cancel', False): + _log("audio_scan: cancelled") + return results + chunk = y[pos : pos + win_samples] mfcc = librosa.feature.mfcc(y=chunk, sr=sr, n_mfcc=_N_MFCC) vec = mfcc.mean(axis=1) @@ -387,12 +399,18 @@ class ScanWorker(QThread): self._clip_paths = clip_paths self._mode = mode self._threshold = threshold + self._cancel = False + + def cancel(self) -> None: + self._cancel = True def run(self): from core.audio_scan import build_profile, scan_video try: self.progress.emit(f"Building profile from {len(self._clip_paths)} clips...") profile = build_profile(self._clip_paths) + if self._cancel: + return if profile is None: self.error.emit("No valid reference clips found") return @@ -400,10 +418,13 @@ class ScanWorker(QThread): regions = scan_video( self._video_path, profile, mode=self._mode, threshold=self._threshold, + cancel_flag=self, ) - self.finished.emit(regions) + if not self._cancel: + self.finished.emit(regions) except Exception as e: - self.error.emit(str(e)) + if not self._cancel: + self.error.emit(str(e)) ``` **Step 2: Verify import works** @@ -420,7 +441,69 @@ git commit -m "feat: add ScanWorker QThread for background scanning" --- -### Task 5: UI controls for audio scanning +### Task 5: DB helper — get_all_export_paths + +**Files:** +- Modify: `core/db.py` +- Modify: `tests/test_audio_scan.py` + +**Step 1: Write the test** + +Add to `tests/test_audio_scan.py`: + +```python +def test_db_get_all_export_paths(): + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: + path = f.name + try: + from core.db import ProcessedDB + db = ProcessedDB(path) + db.add("a.mp4", 10.0, "/out/a_001.mp4", profile="test") + db.add("b.mp4", 20.0, "/out/b_001.mp4", profile="test") + db.add("c.mp4", 30.0, "/out/c_001.mp4", profile="other") + paths = db.get_all_export_paths("test") + assert set(paths) == {"/out/a_001.mp4", "/out/b_001.mp4"} + finally: + os.unlink(path) +``` + +**Step 2: Run test to verify it fails** + +Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py::test_db_get_all_export_paths -v` +Expected: FAIL with `AttributeError: 'ProcessedDB' object has no attribute 'get_all_export_paths'` + +**Step 3: Write the implementation** + +Add to `core/db.py`, after the `get_markers` method. Note: no lock needed — follows +the codebase convention where read-only methods don't acquire the lock. + +```python +def get_all_export_paths(self, profile: str = "default") -> list[str]: + """Return all unique output_path values for a given profile.""" + if not self._enabled: + return [] + rows = self._con.execute( + "SELECT DISTINCT output_path FROM processed WHERE profile = ?", + (profile,), + ).fetchall() + return [r[0] for r in rows] +``` + +**Step 4: Run test to verify it passes** + +Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py::test_db_get_all_export_paths -v` +Expected: PASS + +**Step 5: Commit** + +```bash +git add core/db.py tests/test_audio_scan.py +git commit -m "feat: add get_all_export_paths to ProcessedDB" +``` + +--- + +### Task 6: UI controls for audio scanning **Files:** - Modify: `main.py` (MainWindow class — control creation ~1490-1575, layout ~1620-1640) @@ -478,6 +561,18 @@ def _on_scan_ref_changed(self, index: int) -> None: else: self._cmb_scan_ref.setCurrentIndex(0) +def _cleanup_scan_worker(self) -> None: + """Disconnect signals and schedule deletion of old scan worker.""" + if self._scan_worker is not None: + try: + self._scan_worker.finished.disconnect() + self._scan_worker.error.disconnect() + self._scan_worker.progress.disconnect() + except TypeError: + pass # already disconnected + self._scan_worker.deleteLater() + self._scan_worker = None + def _start_scan(self) -> None: if not self._file_path: self._show_status("No video loaded") @@ -486,14 +581,14 @@ def _start_scan(self) -> None: self._show_status("Scan already running") return + # Clean up previous worker + self._cleanup_scan_worker() + # Collect reference clip paths if self._cmb_scan_ref.currentIndex() == 0: - # Current profile — get export paths from DB - markers = self._db.get_markers(os.path.basename(self._file_path), self._profile) - clip_paths = [path for (_t, _n, path) in markers if os.path.isfile(path)] - if not clip_paths: - # Try all exports in the profile (not just current file) - clip_paths = self._db.get_all_export_paths(self._profile) + # Current profile — all exports across all files in this profile + clip_paths = [p for p in self._db.get_all_export_paths(self._profile) + if os.path.exists(p)] else: # Custom folder if not self._scan_folder: @@ -514,6 +609,7 @@ def _start_scan(self) -> None: threshold = self._sld_threshold.value() self._btn_scan.setEnabled(False) + self._scan_file_path = self._file_path # remember which file we're scanning self._show_status(f"Scanning with {len(clip_paths)} reference clips...") self._scan_worker = ScanWorker(self._file_path, clip_paths, mode, threshold) @@ -524,6 +620,9 @@ def _start_scan(self) -> None: def _on_scan_done(self, regions: list) -> None: self._btn_scan.setEnabled(True) + # Ignore stale results if the user switched files during scan + if self._file_path != getattr(self, '_scan_file_path', None): + return self._timeline.set_scan_regions(regions) self._show_status(f"Scan complete: {len(regions)} matching regions") @@ -546,68 +645,6 @@ git commit -m "feat: add scan UI controls and start_scan handler" --- -### Task 6: DB helper — get_all_export_paths - -**Files:** -- Modify: `core/db.py` -- Modify: `tests/test_audio_scan.py` - -**Step 1: Write the test** - -Add to `tests/test_audio_scan.py` (or `tests/test_utils.py`): - -```python -def test_db_get_all_export_paths(): - with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: - path = f.name - try: - from core.db import ProcessedDB - db = ProcessedDB(path) - db.add("a.mp4", 10.0, "/out/a_001.mp4", profile="test") - db.add("b.mp4", 20.0, "/out/b_001.mp4", profile="test") - db.add("c.mp4", 30.0, "/out/c_001.mp4", profile="other") - paths = db.get_all_export_paths("test") - assert set(paths) == {"/out/a_001.mp4", "/out/b_001.mp4"} - finally: - os.unlink(path) -``` - -**Step 2: Run test to verify it fails** - -Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py::test_db_get_all_export_paths -v` -Expected: FAIL with `AttributeError: 'ProcessedDB' object has no attribute 'get_all_export_paths'` - -**Step 3: Write the implementation** - -Add to `core/db.py`, after the `get_markers` method: - -```python -def get_all_export_paths(self, profile: str = "default") -> list[str]: - """Return all unique output_path values for a given profile.""" - if not self._enabled: - return [] - with self._lock: - rows = self._con.execute( - "SELECT DISTINCT output_path FROM processed WHERE profile = ?", - (profile,), - ).fetchall() - return [r[0] for r in rows] -``` - -**Step 4: Run test to verify it passes** - -Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py::test_db_get_all_export_paths -v` -Expected: PASS - -**Step 5: Commit** - -```bash -git add core/db.py tests/test_audio_scan.py -git commit -m "feat: add get_all_export_paths to ProcessedDB" -``` - ---- - ### Task 7: Keyboard shortcut — jump to next scan region **Files:** @@ -646,12 +683,16 @@ Find the help/shortcuts tooltip (around line 1757). Add a row: "