fix: 9 bugs in audio scan implementation plan

- Swap Task 5/6 order so get_all_export_paths exists before UI uses it
- Remove cosine similarity clamping to preserve anti-correlation signal
- Use os.path.exists instead of os.path.isfile (handles image sequences)
- Add worker cleanup to disconnect stale signals before new scan
- Remove lock from get_all_export_paths (matches read-only convention)
- Always use get_all_export_paths for Current Profile (not current-file-first)
- Filter export paths with os.path.exists for deleted files
- Use abs() for float comparison in tests instead of ==
- Add cancel_flag to ScanWorker and scan_video for interruptible scans

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-17 08:43:53 +02:00
parent 85e0641440
commit b1980de6d1
@@ -182,8 +182,8 @@ def test_scan_video_finds_matching_region():
regions = scan_video(vid.name, profile, mode="average", threshold=0.5, hop=1.0) regions = scan_video(vid.name, profile, mode="average", threshold=0.5, hop=1.0)
assert len(regions) > 0 assert len(regions) > 0
for start, end, score in regions: for start, end, score in regions:
assert end - start == 8.0 assert abs((end - start) - 8.0) < 1e-9
assert 0.0 <= score <= 1.0 assert score >= 0.5
assert score >= 0.5 assert score >= 0.5
finally: finally:
os.unlink(ref.name) os.unlink(ref.name)
@@ -233,12 +233,18 @@ Add to `core/audio_scan.py`:
```python ```python
def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float: def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""Cosine similarity between two vectors, clamped to [0, 1].""" """Cosine similarity between two vectors.
Returns value in [-1, 1]. Negative means anti-correlated (very
dissimilar). For threshold filtering this is fine — negative scores
never exceed the threshold. Scores near 0 may be uncorrelated or
weakly anti-correlated.
"""
na = np.linalg.norm(a) na = np.linalg.norm(a)
nb = np.linalg.norm(b) nb = np.linalg.norm(b)
if na == 0 or nb == 0: if na == 0 or nb == 0:
return 0.0 return 0.0
return float(np.clip(np.dot(a, b) / (na * nb), 0.0, 1.0)) return float(np.dot(a, b) / (na * nb))
def scan_video( def scan_video(
@@ -248,6 +254,7 @@ def scan_video(
threshold: float = 0.7, threshold: float = 0.7,
hop: float = 1.0, hop: float = 1.0,
window: float = 8.0, window: float = 8.0,
cancel_flag: object = None,
) -> list[tuple[float, float, float]]: ) -> list[tuple[float, float, float]]:
"""Slide a window across the video audio and score against the profile. """Slide a window across the video audio and score against the profile.
@@ -258,6 +265,7 @@ def scan_video(
threshold: minimum cosine similarity to include threshold: minimum cosine similarity to include
hop: step size in seconds hop: step size in seconds
window: window size in seconds (default 8s) window: window size in seconds (default 8s)
cancel_flag: object with _cancel bool attribute; checked each iteration
Returns: Returns:
list of (start_time, end_time, score) for regions above threshold list of (start_time, end_time, score) for regions above threshold
@@ -273,6 +281,10 @@ def scan_video(
results = [] results = []
pos = 0 pos = 0
while pos + win_samples <= len(y): while pos + win_samples <= len(y):
if cancel_flag and getattr(cancel_flag, '_cancel', False):
_log("audio_scan: cancelled")
return results
chunk = y[pos : pos + win_samples] chunk = y[pos : pos + win_samples]
mfcc = librosa.feature.mfcc(y=chunk, sr=sr, n_mfcc=_N_MFCC) mfcc = librosa.feature.mfcc(y=chunk, sr=sr, n_mfcc=_N_MFCC)
vec = mfcc.mean(axis=1) vec = mfcc.mean(axis=1)
@@ -387,12 +399,18 @@ class ScanWorker(QThread):
self._clip_paths = clip_paths self._clip_paths = clip_paths
self._mode = mode self._mode = mode
self._threshold = threshold self._threshold = threshold
self._cancel = False
def cancel(self) -> None:
self._cancel = True
def run(self): def run(self):
from core.audio_scan import build_profile, scan_video from core.audio_scan import build_profile, scan_video
try: try:
self.progress.emit(f"Building profile from {len(self._clip_paths)} clips...") self.progress.emit(f"Building profile from {len(self._clip_paths)} clips...")
profile = build_profile(self._clip_paths) profile = build_profile(self._clip_paths)
if self._cancel:
return
if profile is None: if profile is None:
self.error.emit("No valid reference clips found") self.error.emit("No valid reference clips found")
return return
@@ -400,10 +418,13 @@ class ScanWorker(QThread):
regions = scan_video( regions = scan_video(
self._video_path, profile, self._video_path, profile,
mode=self._mode, threshold=self._threshold, mode=self._mode, threshold=self._threshold,
cancel_flag=self,
) )
self.finished.emit(regions) if not self._cancel:
self.finished.emit(regions)
except Exception as e: except Exception as e:
self.error.emit(str(e)) if not self._cancel:
self.error.emit(str(e))
``` ```
**Step 2: Verify import works** **Step 2: Verify import works**
@@ -420,7 +441,69 @@ git commit -m "feat: add ScanWorker QThread for background scanning"
--- ---
### Task 5: UI controls for audio scanning ### Task 5: DB helper — get_all_export_paths
**Files:**
- Modify: `core/db.py`
- Modify: `tests/test_audio_scan.py`
**Step 1: Write the test**
Add to `tests/test_audio_scan.py`:
```python
def test_db_get_all_export_paths():
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
path = f.name
try:
from core.db import ProcessedDB
db = ProcessedDB(path)
db.add("a.mp4", 10.0, "/out/a_001.mp4", profile="test")
db.add("b.mp4", 20.0, "/out/b_001.mp4", profile="test")
db.add("c.mp4", 30.0, "/out/c_001.mp4", profile="other")
paths = db.get_all_export_paths("test")
assert set(paths) == {"/out/a_001.mp4", "/out/b_001.mp4"}
finally:
os.unlink(path)
```
**Step 2: Run test to verify it fails**
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py::test_db_get_all_export_paths -v`
Expected: FAIL with `AttributeError: 'ProcessedDB' object has no attribute 'get_all_export_paths'`
**Step 3: Write the implementation**
Add to `core/db.py`, after the `get_markers` method. Note: no lock needed — follows
the codebase convention where read-only methods don't acquire the lock.
```python
def get_all_export_paths(self, profile: str = "default") -> list[str]:
"""Return all unique output_path values for a given profile."""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT DISTINCT output_path FROM processed WHERE profile = ?",
(profile,),
).fetchall()
return [r[0] for r in rows]
```
**Step 4: Run test to verify it passes**
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py::test_db_get_all_export_paths -v`
Expected: PASS
**Step 5: Commit**
```bash
git add core/db.py tests/test_audio_scan.py
git commit -m "feat: add get_all_export_paths to ProcessedDB"
```
---
### Task 6: UI controls for audio scanning
**Files:** **Files:**
- Modify: `main.py` (MainWindow class — control creation ~1490-1575, layout ~1620-1640) - Modify: `main.py` (MainWindow class — control creation ~1490-1575, layout ~1620-1640)
@@ -478,6 +561,18 @@ def _on_scan_ref_changed(self, index: int) -> None:
else: else:
self._cmb_scan_ref.setCurrentIndex(0) self._cmb_scan_ref.setCurrentIndex(0)
def _cleanup_scan_worker(self) -> None:
"""Disconnect signals and schedule deletion of old scan worker."""
if self._scan_worker is not None:
try:
self._scan_worker.finished.disconnect()
self._scan_worker.error.disconnect()
self._scan_worker.progress.disconnect()
except TypeError:
pass # already disconnected
self._scan_worker.deleteLater()
self._scan_worker = None
def _start_scan(self) -> None: def _start_scan(self) -> None:
if not self._file_path: if not self._file_path:
self._show_status("No video loaded") self._show_status("No video loaded")
@@ -486,14 +581,14 @@ def _start_scan(self) -> None:
self._show_status("Scan already running") self._show_status("Scan already running")
return return
# Clean up previous worker
self._cleanup_scan_worker()
# Collect reference clip paths # Collect reference clip paths
if self._cmb_scan_ref.currentIndex() == 0: if self._cmb_scan_ref.currentIndex() == 0:
# Current profile — get export paths from DB # Current profile — all exports across all files in this profile
markers = self._db.get_markers(os.path.basename(self._file_path), self._profile) clip_paths = [p for p in self._db.get_all_export_paths(self._profile)
clip_paths = [path for (_t, _n, path) in markers if os.path.isfile(path)] if os.path.exists(p)]
if not clip_paths:
# Try all exports in the profile (not just current file)
clip_paths = self._db.get_all_export_paths(self._profile)
else: else:
# Custom folder # Custom folder
if not self._scan_folder: if not self._scan_folder:
@@ -514,6 +609,7 @@ def _start_scan(self) -> None:
threshold = self._sld_threshold.value() threshold = self._sld_threshold.value()
self._btn_scan.setEnabled(False) self._btn_scan.setEnabled(False)
self._scan_file_path = self._file_path # remember which file we're scanning
self._show_status(f"Scanning with {len(clip_paths)} reference clips...") self._show_status(f"Scanning with {len(clip_paths)} reference clips...")
self._scan_worker = ScanWorker(self._file_path, clip_paths, mode, threshold) self._scan_worker = ScanWorker(self._file_path, clip_paths, mode, threshold)
@@ -524,6 +620,9 @@ def _start_scan(self) -> None:
def _on_scan_done(self, regions: list) -> None: def _on_scan_done(self, regions: list) -> None:
self._btn_scan.setEnabled(True) self._btn_scan.setEnabled(True)
# Ignore stale results if the user switched files during scan
if self._file_path != getattr(self, '_scan_file_path', None):
return
self._timeline.set_scan_regions(regions) self._timeline.set_scan_regions(regions)
self._show_status(f"Scan complete: {len(regions)} matching regions") self._show_status(f"Scan complete: {len(regions)} matching regions")
@@ -546,68 +645,6 @@ git commit -m "feat: add scan UI controls and start_scan handler"
--- ---
### Task 6: DB helper — get_all_export_paths
**Files:**
- Modify: `core/db.py`
- Modify: `tests/test_audio_scan.py`
**Step 1: Write the test**
Add to `tests/test_audio_scan.py` (or `tests/test_utils.py`):
```python
def test_db_get_all_export_paths():
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
path = f.name
try:
from core.db import ProcessedDB
db = ProcessedDB(path)
db.add("a.mp4", 10.0, "/out/a_001.mp4", profile="test")
db.add("b.mp4", 20.0, "/out/b_001.mp4", profile="test")
db.add("c.mp4", 30.0, "/out/c_001.mp4", profile="other")
paths = db.get_all_export_paths("test")
assert set(paths) == {"/out/a_001.mp4", "/out/b_001.mp4"}
finally:
os.unlink(path)
```
**Step 2: Run test to verify it fails**
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py::test_db_get_all_export_paths -v`
Expected: FAIL with `AttributeError: 'ProcessedDB' object has no attribute 'get_all_export_paths'`
**Step 3: Write the implementation**
Add to `core/db.py`, after the `get_markers` method:
```python
def get_all_export_paths(self, profile: str = "default") -> list[str]:
"""Return all unique output_path values for a given profile."""
if not self._enabled:
return []
with self._lock:
rows = self._con.execute(
"SELECT DISTINCT output_path FROM processed WHERE profile = ?",
(profile,),
).fetchall()
return [r[0] for r in rows]
```
**Step 4: Run test to verify it passes**
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py::test_db_get_all_export_paths -v`
Expected: PASS
**Step 5: Commit**
```bash
git add core/db.py tests/test_audio_scan.py
git commit -m "feat: add get_all_export_paths to ProcessedDB"
```
---
### Task 7: Keyboard shortcut — jump to next scan region ### Task 7: Keyboard shortcut — jump to next scan region
**Files:** **Files:**
@@ -646,12 +683,16 @@ Find the help/shortcuts tooltip (around line 1757). Add a row:
"<tr><td><b>S</b></td><td>Jump to next scan region</td></tr>" "<tr><td><b>S</b></td><td>Jump to next scan region</td></tr>"
``` ```
**Step 4: Clear scan regions on file change** **Step 4: Clear scan regions and cancel running scan on file change**
Find `_load_file` method (around line 1931). After the existing marker/state resets, add: Find `_load_file` method (around line 1931). After the existing marker/state resets, add:
```python ```python
self._timeline.clear_scan_regions() self._timeline.clear_scan_regions()
if self._scan_worker and self._scan_worker.isRunning():
self._scan_worker.cancel()
self._cleanup_scan_worker()
self._btn_scan.setEnabled(True)
``` ```
**Step 5: Verify manually** **Step 5: Verify manually**