Files
8-cut/core/audio_scan.py
Ethanfel 299779cf29 feat: disable videos per-subcategory, named models, multi-category training, playlist separators
- Train dialog: multi-select positive subcategories via checkbox list, optional model name suffix ({profile}_{model}_{name}.joblib)
- list_trained_models recognizes named model variants
- Disable a video per-subcategory: moves its clips to a sibling {subcat}_disabled folder, rewrites DB output_path, migrates dataset.json, marks the name red
- Disabled clips excluded from training, stats, timeline, and playlist counts
- Playlist per-video count reflects only visible, non-disabled subcategories
- Persist subcategory show/hide visibility per profile across restarts
- Add/remove playlist separator rows (right-click) to mark batches, persisted per profile

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-06-05 12:45:03 +02:00

810 lines
32 KiB
Python

"""Audio scanning — embedding-based classifier for audio event detection."""
import hashlib
import os
import subprocess
import numpy as np
from .paths import _bin, _log
_SR = 16000 # lower sr = faster
def _load_audio_ffmpeg(path: str, sr: int = _SR) -> np.ndarray:
"""Load audio from any file as mono float32 numpy array using ffmpeg directly."""
cmd = [
_bin("ffmpeg"), "-i", path,
"-vn", # skip video
"-ac", "1", # mono
"-ar", str(sr), # resample
"-f", "f32le", # raw 32-bit float little-endian
"-loglevel", "error",
"pipe:1",
]
try:
proc = subprocess.run(cmd, capture_output=True, timeout=300)
except subprocess.TimeoutExpired:
raise RuntimeError(f"ffmpeg timed out (300s) on {os.path.basename(path)}")
if proc.returncode != 0:
raise RuntimeError(f"ffmpeg failed: {proc.stderr.decode().strip()}")
return np.frombuffer(proc.stdout, dtype=np.float32)
_WINDOW = 8.0 # seconds
_PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
_MODEL_DIR = os.path.join(_PROJECT_DIR, "models")
_W2V_CACHE_DIR = os.path.join(_PROJECT_DIR, "cache", "w2v")
_DL_CACHE_DIR = os.path.join(_PROJECT_DIR, "cache", "downloads")
# Redirect torch hub and huggingface downloads into the project
os.environ.setdefault("TORCH_HOME", _DL_CACHE_DIR)
os.environ.setdefault("HF_HOME", os.path.join(_DL_CACHE_DIR, "huggingface"))
# ---------------------------------------------------------------------------
# Embedding extraction (lazy-loaded)
# ---------------------------------------------------------------------------
_w2v_model = None
_w2v_device = None
_w2v_model_name = None
_ast_feature_extractor = None
# Supported embedding models — name → embed_dim
_EMBED_MODELS = {
"WAV2VEC2_BASE": 768,
"WAV2VEC2_LARGE": 1024,
"WAV2VEC2_LARGE_LV60K":1024,
"HUBERT_BASE": 768,
"HUBERT_LARGE": 1024,
"HUBERT_XLARGE": 1280,
"BEATS": 768,
# Multi-layer variants (4 quartile layers concatenated)
"WAV2VEC2_BASE_ML": 3072, # 768 * 4
"HUBERT_BASE_ML": 3072, # 768 * 4
"HUBERT_LARGE_ML": 4096, # 1024 * 4
"HUBERT_XLARGE_ML": 5120, # 1280 * 4
# Transformers-based models
"AST": 768,
"AST_ML": 3072, # 768 * 4
"EAT": 768,
"EAT_LARGE": 1024,
}
_DEFAULT_EMBED_MODEL = "EAT_LARGE"
_BEATS_CHECKPOINT = os.path.join(
_DL_CACHE_DIR, "huggingface", "hub",
"models--lpepino--beats_ckpts", "snapshots",
"5b53b0404df452a3a607d7e67687227730e5bad1", "BEATs_iter3_plus_AS2M.pt",
)
def _get_w2v_model(model_name: str | None = None):
"""Lazy-load an embedding model. Reloads if model_name differs from cached."""
global _w2v_model, _w2v_device, _w2v_model_name
if model_name is None:
model_name = _DEFAULT_EMBED_MODEL
# Multi-layer variants use the same base model weights
ml = _ml_config(model_name)
load_name = ml[0] if ml else model_name
if _w2v_model is None or _w2v_model_name != load_name:
import torch
_w2v_device = "cuda" if torch.cuda.is_available() else "cpu"
if load_name == "BEATS":
from .beats_model import BEATs, BEATsConfig
checkpoint = torch.load(_BEATS_CHECKPOINT, map_location=_w2v_device,
weights_only=False)
cfg = BEATsConfig(checkpoint['cfg'])
_w2v_model = BEATs(cfg)
_w2v_model.load_state_dict(checkpoint['model'])
_w2v_model.to(_w2v_device)
elif load_name == "AST":
from transformers import ASTModel, ASTFeatureExtractor
_w2v_model = ASTModel.from_pretrained(
"MIT/ast-finetuned-audioset-10-10-0.4593"
).to(_w2v_device)
global _ast_feature_extractor
_ast_feature_extractor = ASTFeatureExtractor.from_pretrained(
"MIT/ast-finetuned-audioset-10-10-0.4593"
)
elif load_name in ("EAT", "EAT_LARGE"):
from transformers import AutoModel
eat_repo = ("worstchan/EAT-large_epoch20_finetune_AS2M"
if load_name == "EAT_LARGE"
else "worstchan/EAT-base_epoch30_finetune_AS2M")
_w2v_model = AutoModel.from_pretrained(
eat_repo, trust_remote_code=True,
).to(_w2v_device)
else:
import torchaudio
bundle = getattr(torchaudio.pipelines, load_name)
_w2v_model = bundle.get_model().to(_w2v_device)
_w2v_model.eval()
_w2v_model_name = load_name
_log(f"audio_scan: {load_name} loaded on {_w2v_device}")
return _w2v_model, _w2v_device
def _eat_preprocess(chunks: list[np.ndarray], sr: int, device: str):
"""Convert raw audio chunks to EAT mel spectrogram input.
Returns tensor of shape [B, 1, T, 128].
8s audio at 10ms frame shift produces ~798 frames, zero-padded to 1024.
"""
import torch
import torchaudio.compliance.kaldi as kaldi
TARGET_LEN = 1024
MEAN, STD = -4.268, 4.569
mels = []
for chunk in chunks:
wav = torch.from_numpy(np.array(chunk)).unsqueeze(0).float()
fbank = kaldi.fbank(
wav, htk_compat=True, sample_frequency=sr, use_energy=False,
window_type='hanning', num_mel_bins=128, dither=0.0, frame_shift=10,
)
# Pad or truncate to TARGET_LEN
if fbank.shape[0] < TARGET_LEN:
fbank = torch.nn.functional.pad(fbank, (0, 0, 0, TARGET_LEN - fbank.shape[0]))
else:
fbank = fbank[:TARGET_LEN]
fbank = (fbank - MEAN) / (STD * 2)
mels.append(fbank)
return torch.stack(mels).unsqueeze(1).to(device) # [B, 1, T, 128]
def _embed_dim(model_name: str | None = None) -> int:
"""Return embedding dimension for a model name."""
if model_name is None:
model_name = _DEFAULT_EMBED_MODEL
return _EMBED_MODELS.get(model_name, 768)
def _ml_config(model_name: str) -> tuple[str, list[int]] | None:
"""If model_name is a multi-layer variant, return (base_model, layer_indices).
Returns None for single-layer models.
Layer indices are 0-based into the list returned by extract_features().
"""
if not model_name.endswith("_ML"):
return None
base = model_name[:-3] # strip "_ML"
if base not in _EMBED_MODELS:
return None
# Layer counts per model family
layer_counts = {
"WAV2VEC2_BASE": 12, "WAV2VEC2_LARGE": 24, "WAV2VEC2_LARGE_LV60K": 24,
"HUBERT_BASE": 12, "HUBERT_LARGE": 24, "HUBERT_XLARGE": 48,
"AST": 12,
}
n = layer_counts.get(base)
if n is None:
return None
# Select 4 layers at quartile boundaries (0-indexed)
indices = [n // 4 - 1, n // 2 - 1, 3 * n // 4 - 1, n - 1]
return base, indices
def _w2v_cache_path(video_path: str, hop: float, window: float,
model_name: str | None = None) -> str:
"""Return cache file path for a video's embeddings (includes model name)."""
if model_name is None:
model_name = _DEFAULT_EMBED_MODEL
abspath = os.path.abspath(video_path)
mtime = os.path.getmtime(abspath)
key = f"{abspath}|{mtime}|{hop}|{window}|{model_name}"
h = hashlib.sha256(key.encode()).hexdigest()[:16]
return os.path.join(_W2V_CACHE_DIR, f"{h}.npz")
def _w2v_cache_exists(video_path: str, hop: float, window: float,
model_name: str | None = None) -> bool:
"""Check if embedding cache exists for a video."""
try:
path = _w2v_cache_path(video_path, hop, window, model_name)
return os.path.exists(path)
except Exception:
return False
def _w2v_cache_load(video_path: str, hop: float, window: float,
model_name: str | None = None) -> tuple[np.ndarray, np.ndarray] | None:
"""Load embeddings from cache. Returns (timestamps, embeddings) or None."""
try:
path = _w2v_cache_path(video_path, hop, window, model_name)
if os.path.exists(path):
data = np.load(path)
_log(f"audio_scan: cache hit ({path})")
return data["timestamps"], data["embeddings"]
except Exception as e:
_log(f"audio_scan: cache read failed: {e}")
return None
def _extract_w2v_windows(y: np.ndarray, sr: int = _SR,
hop: float = 1.0, window: float = _WINDOW,
video_path: str | None = None,
cancel_flag: object = None,
model_name: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""Extract embeddings for all sliding windows using a torchaudio model.
If video_path is given, results are cached to disk for fast re-scans.
Returns (timestamps, embeddings) where embeddings is (N, D).
"""
edim = _embed_dim(model_name)
# Try loading from cache
cache_file = None
if video_path:
try:
cache_file = _w2v_cache_path(video_path, hop, window, model_name)
if os.path.exists(cache_file):
data = np.load(cache_file)
_log(f"audio_scan: cache hit ({cache_file})")
return data["timestamps"], data["embeddings"]
except Exception as e:
_log(f"audio_scan: cache read failed: {e}")
win_samples = int(window * sr)
hop_samples = int(hop * sr)
n_windows = max(0, (len(y) - win_samples) // hop_samples + 1)
if n_windows == 0:
return np.array([]), np.empty((0, edim))
import torch
model, device = _get_w2v_model(model_name)
is_beats = (model_name or _DEFAULT_EMBED_MODEL) == "BEATS"
is_ast = (model_name or _DEFAULT_EMBED_MODEL) in ("AST", "AST_ML")
is_eat = (model_name or _DEFAULT_EMBED_MODEL) in ("EAT", "EAT_LARGE")
ml_cfg = _ml_config(model_name or _DEFAULT_EMBED_MODEL)
# Auto-size batches based on available GPU memory
batch_size = 16
if device == "cuda":
try:
vram_gb = torch.cuda.get_device_properties(0).total_mem / 1e9
if vram_gb >= 16:
batch_size = 64
elif vram_gb >= 8:
batch_size = 32
_log(f"audio_scan: batch_size={batch_size} (VRAM {vram_gb:.1f} GB)")
except Exception:
pass
timestamps = np.arange(n_windows) * hop
embeddings = []
for batch_start in range(0, n_windows, batch_size):
if cancel_flag and getattr(cancel_flag, '_cancel', False):
return np.array([]), np.empty((0, edim))
batch_end = min(batch_start + batch_size, n_windows)
chunks = []
for i in range(batch_start, batch_end):
start = i * hop_samples
chunks.append(y[start:start + win_samples])
with torch.no_grad():
if is_ast:
inputs = _ast_feature_extractor(
list(chunks), sampling_rate=sr, return_tensors="pt",
padding=True,
)
input_values = inputs.input_values.to(device)
if ml_cfg is not None:
out = model(input_values, output_hidden_states=True)
selected = [out.hidden_states[i].mean(dim=1) for i in ml_cfg[1]]
batch_emb = torch.cat(selected, dim=1).cpu().numpy()
else:
out = model(input_values)
batch_emb = out.last_hidden_state.mean(dim=1).cpu().numpy()
elif is_eat:
mel_input = _eat_preprocess(chunks, sr, device)
features = model.extract_features(mel_input)
batch_emb = features[:, 1:, :].mean(dim=1).cpu().numpy()
else:
waveforms = torch.from_numpy(np.stack(chunks)).float().to(device)
if is_beats:
padding_mask = torch.zeros_like(waveforms, dtype=torch.bool)
features, _ = model.extract_features(waveforms, padding_mask=padding_mask)
batch_emb = features.mean(dim=1).cpu().numpy()
elif ml_cfg is not None:
all_layers, _ = model.extract_features(waveforms)
selected = [all_layers[i].mean(dim=1) for i in ml_cfg[1]]
batch_emb = torch.cat(selected, dim=1).cpu().numpy()
else:
features, _ = model(waveforms)
batch_emb = features.mean(dim=1).cpu().numpy()
embeddings.append(batch_emb)
result_ts = timestamps
result_emb = np.vstack(embeddings)
# Save to cache
if cache_file:
try:
os.makedirs(_W2V_CACHE_DIR, exist_ok=True)
np.savez(cache_file, timestamps=result_ts, embeddings=result_emb)
_log(f"audio_scan: w2v cache saved ({cache_file})")
except Exception as e:
_log(f"audio_scan: cache write failed: {e}")
return result_ts, result_emb
def _extract_w2v_targeted(y: np.ndarray, sr: int, gt_intense: list[float],
gt_soft: list[float], tolerance: float = 12.0,
neg_margin: float = 120.0,
model_name: str | None = None,
gt_negative: list[float] | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Extract embeddings only near positives and distant negatives.
Returns (timestamps, embeddings, labels) where labels: 1=pos, -1=neg, 0=ambig.
"""
edim = _embed_dim(model_name)
duration = len(y) / sr
win_samples = int(_WINDOW * sr)
all_gt = list(gt_intense) + list(gt_soft)
# Positive windows: every second near intense markers
pos_times = set()
for gt in gt_intense:
for offset in range(-int(tolerance), int(tolerance) + 1):
t = gt + offset
if 0 <= t <= duration - _WINDOW:
pos_times.add(int(t))
# Manual negative windows: near explicit negative markers
manual_neg_times = set()
if gt_negative:
for gt in gt_negative:
for offset in range(-int(tolerance), int(tolerance) + 1):
t = gt + offset
if 0 <= t <= duration - _WINDOW:
manual_neg_times.add(int(t))
# Don't let manual negatives overlap with positives
manual_neg_times -= pos_times
# Auto negative windows: every 4s, far from any marker (skip if margin <= 0 or no markers)
neg_times = set()
if all_gt and neg_margin > 0:
for t in range(0, int(duration - _WINDOW), 4):
if min(abs(t - g) for g in all_gt) > neg_margin:
neg_times.add(t)
all_times = sorted(pos_times | neg_times | manual_neg_times)
# Filter out windows that go past the end
valid_times = [t for t in all_times if int(t * sr) + win_samples <= len(y)]
if not valid_times:
return np.array([]), np.zeros((0, edim)), np.array([], dtype=int)
import torch
model, device = _get_w2v_model(model_name)
batch_size = 16
timestamps_list: list[float] = []
embeddings_list: list[np.ndarray] = []
is_beats = (model_name or _DEFAULT_EMBED_MODEL) == "BEATS"
is_ast = (model_name or _DEFAULT_EMBED_MODEL) in ("AST", "AST_ML")
is_eat = (model_name or _DEFAULT_EMBED_MODEL) in ("EAT", "EAT_LARGE")
ml_cfg = _ml_config(model_name or _DEFAULT_EMBED_MODEL)
for batch_start in range(0, len(valid_times), batch_size):
batch_end = min(batch_start + batch_size, len(valid_times))
chunks = []
for t in valid_times[batch_start:batch_end]:
start = int(t * sr)
chunks.append(y[start:start + win_samples])
timestamps_list.append(float(t))
with torch.no_grad():
if is_ast:
inputs = _ast_feature_extractor(
list(chunks), sampling_rate=sr, return_tensors="pt",
padding=True,
)
input_values = inputs.input_values.to(device)
if ml_cfg is not None:
out = model(input_values, output_hidden_states=True)
selected = [out.hidden_states[i].mean(dim=1) for i in ml_cfg[1]]
batch_emb = torch.cat(selected, dim=1).cpu().numpy()
else:
out = model(input_values)
batch_emb = out.last_hidden_state.mean(dim=1).cpu().numpy()
elif is_eat:
mel_input = _eat_preprocess(chunks, sr, device)
features = model.extract_features(mel_input)
batch_emb = features[:, 1:, :].mean(dim=1).cpu().numpy()
else:
waveforms = torch.from_numpy(np.stack(chunks)).float().to(device)
if is_beats:
padding_mask = torch.zeros_like(waveforms, dtype=torch.bool)
features, _ = model.extract_features(waveforms, padding_mask=padding_mask)
batch_emb = features.mean(dim=1).cpu().numpy()
elif ml_cfg is not None:
all_layers, _ = model.extract_features(waveforms)
selected = [all_layers[i].mean(dim=1) for i in ml_cfg[1]]
batch_emb = torch.cat(selected, dim=1).cpu().numpy()
else:
features, _ = model(waveforms)
batch_emb = features.mean(dim=1).cpu().numpy()
embeddings_list.append(batch_emb)
timestamps = np.array(timestamps_list)
embeddings = np.vstack(embeddings_list)
labels = np.zeros(len(timestamps), dtype=int)
for i, t in enumerate(timestamps):
di = min((abs(t - g) for g in gt_intense), default=9999)
da = min((abs(t - g) for g in all_gt), default=9999)
dm = min((abs(t - g) for g in (gt_negative or [])), default=9999)
if di < tolerance:
labels[i] = 1
elif dm < tolerance or (neg_margin > 0 and da > neg_margin):
labels[i] = -1
return timestamps, embeddings, labels
# ---------------------------------------------------------------------------
# Classifier mode — train / save / load / scan
# ---------------------------------------------------------------------------
def train_classifier(video_infos: list[tuple[str, list[float], list[float]]],
model_path: str | None = None,
tolerance: float = 12.0,
neg_margin: float = 120.0,
embed_model: str | None = None,
cancel_flag: object = None,
n_workers: int = 4,
progress_cb: object = None) -> dict:
"""Train a classifier from labeled videos.
Args:
video_infos: list of (video_path, intense_times, soft_times)
model_path: if given, save model to this path
tolerance/neg_margin: labeling parameters
embed_model: embedding model name (e.g. "HUBERT_BASE", "BEATS"), defaults to WAV2VEC2_BASE
cancel_flag: object with _cancel attribute; if set, training aborts early
n_workers: number of threads for parallel audio loading
Returns:
dict with 'classifier', 'embed_model', and metadata, or None on failure.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
from sklearn.ensemble import HistGradientBoostingClassifier
def _progress(msg: str) -> None:
_log(msg)
if progress_cb:
progress_cb(msg)
def _load_audio(path: str) -> np.ndarray:
return _load_audio_ffmpeg(path, sr=_SR)
# Phase 1: load all audio in parallel (cap workers — disk I/O bound)
n = len(video_infos)
load_workers = min(n_workers, 4)
_progress(f"Loading audio: 0/{n} videos ({load_workers} workers)...")
audio_data: dict[int, np.ndarray] = {}
with ThreadPoolExecutor(max_workers=load_workers) as pool:
future_to_idx = {
pool.submit(_load_audio, vi[0]): i
for i, vi in enumerate(video_infos)
}
failed = set()
for future in as_completed(future_to_idx):
if cancel_flag and getattr(cancel_flag, '_cancel', False):
_log("audio_scan: training cancelled")
return None
idx = future_to_idx[future]
try:
audio_data[idx] = future.result()
except Exception as e:
_log(f"audio_scan: failed to load {os.path.basename(video_infos[idx][0])}: {e}")
failed.add(idx)
_progress(f"Loading audio: {len(audio_data) + len(failed)}/{n}")
# Phase 2: extract embeddings sequentially on GPU
_progress(f"Extracting embeddings: 0/{n}")
all_X, all_y = [], []
for vi, vinfo in enumerate(video_infos):
if vi in failed:
continue
vpath, gt_intense, gt_soft = vinfo[0], vinfo[1], vinfo[2]
gt_negative = vinfo[3] if len(vinfo) > 3 else []
if cancel_flag and getattr(cancel_flag, '_cancel', False):
_log("audio_scan: training cancelled")
return None
_progress(f"Extracting embeddings: {vi+1}/{n}")
y = audio_data.pop(vi)
timestamps, embeddings, labels = _extract_w2v_targeted(
y, _SR, gt_intense, gt_soft, tolerance, neg_margin,
model_name=embed_model, gt_negative=gt_negative,
)
if len(timestamps) == 0:
continue
# Per-video z-score normalize
vid_mean = embeddings.mean(axis=0)
vid_std = np.maximum(embeddings.std(axis=0), 1e-6)
normed = (embeddings - vid_mean) / vid_std
for i in range(len(labels)):
if labels[i] == 1:
all_X.append(normed[i])
all_y.append(1)
elif labels[i] == -1:
all_X.append(normed[i])
all_y.append(0)
if not all_X:
_log("audio_scan: no training samples collected")
return None
X = np.stack(all_X)
y_arr = np.array(all_y)
n_pos = (y_arr == 1).sum()
n_neg = (y_arr == 0).sum()
_log(f"audio_scan: training set — {n_pos} positive, {n_neg} negative")
if n_pos == 0 or n_neg == 0:
_log(f"audio_scan: need both classes — {n_pos} pos, {n_neg} neg")
return None
# Subsample negatives for balance
rng = np.random.RandomState(42)
pos_idx = np.where(y_arr == 1)[0]
neg_idx = np.where(y_arr == 0)[0]
n_neg_sample = min(len(neg_idx), len(pos_idx) * 3)
neg_sample = rng.choice(neg_idx, n_neg_sample, replace=False)
train_idx = np.concatenate([pos_idx, neg_sample])
rng.shuffle(train_idx)
_progress(f"Fitting classifier on {len(train_idx)} samples...")
clf = HistGradientBoostingClassifier(
max_iter=200, max_depth=5, learning_rate=0.1, random_state=42,
)
clf.fit(X[train_idx], y_arr[train_idx])
_log("audio_scan: classifier trained")
# Calibrate probabilities for better threshold behavior
from sklearn.calibration import CalibratedClassifierCV
min_class = min(int(n_pos), int(n_neg_sample))
if min_class >= 6:
cal_clf = CalibratedClassifierCV(clf, cv=3, method='isotonic')
cal_clf.fit(X[train_idx], y_arr[train_idx])
clf = cal_clf
_log("audio_scan: classifier calibrated (isotonic, 3-fold)")
else:
_log(f"audio_scan: skipping calibration (min class size {min_class} < 6)")
model = {"classifier": clf, "n_features": X.shape[1],
"embed_model": embed_model or _DEFAULT_EMBED_MODEL}
if model_path:
import joblib
from datetime import datetime
parent = os.path.dirname(model_path)
if parent:
os.makedirs(parent, exist_ok=True)
# Save with timestamp in name; keep a symlink/copy as the "latest"
stem, ext = os.path.splitext(model_path)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
versioned = f"{stem}_{ts}{ext}"
joblib.dump(model, versioned)
_log(f"audio_scan: model saved to {versioned}")
# Update the base path to point to latest version (copy)
import shutil
shutil.copy2(versioned, model_path)
_log(f"audio_scan: latest model updated: {model_path}")
return model
def load_classifier(model_path: str) -> dict | None:
"""Load a saved classifier model."""
if not os.path.exists(model_path):
return None
import joblib
return joblib.load(model_path)
def default_model_path(profile_name: str = "default",
embed_model: str | None = None) -> str:
"""Return the path for a profile's classifier model.
When embed_model is given the file is ``{profile}_{model}.joblib``,
otherwise ``{profile}.joblib`` (legacy single-model layout).
"""
if embed_model:
return os.path.join(_MODEL_DIR, f"{profile_name}_{embed_model}.joblib")
return os.path.join(_MODEL_DIR, f"{profile_name}.joblib")
def list_model_versions(profile_name: str = "default",
embed_model: str | None = None) -> list[tuple[str, str]]:
"""Return available backup versions for a model, newest first.
Returns list of (timestamp_label, file_path).
The current (active) model is listed first as "current".
"""
import re
current = default_model_path(profile_name, embed_model)
stem, ext = os.path.splitext(current)
versions: list[tuple[str, str]] = []
if os.path.exists(current):
versions.append(("current", current))
if not os.path.isdir(_MODEL_DIR):
return versions
pattern = re.compile(re.escape(os.path.basename(stem)) + r"_(\d{8}_\d{6})" + re.escape(ext) + "$")
for fname in os.listdir(_MODEL_DIR):
m = pattern.match(fname)
if m:
versions.append((m.group(1), os.path.join(_MODEL_DIR, fname)))
# Sort backups newest first (after "current")
current_entry = versions[:1]
backups = sorted(versions[1:], key=lambda v: v[0], reverse=True)
return current_entry + backups
def restore_model_version(version_path: str, profile_name: str = "default",
embed_model: str | None = None) -> None:
"""Restore a backup version as the active model."""
import filecmp, shutil
from datetime import datetime
current = default_model_path(profile_name, embed_model)
if version_path == current:
return
# Back up current before replacing — but only if no identical backup exists
if os.path.exists(current):
stem, ext = os.path.splitext(current)
already_saved = False
if os.path.isdir(_MODEL_DIR):
import re
pat = re.compile(re.escape(os.path.basename(stem)) + r"_\d{8}_\d{6}" + re.escape(ext) + "$")
for fname in os.listdir(_MODEL_DIR):
if pat.match(fname):
candidate = os.path.join(_MODEL_DIR, fname)
if filecmp.cmp(current, candidate, shallow=False):
already_saved = True
break
if not already_saved:
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
shutil.move(current, f"{stem}_{ts}{ext}")
shutil.copy2(version_path, current)
_log(f"audio_scan: restored {os.path.basename(version_path)} as active model")
def list_trained_models(profile_name: str = "default") -> list[str]:
"""Return embedding model keys that have a trained .joblib for *profile_name*.
Looks for files matching ``{profile}_{KEY}.joblib`` in the models dir.
KEY is either a bare embed model name (e.g. ``EAT_LARGE``) or
``{MODEL}_{name}`` for user-named variants.
"""
prefix = f"{profile_name}_"
suffix = ".joblib"
result = []
if not os.path.isdir(_MODEL_DIR):
return result
for fname in os.listdir(_MODEL_DIR):
if fname.startswith(prefix) and fname.endswith(suffix):
key = fname[len(prefix):-len(suffix)]
if key in _EMBED_MODELS:
result.append(key)
else:
for m in _EMBED_MODELS:
if key.startswith(m + "_"):
result.append(key)
break
# Also check legacy {profile}.joblib
legacy = os.path.join(_MODEL_DIR, f"{profile_name}.joblib")
if os.path.exists(legacy) and not result:
result.append("")
return sorted(result)
# ---------------------------------------------------------------------------
# Scanning
# ---------------------------------------------------------------------------
def _fuse_regions(regions: list[tuple[float, float, float]]
) -> list[tuple[float, float, float]]:
"""Merge overlapping/adjacent regions, keeping max score."""
if not regions:
return []
by_start = sorted(regions, key=lambda r: r[0])
fused: list[tuple[float, float, float]] = []
s, e, sc = by_start[0]
for s2, e2, sc2 in by_start[1:]:
if s2 <= e: # overlapping or touching
e = max(e, e2)
sc = max(sc, sc2)
else:
fused.append((s, e, sc))
s, e, sc = s2, e2, sc2
fused.append((s, e, sc))
return fused
def prefetch_audio(video_path: str, embed_model: str | None = None,
hop: float = 1.0, window: float = _WINDOW) -> np.ndarray | None:
"""Pre-load audio for a video if embeddings aren't cached.
Returns the raw audio array, or None if cache already exists.
Call from a background thread while the GPU is busy with another video.
"""
if _w2v_cache_exists(video_path, hop, window, embed_model):
return None
_log(f"audio_scan: prefetching {os.path.basename(video_path)}")
y = _load_audio_ffmpeg(video_path, sr=_SR)
_log(f"audio_scan: prefetched {len(y)/_SR:.1f}s")
return y
def scan_video(
video_path: str,
model: dict = None,
threshold: float = 0.50,
hop: float = 1.0,
window: float = _WINDOW,
cancel_flag: object = None,
prefetched_audio: np.ndarray | None = None,
) -> list[tuple[float, float, float]]:
"""Scan a video for matching audio regions using a trained classifier.
Returns list of (start_time, end_time, score) above threshold.
If prefetched_audio is provided, skips the ffmpeg decode step.
"""
if model is None:
_log("audio_scan: no model provided")
return []
clf = model["classifier"]
embed_model = model.get("embed_model")
# Try cache first — skip expensive audio loading if embeddings exist
cached = _w2v_cache_load(video_path, hop, window, embed_model)
if cached is not None:
timestamps, window_vectors = cached
else:
if prefetched_audio is not None:
_log(f"audio_scan: using prefetched audio")
y = prefetched_audio
else:
_log(f"audio_scan: loading {video_path}")
y = _load_audio_ffmpeg(video_path, sr=_SR)
sr = _SR
_log(f"audio_scan: {len(y)/sr:.1f}s loaded")
if cancel_flag and getattr(cancel_flag, '_cancel', False):
return []
_log(f"audio_scan: extracting embeddings ({embed_model or 'default'})...")
timestamps, window_vectors = _extract_w2v_windows(
y, sr, hop=hop, window=window, video_path=video_path,
cancel_flag=cancel_flag, model_name=embed_model,
)
if len(timestamps) == 0:
_log("audio_scan: video shorter than window")
return []
# Per-video z-score normalize
vid_mean = window_vectors.mean(axis=0)
vid_std = np.maximum(window_vectors.std(axis=0), 1e-6)
normed = (window_vectors - vid_mean) / vid_std
_log(f"audio_scan: classifying {len(normed)} windows...")
if cancel_flag and getattr(cancel_flag, '_cancel', False):
return []
probs = clf.predict_proba(normed)[:, 1]
mask = probs >= threshold
raw = [
(timestamps[i], timestamps[i] + window, float(probs[i]))
for i in np.nonzero(mask)[0]
]
results = _fuse_regions(raw)
_log(f"audio_scan: {len(results)} regions above threshold {threshold} (from {len(raw)} raw)")
return results