Files
8-cut/docs/plans/2026-04-17-audio-scan-implementation.md
Ethanfel b1980de6d1 fix: 9 bugs in audio scan implementation plan
- Swap Task 5/6 order so get_all_export_paths exists before UI uses it
- Remove cosine similarity clamping to preserve anti-correlation signal
- Use os.path.exists instead of os.path.isfile (handles image sequences)
- Add worker cleanup to disconnect stale signals before new scan
- Remove lock from get_all_export_paths (matches read-only convention)
- Always use get_all_export_paths for Current Profile (not current-file-first)
- Filter export paths with os.path.exists for deleted files
- Use abs() for float comparison in tests instead of ==
- Add cancel_flag to ScanWorker and scan_video for interruptible scans

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-17 08:43:53 +02:00

740 lines
23 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Audio Similarity Scanning — Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Scan a video's audio track to find segments matching a reference sound profile, displayed as highlighted regions on the timeline.
**Architecture:** New `core/audio_scan.py` module extracts MFCC features from reference clips and slides an 8s window across the target video's audio, scoring each position via cosine similarity. A `ScanWorker` QThread runs the scan in the background, and results are drawn as semi-transparent rectangles on the existing Timeline widget.
**Tech Stack:** Python 3, librosa 0.11, numpy, PyQt6
---
### Task 1: Core audio_scan module — build_profile
**Files:**
- Create: `core/audio_scan.py`
- Create: `tests/test_audio_scan.py`
**Step 1: Write the tests**
```python
# tests/test_audio_scan.py
import tempfile, os
import numpy as np
from core.audio_scan import build_profile, _extract_mfcc
def _make_wav(path: str, duration: float = 8.0, sr: int = 22050):
"""Create a short sine-wave WAV file for testing."""
import soundfile as sf
t = np.linspace(0, duration, int(sr * duration), endpoint=False)
audio = 0.5 * np.sin(2 * np.pi * 440 * t)
sf.write(path, audio, sr)
def test_extract_mfcc_returns_1d_vector():
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
_make_wav(f.name)
try:
vec = _extract_mfcc(f.name)
assert vec.shape == (20,)
assert not np.isnan(vec).any()
finally:
os.unlink(f.name)
def test_build_profile_single_clip():
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
_make_wav(f.name)
try:
profile = build_profile([f.name])
assert "mean_vector" in profile
assert "clip_vectors" in profile
assert profile["mean_vector"].shape == (20,)
assert len(profile["clip_vectors"]) == 1
finally:
os.unlink(f.name)
def test_build_profile_multiple_clips():
paths = []
try:
for i in range(3):
f = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
freq = 440 + i * 200
import soundfile as sf
t = np.linspace(0, 8.0, 22050 * 8, endpoint=False)
sf.write(f.name, 0.5 * np.sin(2 * np.pi * freq * t), 22050)
paths.append(f.name)
f.close()
profile = build_profile(paths)
assert len(profile["clip_vectors"]) == 3
assert profile["mean_vector"].shape == (20,)
finally:
for p in paths:
os.unlink(p)
def test_build_profile_skips_missing_files():
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
_make_wav(f.name)
try:
profile = build_profile([f.name, "/no/such/file.wav"])
assert len(profile["clip_vectors"]) == 1
finally:
os.unlink(f.name)
def test_build_profile_empty_returns_none():
result = build_profile([])
assert result is None
```
**Step 2: Run tests to verify they fail**
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'core.audio_scan'`
**Step 3: Write the implementation**
```python
# core/audio_scan.py
"""Audio similarity scanning — MFCC-based profile matching."""
import numpy as np
import librosa
from .paths import _log
_N_MFCC = 20
_SR = 22050
def _extract_mfcc(path: str, sr: int = _SR) -> np.ndarray:
"""Load audio from a file and return a mean MFCC vector (20-dim)."""
y, _ = librosa.load(path, sr=sr, mono=True)
mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=_N_MFCC)
return mfcc.mean(axis=1) # average over time → (20,)
def build_profile(clip_paths: list[str]) -> dict | None:
"""Extract MFCCs from reference clips.
Returns dict with:
- mean_vector: averaged MFCC across all clips (20,)
- clip_vectors: list of individual MFCC vectors
Returns None if no clips could be loaded.
"""
vectors = []
for p in clip_paths:
try:
vec = _extract_mfcc(p)
vectors.append(vec)
except Exception as e:
_log(f"audio_scan: skip {p}: {e}")
if not vectors:
return None
arr = np.stack(vectors)
return {
"mean_vector": arr.mean(axis=0),
"clip_vectors": vectors,
}
```
**Step 4: Run tests to verify they pass**
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py -v`
Expected: all 5 PASS
**Step 5: Commit**
```bash
git add core/audio_scan.py tests/test_audio_scan.py
git commit -m "feat: add audio_scan module with build_profile"
```
---
### Task 2: Core audio_scan module — scan_video
**Files:**
- Modify: `core/audio_scan.py`
- Modify: `tests/test_audio_scan.py`
**Step 1: Write the tests**
Add to `tests/test_audio_scan.py`:
```python
from core.audio_scan import scan_video
def test_scan_video_finds_matching_region():
"""A video made of the same sine wave as the reference should match."""
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as ref:
_make_wav(ref.name, duration=8.0)
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as vid:
_make_wav(vid.name, duration=20.0)
try:
profile = build_profile([ref.name])
regions = scan_video(vid.name, profile, mode="average", threshold=0.5, hop=1.0)
assert len(regions) > 0
for start, end, score in regions:
assert abs((end - start) - 8.0) < 1e-9
assert score >= 0.5
assert score >= 0.5
finally:
os.unlink(ref.name)
os.unlink(vid.name)
def test_scan_video_nearest_mode():
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as ref:
_make_wav(ref.name, duration=8.0)
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as vid:
_make_wav(vid.name, duration=20.0)
try:
profile = build_profile([ref.name])
regions = scan_video(vid.name, profile, mode="nearest", threshold=0.5, hop=1.0)
assert len(regions) > 0
finally:
os.unlink(ref.name)
os.unlink(vid.name)
def test_scan_video_high_threshold_no_match():
"""Different frequencies with very high threshold should not match."""
import soundfile as sf
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as ref:
t = np.linspace(0, 8.0, 22050 * 8, endpoint=False)
sf.write(ref.name, 0.5 * np.sin(2 * np.pi * 440 * t), 22050)
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as vid:
# White noise — very different from sine wave
sf.write(vid.name, np.random.randn(22050 * 20).astype(np.float32) * 0.1, 22050)
try:
profile = build_profile([ref.name])
regions = scan_video(vid.name, profile, mode="average", threshold=0.99, hop=1.0)
assert len(regions) == 0
finally:
os.unlink(ref.name)
os.unlink(vid.name)
```
**Step 2: Run tests to verify they fail**
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py::test_scan_video_finds_matching_region -v`
Expected: FAIL with `ImportError: cannot import name 'scan_video'`
**Step 3: Write the implementation**
Add to `core/audio_scan.py`:
```python
def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""Cosine similarity between two vectors.
Returns value in [-1, 1]. Negative means anti-correlated (very
dissimilar). For threshold filtering this is fine — negative scores
never exceed the threshold. Scores near 0 may be uncorrelated or
weakly anti-correlated.
"""
na = np.linalg.norm(a)
nb = np.linalg.norm(b)
if na == 0 or nb == 0:
return 0.0
return float(np.dot(a, b) / (na * nb))
def scan_video(
video_path: str,
profile: dict,
mode: str = "average",
threshold: float = 0.7,
hop: float = 1.0,
window: float = 8.0,
cancel_flag: object = None,
) -> list[tuple[float, float, float]]:
"""Slide a window across the video audio and score against the profile.
Args:
video_path: path to video/audio file
profile: dict from build_profile()
mode: "average" (compare to mean) or "nearest" (max over all clips)
threshold: minimum cosine similarity to include
hop: step size in seconds
window: window size in seconds (default 8s)
cancel_flag: object with _cancel bool attribute; checked each iteration
Returns:
list of (start_time, end_time, score) for regions above threshold
"""
_log(f"audio_scan: loading {video_path}")
y, sr = librosa.load(video_path, sr=_SR, mono=True)
duration = len(y) / sr
_log(f"audio_scan: {duration:.1f}s loaded, scanning with hop={hop}s")
win_samples = int(window * sr)
hop_samples = int(hop * sr)
results = []
pos = 0
while pos + win_samples <= len(y):
if cancel_flag and getattr(cancel_flag, '_cancel', False):
_log("audio_scan: cancelled")
return results
chunk = y[pos : pos + win_samples]
mfcc = librosa.feature.mfcc(y=chunk, sr=sr, n_mfcc=_N_MFCC)
vec = mfcc.mean(axis=1)
if mode == "nearest":
score = max(
_cosine_similarity(vec, cv) for cv in profile["clip_vectors"]
)
else: # average
score = _cosine_similarity(vec, profile["mean_vector"])
if score >= threshold:
start_t = pos / sr
results.append((start_t, start_t + window, score))
pos += hop_samples
_log(f"audio_scan: {len(results)} regions above threshold {threshold}")
return results
```
**Step 4: Run tests to verify they pass**
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py -v`
Expected: all 8 PASS
**Step 5: Commit**
```bash
git add core/audio_scan.py tests/test_audio_scan.py
git commit -m "feat: add scan_video with average and nearest modes"
```
---
### Task 3: Timeline — draw scan regions
**Files:**
- Modify: `main.py` (Timeline class, around lines 209-260 and 300-375)
**Step 1: Add scan region storage to Timeline.__init__**
In `main.py`, find the Timeline class `__init__` method (around line 198). After `self._markers` initialization (line 209), add:
```python
self._scan_regions: list[tuple[float, float, float]] = [] # (start, end, score)
```
**Step 2: Add set_scan_regions method**
After the `set_markers` method (line 249-252), add:
```python
def set_scan_regions(self, regions: list[tuple[float, float, float]]) -> None:
"""regions: list of (start_time, end_time, score)"""
self._scan_regions = regions
self.update()
def clear_scan_regions(self) -> None:
self._scan_regions = []
self.update()
```
**Step 3: Draw scan regions in paintEvent**
In `paintEvent` (starts around line 282), find the marker drawing section (line 363, comment `# ── export markers`). BEFORE that section, add:
```python
# ── scan regions ──────────────────────────────────────────────
if self._scan_regions and self._duration > 0:
for (start, end, score) in self._scan_regions:
x1 = int(start / self._duration * w)
x2 = int(end / self._duration * w)
alpha = int(40 + score * 80) # 40120 opacity
p.fillRect(x1, rh, x2 - x1, h - rh, QColor(100, 200, 255, alpha))
```
**Step 4: Verify manually**
Run: `cd /media/p5/8-cut && python main.py`
Expected: app starts without errors. No scan regions visible yet (none set).
**Step 5: Commit**
```bash
git add main.py
git commit -m "feat: timeline scan region rendering"
```
---
### Task 4: ScanWorker QThread
**Files:**
- Modify: `main.py` (add ScanWorker class, after ExportWorker around line 165)
**Step 1: Add the ScanWorker class**
After the `ExportWorker` class (ends around line 165), add:
```python
class ScanWorker(QThread):
"""Runs audio similarity scan off the main thread."""
finished = pyqtSignal(list) # emits list of (start, end, score)
error = pyqtSignal(str)
progress = pyqtSignal(str) # status message
def __init__(self, video_path: str, clip_paths: list[str],
mode: str = "average", threshold: float = 0.7):
super().__init__()
self._video_path = video_path
self._clip_paths = clip_paths
self._mode = mode
self._threshold = threshold
self._cancel = False
def cancel(self) -> None:
self._cancel = True
def run(self):
from core.audio_scan import build_profile, scan_video
try:
self.progress.emit(f"Building profile from {len(self._clip_paths)} clips...")
profile = build_profile(self._clip_paths)
if self._cancel:
return
if profile is None:
self.error.emit("No valid reference clips found")
return
self.progress.emit("Scanning audio...")
regions = scan_video(
self._video_path, profile,
mode=self._mode, threshold=self._threshold,
cancel_flag=self,
)
if not self._cancel:
self.finished.emit(regions)
except Exception as e:
if not self._cancel:
self.error.emit(str(e))
```
**Step 2: Verify import works**
Run: `cd /media/p5/8-cut && python -c "from main import ScanWorker; print('ok')"`
Expected: `ok`
**Step 3: Commit**
```bash
git add main.py
git commit -m "feat: add ScanWorker QThread for background scanning"
```
---
### Task 5: DB helper — get_all_export_paths
**Files:**
- Modify: `core/db.py`
- Modify: `tests/test_audio_scan.py`
**Step 1: Write the test**
Add to `tests/test_audio_scan.py`:
```python
def test_db_get_all_export_paths():
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
path = f.name
try:
from core.db import ProcessedDB
db = ProcessedDB(path)
db.add("a.mp4", 10.0, "/out/a_001.mp4", profile="test")
db.add("b.mp4", 20.0, "/out/b_001.mp4", profile="test")
db.add("c.mp4", 30.0, "/out/c_001.mp4", profile="other")
paths = db.get_all_export_paths("test")
assert set(paths) == {"/out/a_001.mp4", "/out/b_001.mp4"}
finally:
os.unlink(path)
```
**Step 2: Run test to verify it fails**
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py::test_db_get_all_export_paths -v`
Expected: FAIL with `AttributeError: 'ProcessedDB' object has no attribute 'get_all_export_paths'`
**Step 3: Write the implementation**
Add to `core/db.py`, after the `get_markers` method. Note: no lock needed — follows
the codebase convention where read-only methods don't acquire the lock.
```python
def get_all_export_paths(self, profile: str = "default") -> list[str]:
"""Return all unique output_path values for a given profile."""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT DISTINCT output_path FROM processed WHERE profile = ?",
(profile,),
).fetchall()
return [r[0] for r in rows]
```
**Step 4: Run test to verify it passes**
Run: `cd /media/p5/8-cut && python -m pytest tests/test_audio_scan.py::test_db_get_all_export_paths -v`
Expected: PASS
**Step 5: Commit**
```bash
git add core/db.py tests/test_audio_scan.py
git commit -m "feat: add get_all_export_paths to ProcessedDB"
```
---
### Task 6: UI controls for audio scanning
**Files:**
- Modify: `main.py` (MainWindow class — control creation ~1490-1575, layout ~1620-1640)
**Step 1: Add scan control widgets**
In the MainWindow `__init__`, find the control creation section. After `self._chk_track` (around line 1501), add:
```python
# ── audio scan controls ──────────────────────────────────────
self._btn_scan = QPushButton("Scan")
self._btn_scan.setToolTip("Scan current video for audio segments matching reference clips")
self._btn_scan.clicked.connect(self._start_scan)
self._sld_threshold = QDoubleSpinBox()
self._sld_threshold.setRange(0.0, 1.0)
self._sld_threshold.setSingleStep(0.05)
self._sld_threshold.setValue(0.7)
self._sld_threshold.setPrefix("Thr: ")
self._sld_threshold.setToolTip("Similarity threshold (0=match everything, 1=exact match)")
self._cmb_scan_mode = QComboBox()
self._cmb_scan_mode.addItems(["Average", "Nearest"])
self._cmb_scan_mode.setToolTip("Average: compare to mean profile\nNearest: compare to closest clip")
self._cmb_scan_ref = QComboBox()
self._cmb_scan_ref.addItems(["Current Profile", "Custom Folder"])
self._cmb_scan_ref.currentIndexChanged.connect(self._on_scan_ref_changed)
self._scan_folder: str = ""
self._scan_worker: ScanWorker | None = None
```
**Step 2: Add controls to settings_row layout**
Find the `settings_row` assembly (around line 1620). Before `settings_row.addStretch()` (around line 1635), add:
```python
settings_row.addWidget(self._btn_scan)
settings_row.addWidget(self._sld_threshold)
settings_row.addWidget(self._cmb_scan_mode)
settings_row.addWidget(self._cmb_scan_ref)
```
**Step 3: Add handler methods**
Add these methods to MainWindow (after `_jump_to_next_marker` around line 2410):
```python
def _on_scan_ref_changed(self, index: int) -> None:
if index == 1: # Custom Folder
folder = QFileDialog.getExistingDirectory(self, "Select reference clip folder")
if folder:
self._scan_folder = folder
else:
self._cmb_scan_ref.setCurrentIndex(0)
def _cleanup_scan_worker(self) -> None:
"""Disconnect signals and schedule deletion of old scan worker."""
if self._scan_worker is not None:
try:
self._scan_worker.finished.disconnect()
self._scan_worker.error.disconnect()
self._scan_worker.progress.disconnect()
except TypeError:
pass # already disconnected
self._scan_worker.deleteLater()
self._scan_worker = None
def _start_scan(self) -> None:
if not self._file_path:
self._show_status("No video loaded")
return
if self._scan_worker and self._scan_worker.isRunning():
self._show_status("Scan already running")
return
# Clean up previous worker
self._cleanup_scan_worker()
# Collect reference clip paths
if self._cmb_scan_ref.currentIndex() == 0:
# Current profile — all exports across all files in this profile
clip_paths = [p for p in self._db.get_all_export_paths(self._profile)
if os.path.exists(p)]
else:
# Custom folder
if not self._scan_folder:
self._show_status("No reference folder selected")
return
exts = (".mp4", ".mkv", ".avi", ".mov", ".wav", ".mp3", ".flac")
clip_paths = [
os.path.join(self._scan_folder, f)
for f in sorted(os.listdir(self._scan_folder))
if f.lower().endswith(exts)
]
if not clip_paths:
self._show_status("No reference clips found")
return
mode = self._cmb_scan_mode.currentText().lower()
threshold = self._sld_threshold.value()
self._btn_scan.setEnabled(False)
self._scan_file_path = self._file_path # remember which file we're scanning
self._show_status(f"Scanning with {len(clip_paths)} reference clips...")
self._scan_worker = ScanWorker(self._file_path, clip_paths, mode, threshold)
self._scan_worker.finished.connect(self._on_scan_done)
self._scan_worker.error.connect(self._on_scan_error)
self._scan_worker.progress.connect(self._show_status)
self._scan_worker.start()
def _on_scan_done(self, regions: list) -> None:
self._btn_scan.setEnabled(True)
# Ignore stale results if the user switched files during scan
if self._file_path != getattr(self, '_scan_file_path', None):
return
self._timeline.set_scan_regions(regions)
self._show_status(f"Scan complete: {len(regions)} matching regions")
def _on_scan_error(self, msg: str) -> None:
self._btn_scan.setEnabled(True)
self._show_status(f"Scan error: {msg}")
```
**Step 4: Verify manually**
Run: `cd /media/p5/8-cut && python main.py`
Expected: Scan button, threshold spinner, mode dropdown, and reference source dropdown visible in the settings row. Clicking Scan with no file loaded shows "No video loaded" in status.
**Step 5: Commit**
```bash
git add main.py
git commit -m "feat: add scan UI controls and start_scan handler"
```
---
### Task 7: Keyboard shortcut — jump to next scan region
**Files:**
- Modify: `main.py`
**Step 1: Add the keyboard shortcut**
Find the shortcut definitions (around line 1728, where `QShortcut(QKeySequence("M"), ...)` is defined). Add after it:
```python
QShortcut(QKeySequence("S"), self, context=ctx).activated.connect(self._jump_to_next_scan_region)
```
**Step 2: Add the jump method**
After `_on_scan_error` (or after `_jump_to_next_marker`), add:
```python
def _jump_to_next_scan_region(self) -> None:
regions = sorted(self._timeline._scan_regions, key=lambda r: r[0])
if not regions:
return
for (start, _end, _score) in regions:
if start > self._cursor + 0.1:
self._step_cursor(start - self._cursor)
return
# Wrap to first region
self._step_cursor(regions[0][0] - self._cursor)
```
**Step 3: Update help text**
Find the help/shortcuts tooltip (around line 1757). Add a row:
```python
"<tr><td><b>S</b></td><td>Jump to next scan region</td></tr>"
```
**Step 4: Clear scan regions and cancel running scan on file change**
Find `_load_file` method (around line 1931). After the existing marker/state resets, add:
```python
self._timeline.clear_scan_regions()
if self._scan_worker and self._scan_worker.isRunning():
self._scan_worker.cancel()
self._cleanup_scan_worker()
self._btn_scan.setEnabled(True)
```
**Step 5: Verify manually**
Run: `cd /media/p5/8-cut && python main.py`
Expected: S key does nothing when no scan regions exist. After a scan, S jumps through matched regions.
**Step 6: Commit**
```bash
git add main.py
git commit -m "feat: add S shortcut and clear scan on file change"
```
---
### Task 8: Final integration test
**Step 1: End-to-end manual test**
1. Open the app: `cd /media/p5/8-cut && python main.py`
2. Load a video file
3. Export a few clips (these become the reference)
4. Set reference source to "Current Profile"
5. Click "Scan"
6. Verify: status shows progress messages, then "Scan complete: N matching regions"
7. Verify: cyan-tinted regions appear on the timeline
8. Press S to jump through scan regions
9. Change threshold and re-scan — verify different number of regions
10. Switch mode to "Nearest" and re-scan
11. Switch reference to "Custom Folder", pick a folder with clips
12. Re-scan and verify results
**Step 2: Run all tests**
Run: `cd /media/p5/8-cut && python -m pytest tests/ -v`
Expected: all tests PASS
**Step 3: Final commit**
```bash
git add -A
git commit -m "feat: audio similarity scanning complete"
```