feat: scan result history — keep N versions per (file, profile, model)

Add scan_timestamp column to scan_results. save_scan_results now inserts
with a timestamp and prunes versions beyond max_versions (default 5).
get_scan_results returns only the latest version by default, with optional
scan_timestamp parameter for loading specific versions. New get_scan_versions
method returns available versions for a (file, profile, model) tuple.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-19 15:18:28 +02:00
parent 2614a765d5
commit 4fb2ae144f
2 changed files with 104 additions and 20 deletions
+71 -14
View File
@@ -94,7 +94,8 @@ class ProcessedDB:
" score REAL NOT NULL," " score REAL NOT NULL,"
" disabled INTEGER NOT NULL DEFAULT 0," " disabled INTEGER NOT NULL DEFAULT 0,"
" orig_start_time REAL," " orig_start_time REAL,"
" orig_end_time REAL" " orig_end_time REAL,"
" scan_timestamp TEXT NOT NULL DEFAULT ''"
")" ")"
) )
# Migrate: add new columns to existing scan_results tables # Migrate: add new columns to existing scan_results tables
@@ -106,6 +107,7 @@ class ProcessedDB:
("disabled", "INTEGER NOT NULL DEFAULT 0"), ("disabled", "INTEGER NOT NULL DEFAULT 0"),
("orig_start_time", "REAL"), ("orig_start_time", "REAL"),
("orig_end_time", "REAL"), ("orig_end_time", "REAL"),
("scan_timestamp", "TEXT NOT NULL DEFAULT ''"),
]: ]:
if col not in sr_cols: if col not in sr_cols:
self._con.execute( self._con.execute(
@@ -480,44 +482,99 @@ class ProcessedDB:
# ── Scan results ───────────────────────────────────────────── # ── Scan results ─────────────────────────────────────────────
def save_scan_results(self, filename: str, profile: str, model: str,
                      regions: list[tuple[float, float, float]],
                      max_versions: int = 5) -> None:
    """Save scan results as a new version for (filename, profile, model).

    Every row written by one call carries the same scan_timestamp (current
    local time, "%Y%m%d_%H%M%S"); that timestamp identifies the version.
    After writing, the oldest versions of this key are pruned.

    Args:
        filename: File the scan belongs to.
        profile: Profile the scan belongs to.
        model: Model that produced the regions.
        regions: list of (start_time, end_time, score).  start/end are also
            stored as orig_start_time / orig_end_time.  An empty list writes
            no rows, so no new version is recorded.
        max_versions: Number of distinct timestamps to keep for this key.
            Values below 1 are treated as 1 so that the rows written by
            this call are never pruned by the same call.

    Does nothing when the database is disabled.  If any statement fails,
    the pending changes are rolled back and the exception is re-raised.
    """
    if not self._enabled:
        return
    keep = max(1, max_versions)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    key = (filename, profile, model)
    rows = [(*key, s, e, sc, s, e, ts) for s, e, sc in regions]
    with self._lock:
        try:
            if rows:
                # The timestamp only has one-second resolution.  A second
                # save for the same key within that second would otherwise
                # be merged into the existing version, so replace it.
                self._con.execute(
                    "DELETE FROM scan_results"
                    " WHERE filename = ? AND profile = ? AND model = ?"
                    " AND scan_timestamp = ?",
                    (*key, ts),
                )
                self._con.executemany(
                    "INSERT INTO scan_results"
                    " (filename, profile, model, start_time, end_time, score,"
                    " orig_start_time, orig_end_time, scan_timestamp)"
                    " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
                    rows,
                )
            # Prune old versions beyond the newest `keep` timestamps.
            # LIMIT -1 means "no limit", so the subquery yields every
            # timestamp after the first `keep`.
            self._con.execute(
                "DELETE FROM scan_results"
                " WHERE filename = ? AND profile = ? AND model = ?"
                " AND scan_timestamp IN ("
                "SELECT DISTINCT scan_timestamp FROM scan_results"
                " WHERE filename = ? AND profile = ? AND model = ?"
                " ORDER BY scan_timestamp DESC"
                " LIMIT -1 OFFSET ?"
                ")",
                (*key, *key, keep),
            )
            self._con.commit()
        except Exception:
            # Do not leave a half-written version pending on the shared
            # connection, where a later commit would persist it.
            self._con.rollback()
            raise
def get_scan_versions(self, filename: str, profile: str, model: str
                      ) -> list[dict]:
    """List the stored scan versions of one (filename, profile, model) key.

    Rows whose scan_timestamp is the empty string are left out.

    Returns:
        One dict per distinct scan_timestamp, newest timestamp first, with
        keys "timestamp", "count" (rows in that version) and "max_score"
        (highest score in that version).  An empty list when the database
        is disabled.
    """
    if not self._enabled:
        return []
    query = (
        "SELECT scan_timestamp, COUNT(*), MAX(score)"
        " FROM scan_results"
        " WHERE filename = ? AND profile = ? AND model = ?"
        " AND scan_timestamp != ''"
        " GROUP BY scan_timestamp"
        " ORDER BY scan_timestamp DESC"
    )
    cursor = self._con.execute(query, (filename, profile, model))
    field_names = ("timestamp", "count", "max_score")
    return [dict(zip(field_names, record)) for record in cursor.fetchall()]
def get_scan_results(self, filename: str, profile: str,
scan_timestamp: str | None = None
) -> dict[str, list[tuple[int, float, float, float, bool, float, float]]]: ) -> dict[str, list[tuple[int, float, float, float, bool, float, float]]]:
"""Return scan results grouped by model. """Return scan results grouped by model.
If scan_timestamp is given, returns only that version's rows.
Otherwise returns the latest version per model.
Returns {model: [(row_id, start, end, score, disabled, orig_start, orig_end), ...]} Returns {model: [(row_id, start, end, score, disabled, orig_start, orig_end), ...]}
sorted by start_time. sorted by start_time.
""" """
if not self._enabled: if not self._enabled:
return {} return {}
if scan_timestamp:
rows = self._con.execute( rows = self._con.execute(
"SELECT id, model, start_time, end_time, score, disabled," "SELECT id, model, start_time, end_time, score, disabled,"
" orig_start_time, orig_end_time" " orig_start_time, orig_end_time"
" FROM scan_results" " FROM scan_results"
" WHERE filename = ? AND profile = ?" " WHERE filename = ? AND profile = ? AND scan_timestamp = ?"
" ORDER BY model, start_time", " ORDER BY model, start_time",
(filename, profile), (filename, profile, scan_timestamp),
).fetchall()
else:
# For each model, get rows from the latest timestamp only
rows = self._con.execute(
"SELECT r.id, r.model, r.start_time, r.end_time, r.score,"
" r.disabled, r.orig_start_time, r.orig_end_time"
" FROM scan_results r"
" INNER JOIN ("
" SELECT model, MAX(scan_timestamp) AS latest"
" FROM scan_results"
" WHERE filename = ? AND profile = ?"
" GROUP BY model"
" ) m ON r.model = m.model AND r.scan_timestamp = m.latest"
" WHERE r.filename = ? AND r.profile = ?"
" ORDER BY r.model, r.start_time",
(filename, profile, filename, profile),
).fetchall() ).fetchall()
result: dict[str, list[tuple[int, float, float, float, bool, float, float]]] = {} result: dict[str, list[tuple[int, float, float, float, bool, float, float]]] = {}
for row_id, model, s, e, sc, dis, os_, oe in rows: for row_id, model, s, e, sc, dis, os_, oe in rows:
+27
View File
@@ -1,5 +1,6 @@
import os import os
import tempfile import tempfile
import time
from core.db import ProcessedDB from core.db import ProcessedDB
@@ -23,3 +24,29 @@ def test_export_folders_excludes_scan_exports():
assert "mp4_ScanOnly" in folders_all assert "mp4_ScanOnly" in folders_all
finally: finally:
os.unlink(path) os.unlink(path)
def test_scan_result_history():
    """save_scan_results should keep multiple versions per (file, profile, model).

    Covers version listing and ordering, the latest-by-default lookup,
    loading a specific version by scan_timestamp, and isolation between
    models of the same file/profile.
    """
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
        path = f.name
    try:
        db = ProcessedDB(path)

        # scan_timestamp has one-second resolution, so consecutive saves
        # must fall in different seconds to count as separate versions.
        db.save_scan_results("v.mp4", "test", "MODEL_A", [(0, 8, 0.9)])
        time.sleep(1.1)
        db.save_scan_results("v.mp4", "test", "MODEL_A",
                             [(0, 8, 0.8), (10, 18, 0.7)])
        time.sleep(1.1)
        db.save_scan_results("v.mp4", "test", "MODEL_A", [(5, 13, 0.95)])

        versions = db.get_scan_versions("v.mp4", "test", "MODEL_A")
        assert len(versions) == 3

        # Timestamps are distinct and listed most recent first
        stamps = [v["timestamp"] for v in versions]
        assert stamps == sorted(set(stamps), reverse=True)
        assert versions[0]["count"] == 1  # latest: 1 region
        assert versions[1]["count"] == 2  # middle: 2 regions
        assert versions[2]["count"] == 1  # oldest: 1 region
        assert [v["max_score"] for v in versions] == [0.95, 0.8, 0.9]

        # get_scan_results returns latest version by default
        results = db.get_scan_results("v.mp4", "test")
        assert len(results.get("MODEL_A", [])) == 1

        # A specific version can be loaded through scan_timestamp
        middle = db.get_scan_results("v.mp4", "test",
                                     scan_timestamp=stamps[1])
        assert len(middle.get("MODEL_A", [])) == 2

        # Saving for another model must not touch MODEL_A's history
        db.save_scan_results("v.mp4", "test", "MODEL_B", [(1, 2, 0.5)])
        assert len(db.get_scan_versions("v.mp4", "test", "MODEL_A")) == 3
        both = db.get_scan_results("v.mp4", "test")
        assert len(both.get("MODEL_A", [])) == 1
        assert len(both.get("MODEL_B", [])) == 1
    finally:
        os.unlink(path)