feat: scan workflow — region fusion, hard negatives, review mode, versioned models

- Fuse overlapping scan regions before display (merge adjacent 1s-hop windows)
- Hard negatives: mark false positives from scan panel for training feedback
  - Toggle with "Add to Negatives" button, red text + red timeline regions
  - Stored in dedicated hard_negatives table, always included in training
- Model versioning: auto-backup on retrain, right-click model combo to rollback
- Scan review mode: "Review" toggle hides spread/markers for free navigation
- Scan exports: saved to DB with scan_export flag, no timeline markers
  - Training dialog checkbox to optionally include scan exports
  - Single group folder per batch with area numbering (clip_042_a1_0.mp4)
- Export scan results: skip negatives, skip regions < 8s, respect spread
  - Button shows estimated clip count, updates on spread/fuse/negative changes
- Timeline: reload scan regions on file load, "Clear all markers" context menu
- Default training model changed to HUBERT_XLARGE

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-18 18:43:05 +02:00
parent 5a9e068903
commit b161412d94
3 changed files with 451 additions and 95 deletions
+73 -2
View File
@@ -425,6 +425,14 @@ def train_classifier(video_infos: list[tuple[str, list[float], list[float]]],
parent = os.path.dirname(model_path)
if parent:
os.makedirs(parent, exist_ok=True)
# Version backup: keep previous model before overwriting
if os.path.exists(model_path):
from datetime import datetime
stem, ext = os.path.splitext(model_path)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
backup = f"{stem}_{ts}{ext}"
os.rename(model_path, backup)
_log(f"audio_scan: previous model backed up to {os.path.basename(backup)}")
joblib.dump(model, model_path)
_log(f"audio_scan: model saved to {model_path}")
@@ -451,6 +459,49 @@ def default_model_path(profile_name: str = "default",
return os.path.join(_MODEL_DIR, f"{profile_name}.joblib")
def list_model_versions(profile_name: str = "default",
embed_model: str | None = None) -> list[tuple[str, str]]:
"""Return available backup versions for a model, newest first.
Returns list of (timestamp_label, file_path).
The current (active) model is listed first as "current".
"""
import re
current = default_model_path(profile_name, embed_model)
stem, ext = os.path.splitext(current)
versions: list[tuple[str, str]] = []
if os.path.exists(current):
versions.append(("current", current))
if not os.path.isdir(_MODEL_DIR):
return versions
pattern = re.compile(re.escape(os.path.basename(stem)) + r"_(\d{8}_\d{6})" + re.escape(ext) + "$")
for fname in os.listdir(_MODEL_DIR):
m = pattern.match(fname)
if m:
versions.append((m.group(1), os.path.join(_MODEL_DIR, fname)))
# Sort backups newest first (after "current")
current_entry = versions[:1]
backups = sorted(versions[1:], key=lambda v: v[0], reverse=True)
return current_entry + backups
def restore_model_version(version_path: str, profile_name: str = "default",
embed_model: str | None = None) -> None:
"""Restore a backup version as the active model."""
from datetime import datetime
current = default_model_path(profile_name, embed_model)
if version_path == current:
return
# Back up current before replacing
if os.path.exists(current):
stem, ext = os.path.splitext(current)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
os.rename(current, f"{stem}_{ts}{ext}")
import shutil
shutil.copy2(version_path, current)
_log(f"audio_scan: restored {os.path.basename(version_path)} as active model")
def list_trained_models(profile_name: str = "default") -> list[str]:
"""Return embedding model names that have a trained .joblib for *profile_name*.
@@ -478,6 +529,25 @@ def list_trained_models(profile_name: str = "default") -> list[str]:
# Scanning
# ---------------------------------------------------------------------------
def _fuse_regions(regions: list[tuple[float, float, float]]
) -> list[tuple[float, float, float]]:
"""Merge overlapping/adjacent regions, keeping max score."""
if not regions:
return []
by_start = sorted(regions, key=lambda r: r[0])
fused: list[tuple[float, float, float]] = []
s, e, sc = by_start[0]
for s2, e2, sc2 in by_start[1:]:
if s2 <= e: # overlapping or touching
e = max(e, e2)
sc = max(sc, sc2)
else:
fused.append((s, e, sc))
s, e, sc = s2, e2, sc2
fused.append((s, e, sc))
return fused
def scan_video(
video_path: str,
model: dict = None,
@@ -532,9 +602,10 @@ def scan_video(
probs = clf.predict_proba(normed)[:, 1]
mask = probs >= threshold
results = [
raw = [
(timestamps[i], timestamps[i] + window, float(probs[i]))
for i in np.nonzero(mask)[0]
]
_log(f"audio_scan: {len(results)} regions above threshold {threshold}")
results = _fuse_regions(raw)
_log(f"audio_scan: {len(results)} regions above threshold {threshold} (from {len(raw)} raw)")
return results
+85 -9
View File
@@ -65,6 +65,7 @@ class ProcessedDB:
"spread": "REAL NOT NULL DEFAULT 3.0",
"profile": "TEXT NOT NULL DEFAULT 'default'",
"source_path": "TEXT NOT NULL DEFAULT ''",
"scan_export": "INTEGER NOT NULL DEFAULT 0",
}
for col, typedef in new_cols.items():
if col not in cols:
@@ -96,6 +97,19 @@ class ProcessedDB:
"CREATE INDEX IF NOT EXISTS idx_scan_file_profile_model"
" ON scan_results(filename, profile, model)"
)
self._con.execute(
"CREATE TABLE IF NOT EXISTS hard_negatives ("
" id INTEGER PRIMARY KEY AUTOINCREMENT,"
" filename TEXT NOT NULL,"
" profile TEXT NOT NULL DEFAULT 'default',"
" start_time REAL NOT NULL,"
" source_path TEXT NOT NULL DEFAULT ''"
")"
)
self._con.execute(
"CREATE INDEX IF NOT EXISTS idx_hardneg_file_profile"
" ON hard_negatives(filename, profile)"
)
self._con.commit()
def add(self, filename: str, start_time: float, output_path: str,
@@ -103,7 +117,8 @@ class ProcessedDB:
short_side: int | None = None, portrait_ratio: str = "",
crop_center: float = 0.5, fmt: str = "MP4",
clip_count: int = 3, spread: float = 3.0,
profile: str = "default", source_path: str = "") -> None:
profile: str = "default", source_path: str = "",
scan_export: bool = False) -> None:
if not self._enabled:
return
with self._lock:
@@ -111,11 +126,12 @@ class ProcessedDB:
"INSERT INTO processed"
" (filename, start_time, output_path, label, category,"
" short_side, portrait_ratio, crop_center, format,"
" clip_count, spread, profile, source_path, processed_at)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
" clip_count, spread, profile, source_path, scan_export, processed_at)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(filename, start_time, output_path, label, category,
short_side, portrait_ratio, crop_center, fmt,
clip_count, spread, profile, source_path,
1 if scan_export else 0,
datetime.now(timezone.utc).isoformat()),
)
self._con.commit()
@@ -207,7 +223,8 @@ class ProcessedDB:
def _get_markers_for(self, match: str, profile: str = "default") -> list[tuple[float, int, str]]:
rows = self._con.execute(
"SELECT start_time, output_path FROM processed"
" WHERE filename = ? AND profile = ? ORDER BY start_time",
" WHERE filename = ? AND profile = ? AND scan_export = 0"
" ORDER BY start_time",
(match, profile),
).fetchall()
# Deduplicate by start_time — batch exports share the same cursor.
@@ -269,6 +286,7 @@ class ProcessedDB:
def get_training_data(self, profile: str, positive_folder: str,
negative_folder: str = "",
fallback_video_dir: str = "",
include_scan_exports: bool = False,
) -> list[tuple[str, list[float], list[float], list[float]]]:
"""Build training video_infos from DB data.
@@ -277,6 +295,7 @@ class ProcessedDB:
positive_folder: export folder name for positive class (e.g. "mp4_Intense")
negative_folder: export folder name for explicit negatives (optional)
fallback_video_dir: if source_path is empty, try filename in this dir
include_scan_exports: if True, include auto-exported scan clips
Returns:
list of (source_video_path, positive_times, soft_times, negative_times)
@@ -284,11 +303,18 @@ class ProcessedDB:
"""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT filename, start_time, output_path, source_path"
" FROM processed WHERE profile = ?",
(profile,),
).fetchall()
if include_scan_exports:
rows = self._con.execute(
"SELECT filename, start_time, output_path, source_path"
" FROM processed WHERE profile = ?",
(profile,),
).fetchall()
else:
rows = self._con.execute(
"SELECT filename, start_time, output_path, source_path"
" FROM processed WHERE profile = ? AND scan_export = 0",
(profile,),
).fetchall()
# Collect times by video, split by folder role
pos_by_video: dict[str, set[float]] = {}
@@ -307,6 +333,17 @@ class ProcessedDB:
else:
soft_by_video.setdefault(fn, set()).add(st)
# Include hard negatives from scan feedback
hard_rows = self._con.execute(
"SELECT filename, start_time, source_path FROM hard_negatives"
" WHERE profile = ?",
(profile,),
).fetchall()
for fn, st, sp in hard_rows:
neg_by_video.setdefault(fn, set()).add(st)
if sp:
source_by_filename.setdefault(fn, sp)
# Remove positive times from soft/neg to avoid conflicting labels
for fn in pos_by_video:
if fn in soft_by_video:
@@ -442,6 +479,45 @@ class ProcessedDB:
).fetchall()
return {r[0] for r in rows}
def add_hard_negatives(self, filename: str, profile: str,
times: list[float], source_path: str = "") -> None:
"""Save timestamps as hard-negative training examples."""
if not self._enabled or not times:
return
with self._lock:
for t in times:
self._con.execute(
"INSERT INTO hard_negatives (filename, profile, start_time, source_path)"
" VALUES (?, ?, ?, ?)",
(filename, profile, t, source_path),
)
self._con.commit()
def get_hard_negative_times(self, filename: str, profile: str) -> set[float]:
"""Return start_times marked as hard negatives for this file."""
if not self._enabled:
return set()
rows = self._con.execute(
"SELECT start_time FROM hard_negatives"
" WHERE filename = ? AND profile = ?",
(filename, profile),
).fetchall()
return {r[0] for r in rows}
def remove_hard_negatives(self, filename: str, profile: str,
times: list[float]) -> None:
"""Remove specific hard-negative timestamps."""
if not self._enabled or not times:
return
with self._lock:
for t in times:
self._con.execute(
"DELETE FROM hard_negatives"
" WHERE filename = ? AND profile = ? AND start_time = ?",
(filename, profile, t),
)
self._con.commit()
def get_training_filenames(self, profile: str) -> set[str]:
"""Return filenames used in training (have exported clips)."""
if not self._enabled: