diff --git a/core/audio_scan.py b/core/audio_scan.py
index bc615e8..505b016 100644
--- a/core/audio_scan.py
+++ b/core/audio_scan.py
@@ -1,37 +1,59 @@
-"""Audio similarity scanning — MFCC-based profile matching."""
+"""Audio similarity scanning — MFCC + spectral contrast profile matching."""
 
 import numpy as np
 import librosa
 from .paths import _log
 
-_N_MFCC = 20
-_SR = 22050
+_N_MFCC = 13  # coefficients 0-12; we drop C0 → 12 usable
+_SR = 16000  # lower sr = faster, no quality loss for style matching
+_HOP_LENGTH = 1024  # STFT hop (~64ms frames at 16kHz)
+_N_FFT = 2048  # STFT window
+_WINDOW = 8.0  # seconds
+_N_FEATURES = 62  # (12 mfcc + 12 delta + 7 sc) * 2 (mean + std)
 
 
-def _extract_mfcc(path: str, sr: int = _SR) -> np.ndarray:
-    """Load audio from a file and return an MFCC feature vector (40-dim).
+def _extract_features_from_signal(y: np.ndarray, sr: int = _SR) -> np.ndarray:
+    """Compute feature matrix (31 x T) from a raw audio signal.
 
-    Concatenates mean + std of each coefficient over time.
-    Mean captures average spectral content; std captures dynamics.
+    Features per frame: 12 MFCCs (skip C0) + 12 delta MFCCs + 7 spectral contrast.
     """
+    S = np.abs(librosa.stft(y, n_fft=_N_FFT, hop_length=_HOP_LENGTH)) ** 2
+    mel_S = librosa.feature.melspectrogram(S=S, sr=sr, hop_length=_HOP_LENGTH)
+    mfcc = librosa.feature.mfcc(S=librosa.power_to_db(mel_S), sr=sr, n_mfcc=_N_MFCC)
+    mfcc = mfcc[1:]  # drop C0 (energy) — dominates cosine sim, kills discrimination
+    delta = librosa.feature.delta(mfcc)
+    sc = librosa.feature.spectral_contrast(S=S, sr=sr, hop_length=_HOP_LENGTH)
+    return np.vstack([mfcc, delta, sc])  # (31, T)
+
+
+def _aggregate(feature_matrix: np.ndarray) -> np.ndarray:
+    """Collapse a (31, T) feature matrix into a (62,) vector via mean + std."""
+    return np.concatenate([
+        feature_matrix.mean(axis=1),
+        feature_matrix.std(axis=1),
+    ])
+
+
+def _extract_features(path: str, sr: int = _SR) -> np.ndarray:
+    """Load audio from a file and return a 62-dim feature vector."""
     y, _ = librosa.load(path, sr=sr, mono=True)
-    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=_N_MFCC)
-    return np.concatenate([mfcc.mean(axis=1), mfcc.std(axis=1)])
+    feat = _extract_features_from_signal(y, sr)
+    return _aggregate(feat)
 
 
 def build_profile(clip_paths: list[str]) -> dict | None:
-    """Extract MFCCs from reference clips.
+    """Extract features from reference clips.
 
     Returns dict with:
-      - mean_vector: averaged MFCC across all clips (20,)
-      - clip_vectors: list of individual MFCC vectors
+      - mean_vector: averaged feature vector across all clips (62,)
+      - clip_vectors: list of individual feature vectors
 
     Returns None if no clips could be loaded.
     """
     vectors = []
     for p in clip_paths:
         try:
-            vec = _extract_mfcc(p)
+            vec = _extract_features(p)
             vectors.append(vec)
         except Exception as e:
             _log(f"audio_scan: skip {p}: {e}")
@@ -44,40 +66,36 @@ def build_profile(clip_paths: list[str]) -> dict | None:
     }
 
 
-def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
-    """Cosine similarity between two vectors.
+def _similarity(a: np.ndarray, b: np.ndarray) -> float:
+    """Euclidean-distance-based similarity in (0, 1].
 
-    Returns value in [-1, 1]. Negative means anti-correlated (very
-    dissimilar). For threshold filtering this is fine — negative scores
-    never exceed the threshold. Scores near 0 may be uncorrelated or
-    weakly anti-correlated.
+    1/(1+dist): identical → 1.0, very different → near 0.
     """
-    na = np.linalg.norm(a)
-    nb = np.linalg.norm(b)
-    if na == 0 or nb == 0:
-        return 0.0
-    return float(np.dot(a, b) / (na * nb))
+    return float(1.0 / (1.0 + np.linalg.norm(a - b)))
 
 
 def scan_video(
     video_path: str,
     profile: dict,
     mode: str = "average",
-    threshold: float = 0.7,
+    threshold: float = 0.05,
     hop: float = 1.0,
-    window: float = 8.0,
+    window: float = _WINDOW,
     cancel_flag: object = None,
 ) -> list[tuple[float, float, float]]:
     """Slide a window across the video audio and score against the profile.
 
+    Pre-computes STFT once for the whole file, then uses vectorized
+    cumulative-sum sliding window for speed.
+
     Args:
         video_path: path to video/audio file
         profile: dict from build_profile()
         mode: "average" (compare to mean) or "nearest" (max over all clips)
-        threshold: minimum cosine similarity to include
+        threshold: minimum similarity to include (0-1, default 0.05)
         hop: step size in seconds
         window: window size in seconds (default 8s)
-        cancel_flag: object with _cancel bool attribute; checked each iteration
+        cancel_flag: object with _cancel bool attribute; checked periodically
 
     Returns:
         list of (start_time, end_time, score) for regions above threshold
@@ -85,34 +103,73 @@ def scan_video(
     _log(f"audio_scan: loading {video_path}")
     y, sr = librosa.load(video_path, sr=_SR, mono=True)
     duration = len(y) / sr
-    _log(f"audio_scan: {duration:.1f}s loaded, scanning with hop={hop}s")
+    _log(f"audio_scan: {duration:.1f}s loaded, extracting features...")
 
-    win_samples = int(window * sr)
-    hop_samples = int(hop * sr)
+    if cancel_flag and getattr(cancel_flag, '_cancel', False):
+        return []
 
-    results = []
-    pos = 0
-    while pos + win_samples <= len(y):
-        if cancel_flag and getattr(cancel_flag, '_cancel', False):
-            _log("audio_scan: cancelled")
-            return results
+    # Compute features for the entire file at once (one STFT)
+    feat = _extract_features_from_signal(y, sr)  # (31, T)
+    n_feats, T = feat.shape
+    fps = sr / _HOP_LENGTH  # frames per second
+    win_frames = max(1, int(window * fps))  # clamp: a 0-frame window would divide by zero below
+    hop_frames = max(1, int(hop * fps))  # clamp: np.arange rejects a zero step (hop < 1/fps s)
 
-        chunk = y[pos : pos + win_samples]
-        mfcc = librosa.feature.mfcc(y=chunk, sr=sr, n_mfcc=_N_MFCC)
-        vec = np.concatenate([mfcc.mean(axis=1), mfcc.std(axis=1)])
+    if win_frames > T:
+        _log("audio_scan: video shorter than window")
+        return []
 
-        if mode == "nearest":
-            score = max(
-                _cosine_similarity(vec, cv) for cv in profile["clip_vectors"]
-            )
-        else:  # average
-            score = _cosine_similarity(vec, profile["mean_vector"])
+    _log(f"audio_scan: scanning {T} frames, win={win_frames}, hop={hop_frames}")
 
-        if score >= threshold:
-            start_t = pos / sr
-            results.append((start_t, start_t + window, score))
+    # Vectorized sliding window via cumulative sums (float64 accumulator keeps E[x^2]-E[x]^2 stable)
+    cumsum = np.zeros((n_feats, T + 1))
+    cumsum[:, 1:] = np.cumsum(feat, axis=1, dtype=np.float64)
+    cumsq = np.zeros((n_feats, T + 1))
+    cumsq[:, 1:] = np.cumsum(feat ** 2, axis=1, dtype=np.float64)
 
-        pos += hop_samples
+    starts = np.arange(0, T - win_frames + 1, hop_frames)
+    ends = starts + win_frames
+
+    sums = cumsum[:, ends] - cumsum[:, starts]  # (31, n_windows)
+    sq_sums = cumsq[:, ends] - cumsq[:, starts]
+    means = sums / win_frames
+    stds = np.sqrt(np.maximum(sq_sums / win_frames - means ** 2, 0) + 1e-10)
+
+    window_vectors = np.vstack([means, stds]).T  # (n_windows, 62)
+
+    if cancel_flag and getattr(cancel_flag, '_cancel', False):
+        return []
+
+    # Score all windows
+    if mode == "nearest":
+        # Compare each window to every clip vector, take max
+        clip_vecs = np.stack(profile["clip_vectors"])  # (n_clips, 62)
+        results = []
+        # Process in batches to check cancel_flag periodically
+        batch = 500
+        for i in range(0, len(window_vectors), batch):
+            if cancel_flag and getattr(cancel_flag, '_cancel', False):
+                _log("audio_scan: cancelled")
+                return results
+            chunk = window_vectors[i:i + batch]
+            # cdist: (batch, n_clips) distances
+            dists = np.linalg.norm(chunk[:, None, :] - clip_vecs[None, :, :], axis=2)
+            scores = 1.0 / (1.0 + dists.min(axis=1))  # min dist = max similarity
+            for j, score in enumerate(scores):
+                if score >= threshold:
+                    idx = i + j
+                    start_t = starts[idx] / fps
+                    results.append((start_t, start_t + window, float(score)))
+    else:
+        # Average mode: compare to mean vector
+        ref = profile["mean_vector"]
+        dists = np.linalg.norm(window_vectors - ref, axis=1)
+        scores = 1.0 / (1.0 + dists)
+        mask = scores >= threshold
+        results = [
+            (starts[i] / fps, starts[i] / fps + window, float(scores[i]))
+            for i in np.nonzero(mask)[0]
+        ]
 
     _log(f"audio_scan: {len(results)} regions above threshold {threshold}")
     return results
diff --git a/main.py b/main.py
index bedda6d..1db8868 100755
--- a/main.py
+++ b/main.py
@@ -1568,7 +1568,7 @@ class MainWindow(QMainWindow):
         self._sld_threshold.setDecimals(2)
         self._sld_threshold.setRange(0.0, 1.0)
         self._sld_threshold.setSingleStep(0.01)
-        self._sld_threshold.setValue(0.70)
+        self._sld_threshold.setValue(0.05)
         self._sld_threshold.setPrefix("Thr: ")
         self._sld_threshold.setToolTip("Similarity threshold (0=match everything, 1=exact match)")
 
diff --git a/tests/test_audio_scan.py b/tests/test_audio_scan.py
index b7961aa..bdd1c6e 100644
--- a/tests/test_audio_scan.py
+++ b/tests/test_audio_scan.py
@@ -1,22 +1,22 @@
 import tempfile, os
 import numpy as np
-from core.audio_scan import build_profile, _extract_mfcc, scan_video
+from core.audio_scan import build_profile, _extract_features, scan_video, _similarity
 
 
-def _make_wav(path: str, duration: float = 8.0, sr: int = 22050):
+def _make_wav(path: str, duration: float = 8.0, sr: int = 16000, freq: float = 440.0):
     """Create a short sine-wave WAV file for testing."""
     import soundfile as sf
     t = np.linspace(0, duration, int(sr * duration), endpoint=False)
-    audio = 0.5 * np.sin(2 * np.pi * 440 * t)
+    audio = 0.5 * np.sin(2 * np.pi * freq * t)
     sf.write(path, audio, sr)
 
 
-def test_extract_mfcc_returns_1d_vector():
+def test_extract_features_returns_62d_vector():
     with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
         _make_wav(f.name)
     try:
-        vec = _extract_mfcc(f.name)
-        assert vec.shape == (40,)
+        vec = _extract_features(f.name)
+        assert vec.shape == (62,)
         assert not np.isnan(vec).any()
     finally:
         os.unlink(f.name)
@@ -29,7 +29,7 @@ def test_build_profile_single_clip():
         profile = build_profile([f.name])
         assert "mean_vector" in profile
         assert "clip_vectors" in profile
-        assert profile["mean_vector"].shape == (40,)
+        assert profile["mean_vector"].shape == (62,)
         assert len(profile["clip_vectors"]) == 1
     finally:
         os.unlink(f.name)
@@ -40,16 +40,13 @@ def test_build_profile_multiple_clips():
     try:
         for i in range(3):
             f = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
-            freq = 440 + i * 200
-            import soundfile as sf
-            t = np.linspace(0, 8.0, 22050 * 8, endpoint=False)
-            sf.write(f.name, 0.5 * np.sin(2 * np.pi * freq * t), 22050)
+            _make_wav(f.name, freq=440 + i * 200)
             paths.append(f.name)
             f.close()
 
         profile = build_profile(paths)
         assert len(profile["clip_vectors"]) == 3
-        assert profile["mean_vector"].shape == (40,)
+        assert profile["mean_vector"].shape == (62,)
     finally:
         for p in paths:
             os.unlink(p)
@@ -70,6 +67,17 @@ def test_build_profile_empty_returns_none():
     assert result is None
 
 
+def test_similarity_identical_is_one():
+    a = np.array([1.0, 2.0, 3.0])
+    assert abs(_similarity(a, a) - 1.0) < 1e-9
+
+
+def test_similarity_distant_is_low():
+    a = np.zeros(62)
+    b = np.ones(62) * 100
+    assert _similarity(a, b) < 0.01
+
+
 def test_scan_video_finds_matching_region():
     """A video made of the same sine wave as the reference should match."""
     with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as ref:
@@ -78,11 +86,11 @@ def test_scan_video_finds_matching_region():
         _make_wav(vid.name, duration=20.0)
     try:
         profile = build_profile([ref.name])
-        regions = scan_video(vid.name, profile, mode="average", threshold=0.5, hop=1.0)
+        regions = scan_video(vid.name, profile, mode="average", threshold=0.01, hop=1.0)
         assert len(regions) > 0
         for start, end, score in regions:
-            assert abs((end - start) - 8.0) < 1e-9
-            assert score >= 0.5
+            assert abs((end - start) - 8.0) < 0.1
+            assert score >= 0.01
     finally:
         os.unlink(ref.name)
         os.unlink(vid.name)
@@ -95,7 +103,7 @@ def test_scan_video_nearest_mode():
         _make_wav(vid.name, duration=20.0)
     try:
         profile = build_profile([ref.name])
-        regions = scan_video(vid.name, profile, mode="nearest", threshold=0.5, hop=1.0)
+        regions = scan_video(vid.name, profile, mode="nearest", threshold=0.01, hop=1.0)
         assert len(regions) > 0
     finally:
         os.unlink(ref.name)
@@ -106,20 +114,43 @@ def test_scan_video_high_threshold_no_match():
     """Different frequencies with very high threshold should not match."""
     import soundfile as sf
     with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as ref:
-        t = np.linspace(0, 8.0, 22050 * 8, endpoint=False)
-        sf.write(ref.name, 0.5 * np.sin(2 * np.pi * 440 * t), 22050)
+        _make_wav(ref.name, duration=8.0, freq=440)
     with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as vid:
         # White noise — very different from sine wave
-        sf.write(vid.name, np.random.randn(22050 * 20).astype(np.float32) * 0.1, 22050)
+        sf.write(vid.name, np.random.randn(16000 * 20).astype(np.float32) * 0.1, 16000)
     try:
         profile = build_profile([ref.name])
-        regions = scan_video(vid.name, profile, mode="average", threshold=0.99, hop=1.0)
+        regions = scan_video(vid.name, profile, mode="average", threshold=0.5, hop=1.0)
         assert len(regions) == 0
     finally:
         os.unlink(ref.name)
         os.unlink(vid.name)
 
 
+def test_scan_video_same_vs_different_discrimination():
+    """Same-frequency match should score higher than cross-frequency."""
+    import soundfile as sf
+    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as ref:
+        _make_wav(ref.name, duration=8.0, freq=440)
+    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as same:
+        _make_wav(same.name, duration=10.0, freq=440)
+    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as diff:
+        # White noise
+        sf.write(diff.name, np.random.randn(16000 * 10).astype(np.float32) * 0.1, 16000)
+    try:
+        profile = build_profile([ref.name])
+        same_regions = scan_video(same.name, profile, mode="average", threshold=0.0, hop=1.0)
+        diff_regions = scan_video(diff.name, profile, mode="average", threshold=0.0, hop=1.0)
+        # Same-audio scores should be higher than noise scores
+        best_same = max(r[2] for r in same_regions)
+        best_diff = max(r[2] for r in diff_regions)
+        assert best_same > best_diff
+    finally:
+        os.unlink(ref.name)
+        os.unlink(same.name)
+        os.unlink(diff.name)
+
+
 def test_db_get_all_export_paths():
     with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
         path = f.name