Covers scan result versioning per model, hard negative management dialog with training toggle, and ghost folder fix. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
24 KiB
Scan History & Hard Negative Management Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Add scan result versioning, hard negative management dialog with training toggle, and fix ghost folder bug.
Architecture: DB schema changes in core/db.py (new columns, new queries). UI changes in main.py (version selector in ScanResultsPanel, management dialog, training toggle). No changes to core/audio_scan.py.
Tech Stack: SQLite (existing), PyQt6 (existing)
Key design notes:
- Scan history stores N versions per
(filename, profile, model)using ascan_timestampcolumn. All rows from one scan share the same timestamp. - Hard negatives gain a
source_modelcolumn (informational) and training gains ause_hard_negativestoggle. get_export_folders()must respectscan_exportfilter to prevent ghost folders.
Task 1: Fix ghost folder bug in get_export_folders
Files:
- Modify:
core/db.py:294-313(get_export_folders) - Modify:
core/db.py:410-443(get_training_stats — filter out 0-clip folders) - Test:
tests/test_db.py
Step 1: Write failing test
def test_export_folders_excludes_scan_exports():
"""Scan-export-only folders should not appear when include_scan_exports=False."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
path = f.name
try:
db = ProcessedDB(path)
# Manual export
db.add("a.mp4", 10.0, "/out/mp4_Intense/g1/clip.mp4", profile="test")
# Scan export to different folder
db.add("a.mp4", 20.0, "/out/mp4_ScanOnly/g1/clip.mp4", profile="test",
scan_export=True)
folders = db.get_export_folders("test")
assert "mp4_Intense" in folders
assert "mp4_ScanOnly" not in folders, "scan-only folder should be excluded"
# With include_scan_exports=True, both should appear
folders_all = db.get_export_folders("test", include_scan_exports=True)
assert "mp4_ScanOnly" in folders_all
finally:
os.unlink(path)
Step 2: Fix get_export_folders
Add include_scan_exports parameter:
def get_export_folders(self, profile: str = "default",
include_scan_exports: bool = False) -> list[str]:
if not self._enabled:
return []
if include_scan_exports:
rows = self._con.execute(
"SELECT DISTINCT output_path FROM processed WHERE profile = ?",
(profile,),
).fetchall()
else:
rows = self._con.execute(
"SELECT DISTINCT output_path FROM processed"
" WHERE profile = ? AND scan_export = 0",
(profile,),
).fetchall()
folder_names: set[str] = set()
for (op,) in rows:
grandparent = os.path.basename(os.path.dirname(os.path.dirname(op)))
if grandparent:
folder_names.add(grandparent)
return sorted(folder_names)
Step 3: Update get_training_stats to pass through
folders = self.get_export_folders(profile, include_scan_exports=include_scan_exports)
And filter out empty folders at the end:
return {k: v for k, v in stats.items() if v["clips"] > 0}
Step 4: Run tests, commit
pytest tests/ -v
git add core/db.py tests/test_db.py
git commit -m "fix: get_export_folders respects scan_export filter"
Task 2: Scan result history — schema and DB methods
Files:
- Modify:
core/db.py:86-98(scan_results schema — add scan_timestamp column) - Modify:
core/db.py:100-113(migration — add scan_timestamp to existing tables) - Modify:
core/db.py:447-468(save_scan_results — version management) - Add:
core/db.py(get_scan_versions, load_scan_version, delete_scan_version) - Test:
tests/test_db.py
Step 1: Write failing test
def test_scan_result_history():
"""save_scan_results should keep multiple versions."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
path = f.name
try:
db = ProcessedDB(path)
# Save three versions
db.save_scan_results("v.mp4", "test", "MODEL_A",
[(0, 8, 0.9)])
db.save_scan_results("v.mp4", "test", "MODEL_A",
[(0, 8, 0.8), (10, 18, 0.7)])
db.save_scan_results("v.mp4", "test", "MODEL_A",
[(5, 13, 0.95)])
versions = db.get_scan_versions("v.mp4", "test", "MODEL_A")
assert len(versions) == 3
# Most recent first
assert versions[0]["count"] == 1 # latest: 1 region
assert versions[1]["count"] == 2 # middle: 2 regions
assert versions[2]["count"] == 1 # oldest: 1 region
# get_scan_results returns latest version by default
results = db.get_scan_results("v.mp4", "test")
assert len(results.get("MODEL_A", [])) == 1
finally:
os.unlink(path)
Step 2: Add scan_timestamp column
In the CREATE TABLE (line 87-98), add:
scan_timestamp TEXT NOT NULL DEFAULT ''
In the migration block (lines 100-113), add:
("scan_timestamp", "TEXT NOT NULL DEFAULT ''"),
Step 3: Modify save_scan_results
Replace the current DELETE+INSERT with versioned insert + cleanup:
def save_scan_results(self, filename: str, profile: str, model: str,
regions: list[tuple[float, float, float]],
max_versions: int = 5) -> None:
if not self._enabled:
return
from datetime import datetime
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
with self._lock:
self._con.executemany(
"INSERT INTO scan_results"
" (filename, profile, model, start_time, end_time, score,"
" orig_start_time, orig_end_time, scan_timestamp)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
[(filename, profile, model, s, e, sc, s, e, ts)
for s, e, sc in regions],
)
# Prune old versions beyond max_versions
versions = self._con.execute(
"SELECT DISTINCT scan_timestamp FROM scan_results"
" WHERE filename = ? AND profile = ? AND model = ?"
" ORDER BY scan_timestamp DESC",
(filename, profile, model),
).fetchall()
if len(versions) > max_versions:
old_ts = [v[0] for v in versions[max_versions:]]
self._con.execute(
"DELETE FROM scan_results"
" WHERE filename = ? AND profile = ? AND model = ?"
f" AND scan_timestamp IN ({','.join('?' * len(old_ts))})",
(filename, profile, model, *old_ts),
)
self._con.commit()
Step 4: Add get_scan_versions
def get_scan_versions(self, filename: str, profile: str, model: str
) -> list[dict]:
"""Return list of scan versions for (filename, profile, model).
Returns [{timestamp, count, max_score}, ...] ordered newest first.
"""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT scan_timestamp, COUNT(*), MAX(score)"
" FROM scan_results"
" WHERE filename = ? AND profile = ? AND model = ?"
" AND scan_timestamp != ''"
" GROUP BY scan_timestamp"
" ORDER BY scan_timestamp DESC",
(filename, profile, model),
).fetchall()
return [{"timestamp": ts, "count": cnt, "max_score": sc}
for ts, cnt, sc in rows]
Step 5: Modify get_scan_results to support version selection
Add optional scan_timestamp parameter. When None (default), returns latest version:
def get_scan_results(self, filename: str, profile: str,
scan_timestamp: str | None = None
) -> dict[str, list[tuple]]:
if not self._enabled:
return {}
if scan_timestamp:
rows = self._con.execute(
"SELECT id, model, start_time, end_time, score, disabled,"
" orig_start_time, orig_end_time"
" FROM scan_results"
" WHERE filename = ? AND profile = ? AND scan_timestamp = ?"
" ORDER BY model, start_time",
(filename, profile, scan_timestamp),
).fetchall()
else:
# For each model, get rows from the latest timestamp only
rows = self._con.execute(
"SELECT r.id, r.model, r.start_time, r.end_time, r.score,"
" r.disabled, r.orig_start_time, r.orig_end_time"
" FROM scan_results r"
" INNER JOIN ("
" SELECT model, MAX(scan_timestamp) AS latest"
" FROM scan_results"
" WHERE filename = ? AND profile = ?"
" GROUP BY model"
" ) m ON r.model = m.model AND r.scan_timestamp = m.latest"
" WHERE r.filename = ? AND r.profile = ?"
" ORDER BY r.model, r.start_time",
(filename, profile, filename, profile),
).fetchall()
result: dict[str, list] = {}
for row_id, model, s, e, sc, dis, os_, oe in rows:
result.setdefault(model, []).append(
(row_id, s, e, sc, bool(dis),
os_ if os_ is not None else s,
oe if oe is not None else e))
return result
Important: Legacy rows (before this change) have scan_timestamp = ''. The MAX(scan_timestamp) query handles this correctly — empty string sorts before any real timestamp, so legacy rows are returned when they're the only version. The get_scan_versions query filters scan_timestamp != '' so legacy rows don't appear as named versions.
Step 6: Run tests, commit
pytest tests/ -v
git add core/db.py tests/test_db.py
git commit -m "feat: scan result history — keep N versions per (file, model)"
Task 3: Scan history UI — version selector in ScanResultsPanel
Files:
- Modify:
main.py(ScanResultsPanel — add version combo per tab) - Modify:
main.py(ScanResultsPanel.load_for_file — populate versions)
Step 1: Add version combo to tab UI
In ScanResultsPanel._add_tab(), add a small QComboBox above the table. When no history exists, hide it. When versions exist, populate with timestamps and connect to a slot that reloads the tab with that version.
# In _add_tab, create a container widget with version combo + table
container = QWidget()
layout = QVBoxLayout(container)
layout.setContentsMargins(0, 0, 0, 0)
cmb_version = QComboBox()
cmb_version.setMaximumWidth(200)
cmb_version.setToolTip("Scan version history")
cmb_version.hide() # Hidden when only 1 version
layout.addWidget(cmb_version)
layout.addWidget(table)
self._tabs.addTab(container, label)
Store the combo and table as properties on the container widget for later access.
Step 2: Populate versions in load_for_file
After creating each model tab, query get_scan_versions(). If > 1 version, show the combo with entries like "2026-04-19 14:30 (12 regions, best: 0.95)". Connect currentIndexChanged to reload that version's results.
Step 3: Version switching slot
When user selects a different version from the combo:
- Call
db.get_scan_results(filename, profile, scan_timestamp=selected_ts) - Repopulate the table with that version's rows
- Update timeline regions
Step 4: Test manually, commit
git add main.py
git commit -m "feat: scan version selector in results panel"
Task 4: Hard negatives — schema and training toggle
Files:
- Modify:
core/db.py:118-130(hard_negatives schema — add source_model column) - Modify:
core/db.py:548-560(add_hard_negatives — accept source_model) - Modify:
core/db.py:365-374(get_training_data — use_hard_negatives parameter) - Modify:
main.py(TrainDialog — add "Use hard negatives" checkbox) - Modify:
main.py(_open_train_dialog — pass use_hard_negatives to get_training_data) - Test:
tests/test_db.py
Step 1: Write failing test
def test_hard_negatives_source_model():
"""Hard negatives should store source_model."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
path = f.name
try:
db = ProcessedDB(path)
db.add_hard_negatives("a.mp4", "test", [10.0, 20.0],
source_path="/a.mp4", source_model="HUBERT_XLARGE")
rows = db.get_hard_negatives("test")
assert len(rows) == 2
assert all(r["source_model"] == "HUBERT_XLARGE" for r in rows)
finally:
os.unlink(path)
def test_training_data_skips_hard_negatives():
"""get_training_data with use_hard_negatives=False should skip them."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
path = f.name
try:
db = ProcessedDB(path)
db.add("a.mp4", 10.0, "/out/folder/g/clip.mp4", profile="test",
source_path="/videos/a.mp4")
db.add_hard_negatives("a.mp4", "test", [500.0], source_path="/videos/a.mp4")
# With hard negatives
data_with = db.get_training_data("test", "folder", use_hard_negatives=True)
# Without hard negatives
data_without = db.get_training_data("test", "folder", use_hard_negatives=False)
# Both should find the video, but negative counts differ
assert len(data_with) >= 1
neg_with = sum(len(vi[3]) for vi in data_with)
neg_without = sum(len(vi[3]) for vi in data_without)
assert neg_with > neg_without or neg_with == neg_without # depends on margin
finally:
os.unlink(path)
Step 2: Add source_model column to hard_negatives
In CREATE TABLE (line 119-125), add:
source_model TEXT NOT NULL DEFAULT ''
In migration section, add after the hard_negatives table creation:
hn_cols = {
row[1]
for row in self._con.execute("PRAGMA table_info(hard_negatives)").fetchall()
}
if "source_model" not in hn_cols:
self._con.execute(
"ALTER TABLE hard_negatives ADD COLUMN source_model TEXT NOT NULL DEFAULT ''"
)
Step 3: Update add_hard_negatives to accept source_model
def add_hard_negatives(self, filename: str, profile: str,
times: list[float], source_path: str = "",
source_model: str = "") -> None:
if not self._enabled or not times:
return
with self._lock:
for t in times:
self._con.execute(
"INSERT INTO hard_negatives"
" (filename, profile, start_time, source_path, source_model)"
" VALUES (?, ?, ?, ?, ?)",
(filename, profile, t, source_path, source_model),
)
self._con.commit()
Step 4: Add get_hard_negatives (full rows for management dialog)
def get_hard_negatives(self, profile: str) -> list[dict]:
"""Return all hard negatives for a profile with full details."""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT id, filename, start_time, source_path, source_model"
" FROM hard_negatives WHERE profile = ?"
" ORDER BY filename, start_time",
(profile,),
).fetchall()
return [{"id": r[0], "filename": r[1], "start_time": r[2],
"source_path": r[3], "source_model": r[4]} for r in rows]
Step 5: Add delete_hard_negatives_by_ids
def delete_hard_negatives_by_ids(self, ids: list[int]) -> None:
"""Delete hard negatives by row IDs."""
if not self._enabled or not ids:
return
with self._lock:
self._con.execute(
f"DELETE FROM hard_negatives WHERE id IN ({','.join('?' * len(ids))})",
ids,
)
self._con.commit()
Step 6: Add use_hard_negatives parameter to get_training_data
In get_training_data() (line 315), add parameter:
def get_training_data(self, profile: str, positive_folder: str,
negative_folder: str = "",
fallback_video_dir: str = "",
include_scan_exports: bool = False,
use_hard_negatives: bool = True,
) -> list[tuple[str, list[float], list[float], list[float]]]:
Then wrap the hard negatives query (lines 365-374) in a conditional:
if use_hard_negatives:
hard_rows = self._con.execute(
"SELECT filename, start_time, source_path FROM hard_negatives"
" WHERE profile = ?",
(profile,),
).fetchall()
for fn, st, sp in hard_rows:
neg_by_video.setdefault(fn, set()).add(st)
if sp:
source_by_filename.setdefault(fn, sp)
Step 7: Pass source_model when marking negatives from scan panel
In main.py, _on_scan_negatives() needs to pass the current scan model. The scan panel knows which tab is active:
def _on_scan_negatives(self, times: list) -> None:
if not self._file_path:
return
filename = os.path.basename(self._file_path)
# Get current model tab name for source_model
source_model = self._scan_panel.current_model_name()
self._db.add_hard_negatives(filename, self._profile, times,
source_path=self._file_path,
source_model=source_model)
Add current_model_name() to ScanResultsPanel:
def current_model_name(self) -> str:
"""Return the model name of the currently active tab."""
idx = self._tabs.currentIndex()
if idx >= 0:
return self._tabs.tabText(idx).split(" (")[0] # strip count suffix
return ""
Step 8: Add training toggle to TrainDialog
After the existing _chk_scan_exports checkbox:
self._chk_hard_negatives = QCheckBox("Use hard negatives in training")
self._chk_hard_negatives.setChecked(True)
self._chk_hard_negatives.setToolTip(
"When unchecked, manually marked hard negatives are excluded from training.\n"
"Useful when training a new model type where old negatives may not apply.")
self._chk_hard_negatives.stateChanged.connect(lambda: self._debounce.start())
form.addRow("", self._chk_hard_negatives)
Add property:
@property
def use_hard_negatives(self) -> bool:
return self._chk_hard_negatives.isChecked()
Step 9: Wire toggle through _open_train_dialog
In _open_train_dialog(), pass the flag:
video_infos = self._db.get_training_data(
self._profile, pos_folder, negative_folder=neg_folder,
fallback_video_dir=video_dir,
include_scan_exports=inc_scan,
use_hard_negatives=dlg.use_hard_negatives,
)
Also update _update_stats() in TrainDialog to pass it through for accurate counts:
use_neg = self._chk_hard_negatives.isChecked() if hasattr(self, '_chk_hard_negatives') else True
video_infos = self._db.get_training_data(
self._profile, folder, negative_folder=neg_folder,
fallback_video_dir=self._txt_video_dir.text(),
include_scan_exports=inc_scan,
use_hard_negatives=use_neg,
)
Step 10: Run tests, commit
pytest tests/ -v
git add core/db.py main.py tests/test_db.py
git commit -m "feat: hard negative source_model tracking, training toggle"
Task 5: Hard negatives management dialog
Files:
- Modify:
main.py(add HardNegativesDialog class) - Modify:
main.py(TrainDialog — add "Manage..." button)
Step 1: Create HardNegativesDialog
Place before TrainDialog class:
class HardNegativesDialog(QDialog):
"""View and manage hard negative training examples."""
def __init__(self, db: ProcessedDB, profile: str, parent=None):
super().__init__(parent)
self.setWindowTitle("Hard Negatives")
self.setMinimumSize(600, 400)
self._db = db
self._profile = profile
layout = QVBoxLayout(self)
# Filter row
filter_row = QHBoxLayout()
filter_row.addWidget(QLabel("Filter model:"))
self._cmb_filter = QComboBox()
self._cmb_filter.addItem("(all)")
self._cmb_filter.currentIndexChanged.connect(self._apply_filter)
filter_row.addWidget(self._cmb_filter, 1)
layout.addLayout(filter_row)
# Summary
self._lbl_summary = QLabel()
layout.addWidget(self._lbl_summary)
# Table
self._table = QTableWidget(0, 4)
self._table.setHorizontalHeaderLabels(
["File", "Time", "Source Model", "ID"])
self._table.horizontalHeader().setSectionResizeMode(
0, QHeaderView.ResizeMode.Stretch)
self._table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)
self._table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows)
self._table.setColumnHidden(3, True) # hide ID column
layout.addWidget(self._table)
# Buttons
btn_row = QHBoxLayout()
btn_delete = QPushButton("Delete Selected")
btn_delete.clicked.connect(self._delete_selected)
btn_row.addWidget(btn_delete)
btn_clear = QPushButton("Clear All")
btn_clear.clicked.connect(self._clear_all)
btn_row.addWidget(btn_clear)
btn_row.addStretch()
btn_close = QPushButton("Close")
btn_close.clicked.connect(self.close)
btn_row.addWidget(btn_close)
layout.addLayout(btn_row)
self._load()
def _load(self):
rows = self._db.get_hard_negatives(self._profile)
models = sorted(set(r["source_model"] for r in rows if r["source_model"]))
self._cmb_filter.blockSignals(True)
self._cmb_filter.clear()
self._cmb_filter.addItem("(all)")
for m in models:
self._cmb_filter.addItem(m)
self._cmb_filter.blockSignals(False)
self._table.setRowCount(len(rows))
for i, r in enumerate(rows):
self._table.setItem(i, 0, QTableWidgetItem(r["filename"]))
self._table.setItem(i, 1, QTableWidgetItem(f'{r["start_time"]:.1f}s'))
self._table.setItem(i, 2, QTableWidgetItem(r["source_model"]))
item = QTableWidgetItem(str(r["id"]))
self._table.setItem(i, 3, item)
self._lbl_summary.setText(f"<b>{len(rows)}</b> hard negatives")
def _apply_filter(self):
model = self._cmb_filter.currentText()
for row in range(self._table.rowCount()):
if model == "(all)":
self._table.setRowHidden(row, False)
else:
src = self._table.item(row, 2).text()
self._table.setRowHidden(row, src != model)
def _delete_selected(self):
ids = []
for row in sorted(set(i.row() for i in self._table.selectedItems()), reverse=True):
if not self._table.isRowHidden(row):
ids.append(int(self._table.item(row, 3).text()))
if ids:
self._db.delete_hard_negatives_by_ids(ids)
self._load()
def _clear_all(self):
reply = QMessageBox.question(
self, "Clear All",
f"Delete all hard negatives for profile '{self._profile}'?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
)
if reply == QMessageBox.StandardButton.Yes:
all_rows = self._db.get_hard_negatives(self._profile)
self._db.delete_hard_negatives_by_ids([r["id"] for r in all_rows])
self._load()
Step 2: Add "Manage..." button to TrainDialog
After the hard negatives checkbox, add a button:
neg_row = QHBoxLayout()
neg_row.addWidget(self._chk_hard_negatives)
btn_manage_neg = QPushButton("Manage…")
btn_manage_neg.setFixedWidth(80)
btn_manage_neg.clicked.connect(self._manage_negatives)
neg_row.addWidget(btn_manage_neg)
form.addRow("", neg_row) # replaces the standalone checkbox addRow
Add handler:
def _manage_negatives(self):
dlg = HardNegativesDialog(self._db, self._profile, parent=self)
dlg.exec()
self._debounce.start() # refresh stats after potential deletions
Step 3: Test manually, commit
pytest tests/ -v
git add main.py
git commit -m "feat: hard negatives management dialog with filter and bulk delete"
Task 6: Final integration test and push
Step 1: Manual test checklist
- Open Train dialog — verify no ghost folders appear
- Train with "Use hard negatives" unchecked — verify training works
- Train with "Use hard negatives" checked — verify negatives are used
- Open Manage dialog — verify negatives listed with source model
- Delete selected negatives — verify they're removed
- Scan a video — verify results saved with timestamp
- Rescan same video — verify version history appears
- Switch version in scan panel — verify correct results display
- Mark negative from scan results — verify source_model stored
Step 2: Push
git push