Files
8-cut/core/db.py
T
Ethanfel 35c67f4bd5 perf: single-pass get_training_stats (was O(folders × rows))
Group clips by export folder in one scan instead of re-scanning every row for
each folder; also drops the extra get_export_folders() query. Speeds up the
train-dialog stats with many subcategories.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-06-07 19:52:13 +02:00

1400 lines
58 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import sqlite3
import threading
from datetime import datetime, timezone
from pathlib import Path
from .paths import _log
def _extract_m_number(output_path: str) -> int | None:
"""Extract the manual export number from a path like clip_001_m3_0.mp4."""
m = re.search(r'_m(\d+)[_.]', os.path.basename(output_path))
return int(m.group(1)) if m else None
class ProcessedDB:
_SCHEMA_VERSION = 4 # bump when schema changes
def __init__(self, db_path: str | None = None):
if db_path is None:
db_path = str(Path.home() / ".8cut.db")
self._path = db_path
self._lock = threading.Lock()
try:
self._con = sqlite3.connect(db_path, check_same_thread=False)
# Performance pragmas: WAL cuts lock contention and fsync cost,
# a bigger page cache keeps hot scans in memory.
for pragma in (
"PRAGMA journal_mode = WAL",
"PRAGMA synchronous = NORMAL",
"PRAGMA temp_store = MEMORY",
"PRAGMA cache_size = -65536", # ~64 MB
):
try:
self._con.execute(pragma)
except sqlite3.Error:
pass
self._migrate()
self._enabled = True
_log(f"DB opened: {db_path}")
except Exception as e:
_log(f"DB unavailable: {e}")
self._con = None
self._enabled = False
def _migrate(self) -> None:
"""Create table if missing, then add any new columns for old DBs."""
cols = {
row[1]
for row in self._con.execute("PRAGMA table_info(processed)").fetchall()
}
if not cols:
# Fresh DB — create from scratch
self._con.execute(
"CREATE TABLE IF NOT EXISTS processed ("
" id INTEGER PRIMARY KEY AUTOINCREMENT,"
" filename TEXT NOT NULL,"
" start_time REAL NOT NULL,"
" output_path TEXT NOT NULL,"
" label TEXT NOT NULL DEFAULT '',"
" category TEXT NOT NULL DEFAULT '',"
" short_side INTEGER DEFAULT 512,"
" portrait_ratio TEXT NOT NULL DEFAULT '',"
" crop_center REAL NOT NULL DEFAULT 0.5,"
" format TEXT NOT NULL DEFAULT 'MP4',"
" clip_count INTEGER NOT NULL DEFAULT 3,"
" clip_duration REAL NOT NULL DEFAULT 8.0,"
" spread REAL NOT NULL DEFAULT 3.0,"
" profile TEXT NOT NULL DEFAULT 'default',"
" source_path TEXT NOT NULL DEFAULT '',"
" scan_export INTEGER NOT NULL DEFAULT 0,"
" processed_at TEXT NOT NULL"
")"
)
else:
# Add missing columns to legacy tables
new_cols = {
"label": "TEXT NOT NULL DEFAULT ''",
"category": "TEXT NOT NULL DEFAULT ''",
"short_side": "INTEGER DEFAULT 512",
"portrait_ratio": "TEXT NOT NULL DEFAULT ''",
"crop_center": "REAL NOT NULL DEFAULT 0.5",
"format": "TEXT NOT NULL DEFAULT 'MP4'",
"clip_count": "INTEGER NOT NULL DEFAULT 3",
"clip_duration": "REAL NOT NULL DEFAULT 8.0",
"spread": "REAL NOT NULL DEFAULT 3.0",
"profile": "TEXT NOT NULL DEFAULT 'default'",
"source_path": "TEXT NOT NULL DEFAULT ''",
"scan_export": "INTEGER NOT NULL DEFAULT 0",
}
for col, typedef in new_cols.items():
if col not in cols:
self._con.execute(
f"ALTER TABLE processed ADD COLUMN {col} {typedef}"
)
self._con.execute(
"CREATE INDEX IF NOT EXISTS idx_filename ON processed(filename)"
)
# Most hot queries filter by profile, often with filename too.
self._con.execute(
"CREATE INDEX IF NOT EXISTS idx_profile_filename"
" ON processed(profile, filename)"
)
self._con.execute(
"CREATE TABLE IF NOT EXISTS hidden_files ("
" filename TEXT NOT NULL,"
" profile TEXT NOT NULL DEFAULT 'default',"
" PRIMARY KEY (filename, profile)"
")"
)
self._con.execute(
"CREATE TABLE IF NOT EXISTS scan_results ("
" id INTEGER PRIMARY KEY AUTOINCREMENT,"
" filename TEXT NOT NULL,"
" profile TEXT NOT NULL DEFAULT 'default',"
" model TEXT NOT NULL,"
" start_time REAL NOT NULL,"
" end_time REAL NOT NULL,"
" score REAL NOT NULL,"
" disabled INTEGER NOT NULL DEFAULT 0,"
" orig_start_time REAL,"
" orig_end_time REAL,"
" scan_timestamp TEXT NOT NULL DEFAULT ''"
")"
)
# Migrate: add new columns to existing scan_results tables
sr_cols = {
row[1]
for row in self._con.execute("PRAGMA table_info(scan_results)").fetchall()
}
for col, typedef in [
("disabled", "INTEGER NOT NULL DEFAULT 0"),
("orig_start_time", "REAL"),
("orig_end_time", "REAL"),
("scan_timestamp", "TEXT NOT NULL DEFAULT ''"),
]:
if col not in sr_cols:
self._con.execute(
f"ALTER TABLE scan_results ADD COLUMN {col} {typedef}"
)
self._con.execute(
"CREATE INDEX IF NOT EXISTS idx_scan_file_profile_model"
" ON scan_results(filename, profile, model)"
)
self._con.execute(
"CREATE TABLE IF NOT EXISTS hard_negatives ("
" id INTEGER PRIMARY KEY AUTOINCREMENT,"
" filename TEXT NOT NULL,"
" profile TEXT NOT NULL DEFAULT 'default',"
" start_time REAL NOT NULL,"
" source_path TEXT NOT NULL DEFAULT '',"
" source_model TEXT NOT NULL DEFAULT ''"
")"
)
# Migrate: add source_model column to existing hard_negatives tables
hn_cols = {
row[1]
for row in self._con.execute("PRAGMA table_info(hard_negatives)").fetchall()
}
if "source_model" not in hn_cols:
self._con.execute(
"ALTER TABLE hard_negatives ADD COLUMN source_model TEXT NOT NULL DEFAULT ''"
)
self._con.execute(
"CREATE INDEX IF NOT EXISTS idx_hardneg_file_profile"
" ON hard_negatives(filename, profile)"
)
self._con.commit()
self._migrate_vid_folders()
def _migrate_vid_folders(self) -> None:
"""Migrate old clip_NNN group dirs → vid_NNN per-video folders.
Old layout: export_folder/clip_NNN/clip_NNN_sub.mp4
New layout: export_folder/vid_NNN/clip_NNN_sub.mp4
Rewrites output_path in DB and moves files on disk.
"""
# Check if any rows still use the old clip_NNN parent dir layout
row = self._con.execute(
"SELECT id FROM processed WHERE output_path LIKE '%/clip_%/%' LIMIT 1"
).fetchone()
if not row:
return
_log("Migrating old clip group dirs → vid folders …")
rows = self._con.execute(
"SELECT id, filename, profile, output_path FROM processed"
" ORDER BY profile, filename, output_path"
).fetchall()
# Assign vid_NNN per (profile, export_folder, filename)
vid_map: dict[tuple, str] = {}
vid_counters: dict[tuple, int] = {}
for rid, filename, profile, op in rows:
parent = os.path.dirname(op)
export_folder = os.path.dirname(parent)
key = (profile, export_folder, filename)
if key not in vid_map:
counter_key = (profile, export_folder)
n = vid_counters.get(counter_key, 1)
vid_map[key] = f"vid_{n:03d}"
vid_counters[counter_key] = n + 1
updates: list[tuple[str, int]] = []
moves: list[tuple[str, str]] = []
dirs_to_create: set[str] = set()
old_dirs: set[str] = set()
for rid, filename, profile, op in rows:
parent = os.path.dirname(op)
parent_name = os.path.basename(parent)
# Skip rows already using vid_NNN layout
if parent_name.startswith("vid_"):
continue
export_folder = os.path.dirname(parent)
key = (profile, export_folder, filename)
vid_name = vid_map[key]
new_path = os.path.join(export_folder, vid_name, os.path.basename(op))
updates.append((new_path, rid))
dirs_to_create.add(os.path.join(export_folder, vid_name))
old_dirs.add(parent)
if os.path.exists(op):
moves.append((op, new_path))
if not updates:
return
# Create vid directories
for d in sorted(dirs_to_create):
os.makedirs(d, exist_ok=True)
# Move files
import shutil
for old, new in moves:
if os.path.exists(old) and not os.path.exists(new):
shutil.move(old, new)
# Update DB
self._con.executemany(
"UPDATE processed SET output_path = ? WHERE id = ?", updates
)
self._con.commit()
# Remove empty old group directories
for d in sorted(old_dirs, reverse=True):
try:
if os.path.isdir(d) and not os.listdir(d):
os.rmdir(d)
except OSError:
pass
_log(f"Migrated {len(updates)} rows, moved {len(moves)} files to vid folders")
def add(self, filename: str, start_time: float, output_path: str,
label: str = "", category: str = "",
short_side: int | None = None, portrait_ratio: str = "",
crop_center: float = 0.5, fmt: str = "MP4",
clip_count: int = 3, clip_duration: float = 8.0,
spread: float = 3.0,
profile: str = "default", source_path: str = "",
scan_export: bool = False) -> None:
if not self._enabled:
return
with self._lock:
self._con.execute(
"INSERT INTO processed"
" (filename, start_time, output_path, label, category,"
" short_side, portrait_ratio, crop_center, format,"
" clip_count, clip_duration, spread, profile, source_path,"
" scan_export, processed_at)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(filename, start_time, output_path, label, category,
short_side, portrait_ratio, crop_center, fmt,
clip_count, clip_duration, spread, profile, source_path,
1 if scan_export else 0,
datetime.now(timezone.utc).isoformat()),
)
self._con.commit()
def update_source_paths(self, new_dir: str,
playlist_paths: list[str] | None = None,
profile: str = "") -> int:
"""Re-resolve source_path for all rows whose current path is missing.
Checks *new_dir* and *playlist_paths* by filename match.
Returns the number of rows updated.
"""
if not self._enabled:
return 0
lookup: dict[str, str] = {}
if playlist_paths:
for p in playlist_paths:
lookup[os.path.basename(p)] = p
if new_dir and os.path.isdir(new_dir):
for f in os.listdir(new_dir):
fp = os.path.join(new_dir, f)
if os.path.isfile(fp):
lookup[f] = fp
if not lookup:
return 0
query = "SELECT DISTINCT filename, source_path FROM processed"
params: tuple = ()
if profile:
query += " WHERE profile = ?"
params = (profile,)
rows = self._con.execute(query, params).fetchall()
updated = 0
with self._lock:
for fn, sp in rows:
if sp and os.path.exists(sp):
continue
new_path = lookup.get(fn)
if new_path and os.path.isfile(new_path):
self._con.execute(
"UPDATE processed SET source_path = ? WHERE filename = ?",
(new_path, fn),
)
updated += 1
if updated:
self._con.commit()
return updated
def get_labels(self) -> list[str]:
"""Return distinct non-empty labels ordered by most recently used."""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT DISTINCT label FROM processed"
" WHERE label != '' ORDER BY processed_at DESC"
).fetchall()
# Deduplicate while preserving order (DISTINCT on processed_at DESC
# may return duplicates if the same label was used multiple times).
seen: set[str] = set()
result = []
for (lbl,) in rows:
if lbl not in seen:
seen.add(lbl)
result.append(lbl)
return result
def get_by_output_path(self, output_path: str) -> dict | None:
"""Return config dict for an output_path, or None."""
if not self._enabled:
return None
cur = self._con.cursor()
cur.row_factory = sqlite3.Row
row = cur.execute(
"SELECT label, category, short_side, portrait_ratio, crop_center, format,"
" clip_count, clip_duration, spread"
" FROM processed WHERE output_path = ?",
(output_path,),
).fetchone()
return dict(row) if row else None
def delete_by_output_path(self, output_path: str, profile: str = "") -> None:
if not self._enabled:
return
with self._lock:
if profile:
self._con.execute(
"DELETE FROM processed WHERE output_path = ? AND profile = ?",
(output_path, profile),
)
else:
self._con.execute(
"DELETE FROM processed WHERE output_path = ?", (output_path,),
)
self._con.commit()
def is_path_used_by_other_profiles(self, output_path: str, profile: str) -> bool:
"""Return True if *output_path* is referenced by any profile other than *profile*."""
if not self._enabled:
return False
row = self._con.execute(
"SELECT 1 FROM processed WHERE output_path = ? AND profile != ? LIMIT 1",
(output_path, profile),
).fetchone()
return row is not None
def get_group(self, output_path: str, profile: str = "") -> list[str]:
"""Return all output_paths sharing the same (filename, start_time, profile) as *output_path*."""
if not self._enabled:
return []
row = self._con.execute(
"SELECT filename, start_time, profile FROM processed WHERE output_path = ?",
(output_path,),
).fetchone()
if not row:
return []
filename, start_time, row_profile = row
p = profile or row_profile
rows = self._con.execute(
"SELECT output_path FROM processed"
" WHERE filename = ? AND start_time = ? AND profile = ? ORDER BY output_path",
(filename, start_time, p),
).fetchall()
return [r[0] for r in rows]
def delete_group(self, output_path: str, profile: str = "") -> list[str]:
"""Delete all rows sharing the same (filename, start_time, profile) as *output_path*.
Returns list of deleted output_paths."""
if not self._enabled:
return []
with self._lock:
row = self._con.execute(
"SELECT filename, start_time, profile FROM processed WHERE output_path = ?",
(output_path,),
).fetchone()
if not row:
return []
filename, start_time, row_profile = row
p = profile or row_profile
paths = [r[0] for r in self._con.execute(
"SELECT output_path FROM processed"
" WHERE filename = ? AND start_time = ? AND profile = ?",
(filename, start_time, p),
).fetchall()]
self._con.execute(
"DELETE FROM processed WHERE filename = ? AND start_time = ? AND profile = ?",
(filename, start_time, p),
)
self._con.commit()
return paths
def _get_markers_for(self, match: str, profile: str = "default",
export_folder: str = "") -> list[tuple[float, int, str, float]]:
if export_folder:
rows = self._con.execute(
"SELECT start_time, output_path, clip_duration, clip_count, spread"
" FROM processed"
" WHERE filename = ? AND profile = ? AND scan_export = 0"
" AND output_path LIKE ?"
" ORDER BY start_time",
(match, profile, export_folder.rstrip("/") + "/%"),
).fetchall()
else:
rows = self._con.execute(
"SELECT start_time, output_path, clip_duration, clip_count, spread"
" FROM processed"
" WHERE filename = ? AND profile = ? AND scan_export = 0"
" ORDER BY start_time",
(match, profile),
).fetchall()
seen_times: dict[float, tuple[float, int, str, float]] = {}
seq = 0
for t, p, dur, cnt, spr in rows:
if t not in seen_times:
seq += 1
num = _extract_m_number(p) or seq
span = (dur or 8.0) + ((cnt or 1) - 1) * (spr or 3.0)
seen_times[t] = (t, num, p, span)
return list(seen_times.values())
def get_markers(self, filename: str, profile: str = "default",
export_folder: str = "") -> list[tuple[float, int, str, float]]:
"""Return [(start_time, marker_number, output_path, clip_span), ...]
for exact filename match, sorted by start_time. Empty list if no match.
Excludes scan exports (shown via scan panel instead).
If export_folder is set, only markers in that folder are returned."""
if not self._enabled:
return []
return self._get_markers_for(filename, profile, export_folder)
def get_other_folder_markers(self, filename: str, profile: str = "default",
export_folder: str = ""
) -> dict[str, list[tuple[float, int, str, float]]]:
"""Return {folder_name: [(start_time, num, path, span), ...]} for
markers NOT in export_folder, grouped by their base export folder."""
if not self._enabled or not export_folder:
return {}
rows = self._con.execute(
"SELECT start_time, output_path, clip_duration, clip_count, spread"
" FROM processed"
" WHERE filename = ? AND profile = ? AND scan_export = 0"
" AND output_path NOT LIKE ?"
" ORDER BY start_time",
(filename, profile, export_folder.rstrip("/") + "/%"),
).fetchall()
by_folder: dict[str, list] = {}
for t, p, dur, cnt, spr in rows:
parts = p.split("/")
for i, part in enumerate(parts):
if part.startswith("vid_"):
folder = "/".join(parts[:i])
break
else:
folder = os.path.dirname(os.path.dirname(p))
by_folder.setdefault(folder, []).append((t, p, dur, cnt, spr))
result: dict[str, list[tuple[float, int, str, float]]] = {}
for folder, folder_rows in by_folder.items():
seen: dict[float, tuple[float, int, str, float]] = {}
seq = 0
for t, p, dur, cnt, spr in folder_rows:
if t not in seen:
seq += 1
num = _extract_m_number(p) or seq
span = (dur or 8.0) + ((cnt or 1) - 1) * (spr or 3.0)
seen[t] = (t, num, p, span)
name = os.path.basename(folder)
if name.endswith("_disabled"):
continue # disabled clips are excluded from the timeline
result[name] = list(seen.values())
return result
def get_manual_export_groups(self, filename: str, profile: str = "default"
) -> list[dict]:
"""Return manual (non-scan) export groups for *filename*.
Each group dict has:
start_time, paths (list[str] sorted), clip_count, clip_duration,
spread, short_side, portrait_ratio, crop_center, format, label,
category
"""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT start_time, output_path, clip_count, clip_duration, spread,"
" short_side, portrait_ratio, crop_center, format, label, category"
" FROM processed"
" WHERE filename = ? AND profile = ? AND scan_export = 0"
" ORDER BY start_time, output_path",
(filename, profile),
).fetchall()
groups: dict[float, dict] = {}
for r in rows:
t = r[0]
if t not in groups:
groups[t] = {
"start_time": t,
"paths": [],
"clip_count": r[2], "clip_duration": r[3],
"spread": r[4],
"short_side": r[5], "portrait_ratio": r[6],
"crop_center": r[7], "format": r[8],
"label": r[9], "category": r[10],
}
groups[t]["paths"].append(r[1])
return list(groups.values())
def get_clip_count(self, filename: str, profile: str = "default") -> int:
"""Return total number of exported clips (including scan exports)."""
if not self._enabled:
return 0
row = self._con.execute(
"SELECT COUNT(*) FROM processed WHERE filename = ? AND profile = ?",
(filename, profile),
).fetchone()
return row[0] if row else 0
def get_clip_counts_by_folder(self, filename: str,
profile: str = "default") -> dict[str, int]:
"""Return per-export-folder clip counts for a single video.
Folder name is the grandparent dir of each clip's output_path
(e.g. ``mp4_doggy_clap``).
"""
if not self._enabled:
return {}
rows = self._con.execute(
"SELECT output_path FROM processed WHERE filename = ? AND profile = ?",
(filename, profile),
).fetchall()
counts: dict[str, int] = {}
for (op,) in rows:
folder = os.path.basename(os.path.dirname(os.path.dirname(op)))
counts[folder] = counts.get(folder, 0) + 1
return counts
def get_clip_counts_grouped(self, profile: str = "default"
) -> dict[str, dict[str, int]]:
"""Return ``{filename: {export_folder: count}}`` for a whole profile
in a single scan (replaces N per-file queries on the hot path)."""
if not self._enabled:
return {}
rows = self._con.execute(
"SELECT filename, output_path FROM processed WHERE profile = ?",
(profile,),
).fetchall()
out: dict[str, dict[str, int]] = {}
for fn, op in rows:
folder = os.path.basename(os.path.dirname(os.path.dirname(op)))
d = out.get(fn)
if d is None:
d = out[fn] = {}
d[folder] = d.get(folder, 0) + 1
return out
def get_all_folder_counts(self, profile: str = "default") -> dict[str, int]:
"""Return clip counts per export folder across all videos in *profile*.
Includes ``_disabled`` folders so callers can offer enable/disable.
"""
if not self._enabled:
return {}
rows = self._con.execute(
"SELECT output_path FROM processed WHERE profile = ?",
(profile,),
).fetchall()
counts: dict[str, int] = {}
for (op,) in rows:
folder = os.path.basename(os.path.dirname(os.path.dirname(op)))
counts[folder] = counts.get(folder, 0) + 1
return counts
def relocate_video_clips(self, filename: "str | None", profile: str,
src_folder_name: str,
dst_folder_name: str) -> int:
"""Move clips from one export folder to a sibling folder.
Matches rows whose grandparent dir basename == *src_folder_name*
(restricted to *filename* when given, else every video in *profile*),
then moves each clip (and any ``.wav`` sidecar) on disk into a sibling
folder named *dst_folder_name*, migrates its dataset.json annotation,
and rewrites output_path in the DB. Returns the number of clips moved.
"""
if not self._enabled:
return 0
import shutil
from .annotations import remove_clip_annotation, upsert_clip_annotation
if filename is None:
rows = self._con.execute(
"SELECT id, output_path, label FROM processed WHERE profile = ?",
(profile,),
).fetchall()
else:
rows = self._con.execute(
"SELECT id, output_path, label FROM processed"
" WHERE filename = ? AND profile = ?",
(filename, profile),
).fetchall()
moves: list[tuple[str, str]] = [] # (old_path, new_path)
updates: list[tuple[str, int]] = [] # (new_path, id)
ann: list[tuple[str, str, str, str, str]] = [] # old_fold,new_fold,old,new,label
new_dirs: set[str] = set()
old_vid_dirs: set[str] = set()
for rid, op, label in rows:
vid_dir = os.path.dirname(op)
export_folder = os.path.dirname(vid_dir)
if os.path.basename(export_folder) != src_folder_name:
continue
new_export_folder = os.path.join(
os.path.dirname(export_folder), dst_folder_name)
new_vid_dir = os.path.join(new_export_folder, os.path.basename(vid_dir))
new_op = os.path.join(new_vid_dir, os.path.basename(op))
updates.append((new_op, rid))
new_dirs.add(new_vid_dir)
old_vid_dirs.add(vid_dir)
if os.path.exists(op):
moves.append((op, new_op))
ann.append((export_folder, new_export_folder, op, new_op, label or ""))
if not updates:
return 0
with self._lock:
for d in sorted(new_dirs):
os.makedirs(d, exist_ok=True)
for old, new in moves:
if os.path.exists(old) and not os.path.exists(new):
shutil.move(old, new)
wav_old, wav_new = old + ".wav", new + ".wav"
if os.path.exists(wav_old) and not os.path.exists(wav_new):
shutil.move(wav_old, wav_new)
self._con.executemany(
"UPDATE processed SET output_path = ? WHERE id = ?", updates)
self._con.commit()
# Migrate dataset.json entries (best-effort, outside the DB lock).
for old_fold, new_fold, old_op, new_op, label in ann:
remove_clip_annotation(old_fold, old_op)
if label:
upsert_clip_annotation(new_fold, new_op, label)
# Remove now-empty old vid dirs and their export folder if empty.
for d in sorted(old_vid_dirs):
try:
if os.path.isdir(d) and not os.listdir(d):
os.rmdir(d)
parent = os.path.dirname(d)
if os.path.isdir(parent) and not os.listdir(parent):
os.rmdir(parent)
except OSError:
pass
_log(f"Relocated {len(updates)} clip(s) of {filename or 'all videos'}: "
f"{src_folder_name} -> {dst_folder_name}")
return len(updates)
def get_profiles(self) -> list[str]:
"""Return distinct profile names across all tables, ordered alphabetically."""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT DISTINCT profile FROM processed"
" UNION SELECT DISTINCT profile FROM scan_results"
" UNION SELECT DISTINCT profile FROM hard_negatives"
" ORDER BY profile"
).fetchall()
return [r[0] for r in rows]
def duplicate_profile(self, src: str, dst: str) -> int:
"""Copy all profile data from *src* to *dst*.
Copies processed (exports), scan_results, hard_negatives, and
hidden_files. Returns total number of rows copied.
"""
if not self._enabled or src == dst:
return 0
total = 0
with self._lock:
# processed (exports)
rows = self._con.execute(
"SELECT filename, start_time, output_path, label, category,"
" short_side, portrait_ratio, crop_center, format,"
" clip_count, clip_duration, spread, source_path, scan_export,"
" processed_at"
" FROM processed WHERE profile = ?", (src,),
).fetchall()
for r in rows:
self._con.execute(
"INSERT INTO processed"
" (filename, start_time, output_path, label, category,"
" short_side, portrait_ratio, crop_center, format,"
" clip_count, clip_duration, spread, profile,"
" source_path, scan_export, processed_at)"
" VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
(*r[:12], dst, *r[12:]),
)
total += len(rows)
# scan_results
rows = self._con.execute(
"SELECT filename, model, start_time, end_time, score,"
" disabled, orig_start_time, orig_end_time, scan_timestamp"
" FROM scan_results WHERE profile = ?", (src,),
).fetchall()
for r in rows:
self._con.execute(
"INSERT INTO scan_results"
" (filename, profile, model, start_time, end_time, score,"
" disabled, orig_start_time, orig_end_time, scan_timestamp)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(r[0], dst, r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]),
)
total += len(rows)
# hard_negatives
rows = self._con.execute(
"SELECT filename, start_time, source_path, source_model"
" FROM hard_negatives WHERE profile = ?", (src,),
).fetchall()
for r in rows:
self._con.execute(
"INSERT INTO hard_negatives"
" (filename, profile, start_time, source_path, source_model)"
" VALUES (?, ?, ?, ?, ?)",
(r[0], dst, r[1], r[2], r[3]),
)
total += len(rows)
# hidden_files
rows = self._con.execute(
"SELECT filename FROM hidden_files WHERE profile = ?", (src,),
).fetchall()
for r in rows:
self._con.execute(
"INSERT OR IGNORE INTO hidden_files (filename, profile)"
" VALUES (?, ?)",
(r[0], dst),
)
total += len(rows)
self._con.commit()
return total
def count_profile_rows(self, profile: str) -> int:
"""Return total number of rows across all tables for *profile*."""
if not self._enabled:
return 0
n = 0
for table in ("processed", "scan_results", "hard_negatives", "hidden_files"):
row = self._con.execute(
f"SELECT COUNT(*) FROM {table} WHERE profile = ?", (profile,),
).fetchone()
n += row[0] if row else 0
return n
def delete_profile(self, profile: str) -> None:
"""Delete all rows for *profile* from every table."""
if not self._enabled:
return
with self._lock:
for table in ("processed", "scan_results", "hard_negatives", "hidden_files"):
self._con.execute(
f"DELETE FROM {table} WHERE profile = ?", (profile,),
)
self._con.commit()
def get_all_export_paths(self, profile: str = "default") -> list[str]:
"""Return all unique output_path values for a given profile."""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT DISTINCT output_path FROM processed WHERE profile = ?",
(profile,),
).fetchall()
return [r[0] for r in rows]
def get_max_counter(self, folder: str, name: str) -> int:
"""Return the highest counter N found in output_paths matching folder/name_NNN*.
Parses the counter from filenames (e.g. 'clip_035_0.mp4' → 35).
*folder* is typically the vid folder. Returns 0 if no matches exist.
"""
if not self._enabled:
return 0
prefix = os.path.join(folder, name + "_")
rows = self._con.execute(
"SELECT DISTINCT output_path FROM processed"
" WHERE output_path LIKE ?",
(prefix + "%",),
).fetchall()
max_n = 0
name_prefix = name + "_"
for (op,) in rows:
stem = os.path.splitext(os.path.basename(op))[0]
# stem: "clip_035_0" or "clip_036_a1_0"
if not stem.startswith(name_prefix):
continue
rest = stem[len(name_prefix):] # "035_0" or "036_a1_0"
counter_str = rest.split("_")[0]
try:
max_n = max(max_n, int(counter_str))
except ValueError:
pass
return max_n
def get_scan_export_rep_paths_in_range(self, filename: str, profile: str,
start: float, end: float) -> list[str]:
"""Return one representative output_path per distinct scan-export
start_time inside [start, end] for (filename, profile)."""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT output_path FROM processed"
" WHERE filename = ? AND profile = ? AND scan_export = 1"
" AND start_time BETWEEN ? AND ?"
" GROUP BY start_time",
(filename, profile, start, end),
).fetchall()
return [r[0] for r in rows]
def get_scan_export_times(self, filename: str, profile: str) -> list[float]:
"""Return start_times of scan_export=1 rows for this file/profile."""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT start_time FROM processed"
" WHERE filename = ? AND profile = ? AND scan_export = 1",
(filename, profile),
).fetchall()
return [r[0] for r in rows]
def delete_scan_exports(self, filename: str, profile: str) -> int:
"""Delete all scan_export entries for *filename* in *profile*.
Returns the number of rows deleted.
"""
if not self._enabled:
return 0
cur = self._con.execute(
"DELETE FROM processed"
" WHERE filename = ? AND profile = ? AND scan_export = 1",
(filename, profile),
)
self._con.commit()
return cur.rowcount
def get_vid_folder(self, filename: str, profile: str,
export_folder: str) -> str:
"""Return the vid_NNN folder name for a source video.
Checks existing DB output_paths first; if the video already has a
vid_NNN folder, returns it. Otherwise assigns max(existing) + 1,
also checking disk for orphan vid folders.
"""
if not self._enabled:
return "vid_001"
# Use the most recent entry (ORDER BY rowid DESC) for determinism
# when a file has entries across multiple vid folders.
row = self._con.execute(
"SELECT output_path FROM processed"
" WHERE filename = ? AND profile = ?"
" ORDER BY rowid DESC LIMIT 1",
(filename, profile),
).fetchone()
if row:
parent = os.path.basename(os.path.dirname(row[0]))
if parent.startswith("vid_"):
return parent
# Collect max vid_NNN number from DB + disk (never reuse old numbers)
max_n = 0
rows = self._con.execute(
"SELECT DISTINCT output_path FROM processed WHERE profile = ?",
(profile,),
).fetchall()
for (op,) in rows:
p = os.path.basename(os.path.dirname(op))
if p.startswith("vid_"):
try:
max_n = max(max_n, int(p.split("_")[1]))
except (IndexError, ValueError):
pass
if os.path.isdir(export_folder):
for d in os.listdir(export_folder):
if d.startswith("vid_") and os.path.isdir(
os.path.join(export_folder, d)
):
try:
max_n = max(max_n, int(d.split("_")[1]))
except (IndexError, ValueError):
pass
return f"vid_{max_n + 1:03d}"
def get_export_folders(self, profile: str = "default",
include_scan_exports: bool = False) -> list[str]:
"""Return distinct export folder names found in output_paths for a profile.
Export paths follow the structure:
.../export_folder/vid_NNN/clip.mp4
The export folder is 2 levels up from the clip file.
Returns folder names sorted alphabetically (e.g. ["mp4_Intense", "mp4_Soft"]).
"""
if not self._enabled:
return []
if include_scan_exports:
rows = self._con.execute(
"SELECT DISTINCT output_path FROM processed WHERE profile = ?",
(profile,),
).fetchall()
else:
rows = self._con.execute(
"SELECT DISTINCT output_path FROM processed"
" WHERE profile = ? AND scan_export = 0",
(profile,),
).fetchall()
folder_names: set[str] = set()
for (op,) in rows:
grandparent = os.path.basename(os.path.dirname(os.path.dirname(op)))
if grandparent and not grandparent.endswith("_disabled"):
folder_names.add(grandparent)
return sorted(folder_names)
def get_training_data(self, profile: str,
positive_folder: "str | list[str]",
negative_folder: str = "",
fallback_video_dir: str = "",
playlist_paths: list[str] | None = None,
include_scan_exports: bool = False,
use_hard_negatives: bool = True,
) -> list[tuple[str, list[float], list[float], list[float]]]:
"""Build training video_infos from DB data.
Args:
profile: profile name
positive_folder: export folder name(s) for positive class
negative_folder: export folder name for explicit negatives (optional)
fallback_video_dir: if source_path is empty, try filename in this dir
playlist_paths: loaded playlist paths to resolve filenames
include_scan_exports: if True, include auto-exported scan clips
use_hard_negatives: if False, skip hard negatives from scan feedback
Returns:
list of (source_video_path, positive_times, soft_times, negative_times)
per video. Soft times = clips from any other non-positive/non-negative folder.
"""
if not self._enabled:
return []
pos_folders = {positive_folder} if isinstance(positive_folder, str) else set(positive_folder)
if include_scan_exports:
rows = self._con.execute(
"SELECT filename, start_time, output_path, source_path"
" FROM processed WHERE profile = ?",
(profile,),
).fetchall()
else:
rows = self._con.execute(
"SELECT filename, start_time, output_path, source_path"
" FROM processed WHERE profile = ? AND scan_export = 0",
(profile,),
).fetchall()
# Collect times by video, split by folder role
pos_by_video: dict[str, set[float]] = {}
neg_by_video: dict[str, set[float]] = {}
soft_by_video: dict[str, set[float]] = {}
source_by_filename: dict[str, str] = {}
for fn, st, op, sp in rows:
if sp:
source_by_filename[fn] = sp
grandparent = os.path.basename(os.path.dirname(os.path.dirname(op)))
if grandparent.endswith("_disabled"):
continue # disabled clips are excluded from training entirely
if grandparent in pos_folders:
pos_by_video.setdefault(fn, set()).add(st)
elif negative_folder and grandparent == negative_folder:
neg_by_video.setdefault(fn, set()).add(st)
else:
soft_by_video.setdefault(fn, set()).add(st)
# Include hard negatives from scan feedback
if use_hard_negatives:
hard_rows = self._con.execute(
"SELECT filename, start_time, source_path FROM hard_negatives"
" WHERE profile = ?",
(profile,),
).fetchall()
for fn, st, sp in hard_rows:
neg_by_video.setdefault(fn, set()).add(st)
if sp:
source_by_filename.setdefault(fn, sp)
# Remove positive times from soft/neg to avoid conflicting labels
for fn in pos_by_video:
if fn in soft_by_video:
soft_by_video[fn] -= pos_by_video[fn]
if fn in neg_by_video:
neg_by_video[fn] -= pos_by_video[fn]
# Deduplicate nearby markers (spread clips from same position)
def _dedup_times(times: set[float], min_gap: float = 8.0) -> list[float]:
if not times:
return []
ordered = sorted(times)
result = [ordered[0]]
for t in ordered[1:]:
if t - result[-1] >= min_gap:
result.append(t)
return result
# Build filename→path lookup from playlist
playlist_lookup: dict[str, str] = {}
if playlist_paths:
for p in playlist_paths:
playlist_lookup[os.path.basename(p)] = p
# Include videos that have positives OR explicit negatives
all_videos = set(pos_by_video) | set(neg_by_video)
result = []
for fn in all_videos:
sp = source_by_filename.get(fn, "")
if not sp or not os.path.exists(sp):
sp = playlist_lookup.get(fn, "")
if not sp or not os.path.exists(sp):
if fallback_video_dir:
sp = os.path.join(fallback_video_dir, fn)
if not sp or not os.path.exists(sp):
continue
gt_pos = _dedup_times(pos_by_video.get(fn, set()))
gt_soft = _dedup_times(soft_by_video.get(fn, set()))
gt_neg = _dedup_times(neg_by_video.get(fn, set()))
result.append((sp, gt_pos, gt_soft, gt_neg))
return result
def get_training_stats(self, profile: str,
include_scan_exports: bool = False) -> dict[str, dict]:
"""Return per-subprofile stats for training readiness display.
Returns dict mapping subprofile_name → {
'videos': number of distinct source videos,
'clips': total clip count,
}
"""
if not self._enabled:
return {}
if include_scan_exports:
rows = self._con.execute(
"SELECT filename, output_path FROM processed WHERE profile = ?",
(profile,),
).fetchall()
else:
rows = self._con.execute(
"SELECT filename, output_path FROM processed"
" WHERE profile = ? AND scan_export = 0",
(profile,),
).fetchall()
# Single pass: group by export folder (grandparent dir), counting
# clips and distinct source videos. (Was O(folders × rows).)
videos: dict[str, set[str]] = {}
clips: dict[str, int] = {}
for fn, op in rows:
folder_name = os.path.basename(os.path.dirname(os.path.dirname(op)))
if not folder_name or folder_name.endswith("_disabled"):
continue
videos.setdefault(folder_name, set()).add(fn)
clips[folder_name] = clips.get(folder_name, 0) + 1
return {f: {"videos": len(videos[f]), "clips": n}
for f, n in clips.items() if n > 0}
# ── Scan results ─────────────────────────────────────────────
def save_scan_results(self, filename: str, profile: str, model: str,
regions: list[tuple[float, float, float]],
max_versions: int = 5) -> None:
"""Save scan results as a new version for (filename, profile, model).
regions: list of (start_time, end_time, score).
Keeps up to max_versions; oldest are pruned automatically.
"""
if not self._enabled:
return
ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
with self._lock:
self._con.executemany(
"INSERT INTO scan_results"
" (filename, profile, model, start_time, end_time, score,"
" orig_start_time, orig_end_time, scan_timestamp)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
[(filename, profile, model, s, e, sc, s, e, ts)
for s, e, sc in regions],
)
# Prune old versions beyond max_versions
versions = self._con.execute(
"SELECT DISTINCT scan_timestamp FROM scan_results"
" WHERE filename = ? AND profile = ? AND model = ?"
" ORDER BY scan_timestamp DESC",
(filename, profile, model),
).fetchall()
if len(versions) > max_versions:
old_ts = [v[0] for v in versions[max_versions:]]
self._con.execute(
"DELETE FROM scan_results"
" WHERE filename = ? AND profile = ? AND model = ?"
f" AND scan_timestamp IN ({','.join('?' * len(old_ts))})",
(filename, profile, model, *old_ts),
)
self._con.commit()
def get_scan_versions(self, filename: str, profile: str, model: str
) -> list[dict]:
"""Return list of scan versions for (filename, profile, model).
Returns [{timestamp, count, max_score}, ...] ordered newest first.
"""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT scan_timestamp, COUNT(*), MAX(score)"
" FROM scan_results"
" WHERE filename = ? AND profile = ? AND model = ?"
" AND scan_timestamp != ''"
" GROUP BY scan_timestamp"
" ORDER BY scan_timestamp DESC",
(filename, profile, model),
).fetchall()
return [{"timestamp": ts, "count": cnt, "max_score": sc}
for ts, cnt, sc in rows]
def get_scan_results(self, filename: str, profile: str,
scan_timestamp: str | None = None
) -> dict[str, list[tuple[int, float, float, float, bool, float, float]]]:
"""Return scan results grouped by model.
If scan_timestamp is given, returns only that version's rows.
Otherwise returns the latest version per model.
Returns {model: [(row_id, start, end, score, disabled, orig_start, orig_end), ...]}
sorted by start_time.
"""
if not self._enabled:
return {}
if scan_timestamp:
rows = self._con.execute(
"SELECT id, model, start_time, end_time, score, disabled,"
" orig_start_time, orig_end_time"
" FROM scan_results"
" WHERE filename = ? AND profile = ? AND scan_timestamp = ?"
" ORDER BY model, start_time",
(filename, profile, scan_timestamp),
).fetchall()
else:
# For each model, get rows from the latest timestamp only
rows = self._con.execute(
"SELECT r.id, r.model, r.start_time, r.end_time, r.score,"
" r.disabled, r.orig_start_time, r.orig_end_time"
" FROM scan_results r"
" INNER JOIN ("
" SELECT model, MAX(scan_timestamp) AS latest"
" FROM scan_results"
" WHERE filename = ? AND profile = ?"
" GROUP BY model"
" ) m ON r.model = m.model AND r.scan_timestamp = m.latest"
" WHERE r.filename = ? AND r.profile = ?"
" ORDER BY r.model, r.start_time",
(filename, profile, filename, profile),
).fetchall()
result: dict[str, list[tuple[int, float, float, float, bool, float, float]]] = {}
for row_id, model, s, e, sc, dis, os_, oe in rows:
# Fall back to current bounds for legacy rows without orig
result.setdefault(model, []).append(
(row_id, s, e, sc, bool(dis), os_ if os_ is not None else s,
oe if oe is not None else e))
return result
def delete_scan_result(self, row_id: int) -> None:
"""Delete a single scan result row."""
if not self._enabled:
return
with self._lock:
self._con.execute("DELETE FROM scan_results WHERE id = ?", (row_id,))
self._con.commit()
def toggle_scan_result_disabled(self, row_id: int, disabled: bool) -> None:
"""Set disabled flag on a scan result row."""
if not self._enabled:
return
with self._lock:
self._con.execute(
"UPDATE scan_results SET disabled = ? WHERE id = ?",
(1 if disabled else 0, row_id),
)
self._con.commit()
def update_scan_result_times(self, row_id: int,
start: float, end: float) -> None:
"""Update start/end times of a scan result row (resize)."""
if not self._enabled:
return
with self._lock:
self._con.execute(
"UPDATE scan_results SET start_time = ?, end_time = ? WHERE id = ?",
(start, end, row_id),
)
self._con.commit()
def insert_scan_result(self, filename: str, profile: str, model: str,
start: float, end: float, score: float,
disabled: bool, orig_start: float, orig_end: float,
scan_timestamp: str = "") -> int:
"""Insert a single scan result row; returns its new id."""
if not self._enabled:
return -1
with self._lock:
cur = self._con.execute(
"INSERT INTO scan_results"
" (filename, profile, model, start_time, end_time, score,"
" disabled, orig_start_time, orig_end_time, scan_timestamp)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(filename, profile, model, start, end, score,
1 if disabled else 0, orig_start, orig_end, scan_timestamp),
)
self._con.commit()
return int(cur.lastrowid or -1)
def update_scan_result_full(self, row_id: int, start: float, end: float,
score: float, orig_start: float,
orig_end: float) -> None:
"""Update bounds, score and orig_* fields — used after merging rows."""
if not self._enabled:
return
with self._lock:
self._con.execute(
"UPDATE scan_results"
" SET start_time = ?, end_time = ?, score = ?,"
" orig_start_time = ?, orig_end_time = ?"
" WHERE id = ?",
(start, end, score, orig_start, orig_end, row_id),
)
self._con.commit()
def get_scan_models(self, filename: str, profile: str) -> list[str]:
"""Return model names that have scan results for this file."""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT DISTINCT model FROM scan_results"
" WHERE filename = ? AND profile = ? ORDER BY model",
(filename, profile),
).fetchall()
return [r[0] for r in rows]
def get_scanned_filenames(self, profile: str, model: str) -> set[str]:
"""Return filenames that already have scan results for this model."""
if not self._enabled:
return set()
rows = self._con.execute(
"SELECT DISTINCT filename FROM scan_results"
" WHERE profile = ? AND model = ?",
(profile, model),
).fetchall()
return {r[0] for r in rows}
def add_hard_negatives(self, filename: str, profile: str,
times: list[float], source_path: str = "",
source_model: str = "") -> None:
"""Save timestamps as hard-negative training examples."""
if not self._enabled or not times:
return
with self._lock:
for t in times:
self._con.execute(
"INSERT INTO hard_negatives"
" (filename, profile, start_time, source_path, source_model)"
" VALUES (?, ?, ?, ?, ?)",
(filename, profile, t, source_path, source_model),
)
self._con.commit()
def get_hard_negative_times(self, filename: str, profile: str) -> set[float]:
"""Return start_times marked as hard negatives for this file."""
if not self._enabled:
return set()
rows = self._con.execute(
"SELECT start_time FROM hard_negatives"
" WHERE filename = ? AND profile = ?",
(filename, profile),
).fetchall()
return {r[0] for r in rows}
def get_hard_negatives(self, profile: str) -> list[dict]:
"""Return all hard negatives for a profile with full details."""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT id, filename, start_time, source_path, source_model"
" FROM hard_negatives WHERE profile = ?"
" ORDER BY filename, start_time",
(profile,),
).fetchall()
return [{"id": r[0], "filename": r[1], "start_time": r[2],
"source_path": r[3], "source_model": r[4]} for r in rows]
def delete_hard_negatives_by_ids(self, ids: list[int]) -> None:
"""Delete hard negatives by row IDs."""
if not self._enabled or not ids:
return
with self._lock:
self._con.execute(
f"DELETE FROM hard_negatives WHERE id IN ({','.join('?' * len(ids))})",
ids,
)
self._con.commit()
def remove_hard_negatives(self, filename: str, profile: str,
times: list[float]) -> None:
"""Remove specific hard-negative timestamps."""
if not self._enabled or not times:
return
with self._lock:
for t in times:
self._con.execute(
"DELETE FROM hard_negatives"
" WHERE filename = ? AND profile = ? AND start_time = ?",
(filename, profile, t),
)
self._con.commit()
def get_training_filenames(self, profile: str) -> set[str]:
"""Return filenames used in training (have exported clips)."""
if not self._enabled:
return set()
rows = self._con.execute(
"SELECT DISTINCT filename FROM processed WHERE profile = ?",
(profile,),
).fetchall()
return {r[0] for r in rows}
# ── Hidden files ───────────────────────────────────────────
def hide_file(self, filename: str, profile: str = "default") -> None:
if not self._enabled:
return
with self._lock:
self._con.execute(
"INSERT OR IGNORE INTO hidden_files (filename, profile) VALUES (?, ?)",
(filename, profile),
)
self._con.commit()
def unhide_file(self, filename: str, profile: str = "default") -> None:
if not self._enabled:
return
with self._lock:
self._con.execute(
"DELETE FROM hidden_files WHERE filename = ? AND profile = ?",
(filename, profile),
)
self._con.commit()
def get_hidden_files(self, profile: str = "default") -> set[str]:
if not self._enabled:
return set()
rows = self._con.execute(
"SELECT filename FROM hidden_files WHERE profile = ?", (profile,)
).fetchall()
return {r[0] for r in rows}