diff --git a/core/db.py b/core/db.py index 60fbbab..f05e3a5 100644 --- a/core/db.py +++ b/core/db.py @@ -94,7 +94,8 @@ class ProcessedDB: " score REAL NOT NULL," " disabled INTEGER NOT NULL DEFAULT 0," " orig_start_time REAL," - " orig_end_time REAL" + " orig_end_time REAL," + " scan_timestamp TEXT NOT NULL DEFAULT ''" ")" ) # Migrate: add new columns to existing scan_results tables @@ -106,6 +107,7 @@ class ProcessedDB: ("disabled", "INTEGER NOT NULL DEFAULT 0"), ("orig_start_time", "REAL"), ("orig_end_time", "REAL"), + ("scan_timestamp", "TEXT NOT NULL DEFAULT ''"), ]: if col not in sr_cols: self._con.execute( @@ -480,45 +482,100 @@ class ProcessedDB: # ── Scan results ───────────────────────────────────────────── def save_scan_results(self, filename: str, profile: str, model: str, - regions: list[tuple[float, float, float]]) -> None: - """Replace scan results for (filename, profile, model) with new regions. + regions: list[tuple[float, float, float]], + max_versions: int = 5) -> None: + """Save scan results as a new version for (filename, profile, model). regions: list of (start_time, end_time, score). + Keeps up to max_versions; oldest are pruned automatically. """ if not self._enabled: return + ts = datetime.now().strftime("%Y%m%d_%H%M%S") with self._lock: - self._con.execute( - "DELETE FROM scan_results" - " WHERE filename = ? AND profile = ? AND model = ?", - (filename, profile, model), - ) self._con.executemany( "INSERT INTO scan_results" " (filename, profile, model, start_time, end_time, score," - " orig_start_time, orig_end_time)" - " VALUES (?, ?, ?, ?, ?, ?, ?, ?)", - [(filename, profile, model, s, e, sc, s, e) for s, e, sc in regions], + " orig_start_time, orig_end_time, scan_timestamp)" + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + [(filename, profile, model, s, e, sc, s, e, ts) + for s, e, sc in regions], ) + # Prune old versions beyond max_versions + versions = self._con.execute( + "SELECT DISTINCT scan_timestamp FROM scan_results" + " WHERE filename = ? AND profile = ? AND model = ?" + " ORDER BY scan_timestamp DESC", + (filename, profile, model), + ).fetchall() + if len(versions) > max_versions: + old_ts = [v[0] for v in versions[max_versions:]] + self._con.execute( + "DELETE FROM scan_results" + " WHERE filename = ? AND profile = ? AND model = ?" + f" AND scan_timestamp IN ({','.join('?' * len(old_ts))})", + (filename, profile, model, *old_ts), + ) self._con.commit() - def get_scan_results(self, filename: str, profile: str + def get_scan_versions(self, filename: str, profile: str, model: str + ) -> list[dict]: + """Return list of scan versions for (filename, profile, model). + + Returns [{timestamp, count, max_score}, ...] ordered newest first. + """ + if not self._enabled: + return [] + rows = self._con.execute( + "SELECT scan_timestamp, COUNT(*), MAX(score)" + " FROM scan_results" + " WHERE filename = ? AND profile = ? AND model = ?" + " AND scan_timestamp != ''" + " GROUP BY scan_timestamp" + " ORDER BY scan_timestamp DESC", + (filename, profile, model), + ).fetchall() + return [{"timestamp": ts, "count": cnt, "max_score": sc} + for ts, cnt, sc in rows] + + def get_scan_results(self, filename: str, profile: str, + scan_timestamp: str | None = None ) -> dict[str, list[tuple[int, float, float, float, bool, float, float]]]: """Return scan results grouped by model. + If scan_timestamp is given, returns only that version's rows. + Otherwise returns the latest version per model. + Returns {model: [(row_id, start, end, score, disabled, orig_start, orig_end), ...]} sorted by start_time. """ if not self._enabled: return {} - rows = self._con.execute( - "SELECT id, model, start_time, end_time, score, disabled," - " orig_start_time, orig_end_time" - " FROM scan_results" - " WHERE filename = ? AND profile = ?" - " ORDER BY model, start_time", - (filename, profile), - ).fetchall() + if scan_timestamp: + rows = self._con.execute( + "SELECT id, model, start_time, end_time, score, disabled," + " orig_start_time, orig_end_time" + " FROM scan_results" + " WHERE filename = ? AND profile = ? AND scan_timestamp = ?" + " ORDER BY model, start_time", + (filename, profile, scan_timestamp), + ).fetchall() + else: + # For each model, get rows from the latest timestamp only + rows = self._con.execute( + "SELECT r.id, r.model, r.start_time, r.end_time, r.score," + " r.disabled, r.orig_start_time, r.orig_end_time" + " FROM scan_results r" + " INNER JOIN (" + " SELECT model, MAX(scan_timestamp) AS latest" + " FROM scan_results" + " WHERE filename = ? AND profile = ?" + " GROUP BY model" + " ) m ON r.model = m.model AND r.scan_timestamp = m.latest" + " WHERE r.filename = ? AND r.profile = ?" + " ORDER BY r.model, r.start_time", + (filename, profile, filename, profile), + ).fetchall() result: dict[str, list[tuple[int, float, float, float, bool, float, float]]] = {} for row_id, model, s, e, sc, dis, os_, oe in rows: # Fall back to current bounds for legacy rows without orig diff --git a/tests/test_db.py b/tests/test_db.py index 1d3a9b8..3ab3d41 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1,5 +1,6 @@ import os import tempfile +import time from core.db import ProcessedDB @@ -23,3 +24,29 @@ def test_export_folders_excludes_scan_exports(): assert "mp4_ScanOnly" in folders_all finally: os.unlink(path) + + +def test_scan_result_history(): + """save_scan_results should keep multiple versions.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: + path = f.name + try: + db = ProcessedDB(path) + # Save three versions with small delays so timestamps differ + db.save_scan_results("v.mp4", "test", "MODEL_A", [(0, 8, 0.9)]) + time.sleep(1.1) + db.save_scan_results("v.mp4", "test", "MODEL_A", + [(0, 8, 0.8), (10, 18, 0.7)]) + time.sleep(1.1) + db.save_scan_results("v.mp4", "test", "MODEL_A", [(5, 13, 0.95)]) + versions = db.get_scan_versions("v.mp4", "test", "MODEL_A") + assert len(versions) == 3 + # Most recent first + assert versions[0]["count"] == 1 # latest: 1 region + assert versions[1]["count"] == 2 # middle: 2 regions + assert versions[2]["count"] == 1 # oldest: 1 region + # get_scan_results returns latest version by default + results = db.get_scan_results("v.mp4", "test") + assert len(results.get("MODEL_A", [])) == 1 + finally: + os.unlink(path)