From fbbfa6fdcee418071ef7e07c9f25e90b746dbb57 Mon Sep 17 00:00:00 2001 From: Ethanfel Date: Thu, 16 Apr 2026 13:43:44 +0200 Subject: [PATCH] refactor: import shared logic from core/ instead of inline definitions Co-Authored-By: Claude Opus 4.6 --- main.py | 602 +------------------------------------------------------- 1 file changed, 8 insertions(+), 594 deletions(-) diff --git a/main.py b/main.py index 5441115..ed9fb33 100755 --- a/main.py +++ b/main.py @@ -4,15 +4,10 @@ locale.setlocale(locale.LC_NUMERIC, "C") # required by libmpv before any import import sys import os -import re -import json import random import shutil -import sqlite3 import subprocess -import tempfile from concurrent.futures import ThreadPoolExecutor, as_completed -from datetime import datetime, timezone from pathlib import Path from PyQt6.QtWidgets import ( @@ -32,599 +27,18 @@ elif sys.platform == "darwin" and getattr(sys, "frozen", False): os.environ.setdefault("DYLD_LIBRARY_PATH", str(Path(sys._MEIPASS))) import mpv - -def _frozen_path() -> Path: - """Return the directory containing bundled binaries in a PyInstaller build.""" - if getattr(sys, "frozen", False): - return Path(sys._MEIPASS) - return Path(__file__).parent - - -def _bin(name: str) -> str: - """Resolve a binary name (e.g. 'ffmpeg') to its full path in frozen builds.""" - p = _frozen_path() / name - if p.exists(): - return str(p) - return name # fall back to PATH - - -def _log(*args) -> None: - """Print a timestamped log line to stderr.""" - ts = datetime.now().strftime("%H:%M:%S") - print(f"[8-cut {ts}]", *args, file=sys.stderr) - - -def build_export_path(folder: str, basename: str, counter: int, sub: int | None = None) -> str: - group = f"{basename}_{counter:03d}" - name = f"{group}_{sub}" if sub is not None else group - return os.path.join(folder, group, name + ".mp4") - - -def build_sequence_dir(folder: str, basename: str, counter: int, sub: int | None = None) -> str: - group = f"{basename}_{counter:03d}" - name = f"{group}_{sub}" if sub is not None else group - return os.path.join(folder, group, name) - - -def format_time(seconds: float) -> str: - m = int(seconds // 60) - # Floor-truncate to 1 dp (not round) — prevents "X:60.0" rollover when - # seconds is e.g. 59.95. This means display may lag true position by up to 0.1s. - s = int(seconds % 60 * 10) / 10 - return f"{m}:{s:04.1f}" - - -def resolve_keyframe( - keyframes: list[tuple[float, float, str | None, bool, bool]], - t: float, - tolerance: float = 0.05, -) -> tuple[float, float, str | None, bool, bool] | None: - """Return the latest keyframe at or before *t*, or None.""" - result = None - for kf in keyframes: - if kf[0] <= t + tolerance: - result = kf - else: - break - return result - - -def apply_keyframes_to_jobs( - jobs: list[tuple[float, str, str | None, float]], - keyframes: list[tuple[float, float, str | None, bool, bool]], - base_center: float, - base_ratio: str | None, - base_rand_p: bool, - base_rand_s: bool, -) -> list[tuple[float, str, str | None, float, bool, bool]]: - """Resolve each job's crop state from keyframes, returning widened tuples. - - Returns list of (start, path, ratio, center, rand_portrait, rand_square). - """ - result = [] - for s, o, _r, _c in jobs: - kf = resolve_keyframe(keyframes, s) - if kf is not None: - _, center, ratio, rp, rs = kf - else: - center, ratio, rp, rs = base_center, base_ratio, base_rand_p, base_rand_s - result.append((s, o, ratio, center, rp, rs)) - return result - - -def build_ffmpeg_command( - input_path: str, start: float, output_path: str, - short_side: int | None = None, - portrait_ratio: str | None = None, - crop_center: float = 0.5, - image_sequence: bool = False, - encoder: str = "libx264", -) -> list[str]: - # -ss before -i: fast input-seeking. Safe here because we always re-encode, - # so there is no keyframe-alignment issue from pre-input seek. - # Image sequences always use libwebp, so skip HW encoder setup. - use_hw_vaapi = encoder == "h264_vaapi" and not image_sequence - cmd = [_bin("ffmpeg"), "-y"] - - # VAAPI needs a device for hardware context. - if use_hw_vaapi: - cmd += ["-hwaccel", "vaapi", "-hwaccel_output_format", "vaapi", - "-vaapi_device", "/dev/dri/renderD128"] - - cmd += [ - "-threads", "0", - "-ss", str(start), - "-i", input_path, - "-t", "8", - ] - - filters: list[str] = [] - if portrait_ratio is not None: - filters.append(_portrait_crop_filter(portrait_ratio, crop_center)) - if short_side is not None: - # Scale so the shorter dimension equals short_side. - # if(lt(iw,ih),...) → portrait output: fix width; landscape: fix height. - # -2 keeps aspect ratio with even-pixel rounding (encoder requirement). - filters.append( - f"scale='if(lt(iw,ih),{short_side},-2)':'if(lt(iw,ih),-2,{short_side})':flags=lanczos" - ) - - # VAAPI: decoded frames are GPU surfaces. CPU filters (crop/scale) need - # hwdownload first, then re-upload for the HW encoder. - if use_hw_vaapi: - if filters: - filters.insert(0, "hwdownload") - filters.insert(1, "format=nv12") - filters.append("format=nv12") - filters.append("hwupload") - - if filters: - cmd += ["-vf", ",".join(filters)] - - if image_sequence: - cmd += [ - "-an", - "-c:v", "libwebp", - "-quality", "92", - "-compression_level", "1", - os.path.join(output_path, "frame_%04d.webp"), - ] - else: - cmd += ["-c:v", encoder, "-c:a", "pcm_s16le", output_path] - return cmd - - -def build_audio_extract_command(input_path: str, start: float, sequence_dir: str) -> list[str]: - """Return an ffmpeg command that extracts audio to .wav.""" - audio_path = sequence_dir + ".wav" - return [ - _bin("ffmpeg"), "-y", - "-ss", str(start), - "-i", input_path, - "-t", "8", - "-vn", - "-c:a", "pcm_s16le", - audio_path, - ] - - -def build_annotation_json_path(folder: str) -> str: - return os.path.join(folder, "dataset.json") - - -def remove_clip_annotation(folder: str, clip_path: str) -> None: - """Remove the entry for *clip_path* from /dataset.json if present.""" - json_path = build_annotation_json_path(folder) - if not os.path.exists(json_path): - return - abs_path = os.path.abspath(clip_path) - with open(json_path, "r", encoding="utf-8") as f: - try: - entries = json.load(f) - except (json.JSONDecodeError, ValueError): - return - entries = [e for e in entries if e.get("path") != abs_path] - with open(json_path, "w", encoding="utf-8") as f: - json.dump(entries, f, indent=2, ensure_ascii=False) - f.write("\n") - - -def upsert_clip_annotation(folder: str, clip_path: str, label: str) -> None: - """Insert or update one entry in /dataset.json. - - Each entry stores a path relative to *folder* and the sound label. - Matches on ``path``; if an entry for the same clip already exists it is - replaced (overwrite-export case). Nothing is written when *label* is - empty. - """ - if not label.strip(): - return - os.makedirs(folder, exist_ok=True) - json_path = build_annotation_json_path(folder) - entries: list[dict] = [] - if os.path.exists(json_path): - with open(json_path, "r", encoding="utf-8") as f: - try: - entries = json.load(f) - except (json.JSONDecodeError, ValueError): - entries = [] - abs_path = os.path.abspath(clip_path) - entry: dict = {"path": abs_path, "label": label} - for i, e in enumerate(entries): - if e.get("path") == abs_path: - entries[i] = entry - break - else: - entries.append(entry) - with open(json_path, "w", encoding="utf-8") as f: - json.dump(entries, f, indent=2, ensure_ascii=False) - f.write("\n") - - -def detect_hw_encoders() -> list[str]: - """Probe ffmpeg for available H.264 hardware encoders. - - Returns a list like ["h264_nvenc", "h264_vaapi", ...]. - Only includes encoders that ffmpeg reports as available. - """ - _HW_ENCODERS = ["h264_nvenc", "h264_vaapi", "h264_qsv", "h264_amf", "h264_videotoolbox"] - try: - result = subprocess.run( - [_bin("ffmpeg"), "-hide_banner", "-encoders"], - capture_output=True, text=True, timeout=5, - ) - if result.returncode != 0: - return [] - output = result.stdout - except Exception: - return [] - available = [] - for enc in _HW_ENCODERS: - if re.search(rf'\b{enc}\b', output): - available.append(enc) - if available: - _log(f"HW encoders detected: {', '.join(available)}") - else: - _log("No HW encoders detected — GPU export unavailable") - return available - - -_RATIOS: dict[str, tuple[int, int]] = { - "9:16": (9, 16), - "4:5": (4, 5), - "1:1": (1, 1), -} - -def _portrait_crop_filter(ratio: str, crop_center: float) -> str: - """Return an ffmpeg crop= filter expression for the given portrait ratio. - - Uses ffmpeg expression syntax so source dimensions are resolved at runtime. - Commas inside min()/max() are escaped with \\, to prevent ffmpeg's - filtergraph parser from treating them as filter-chain separators. - """ - num, den = _RATIOS[ratio] - cw = f"ih*{num}/{den}" - x = f"max(0\\,min((iw-{cw})*{crop_center}\\,iw-{cw}))" - return f"crop={cw}:ih:{x}:0" - +from core.paths import _bin, _log, build_export_path, build_sequence_dir, format_time +from core.ffmpeg import ( + _RATIOS, resolve_keyframe, apply_keyframes_to_jobs, + build_ffmpeg_command, build_audio_extract_command, detect_hw_encoders, +) +from core.db import ProcessedDB +from core.annotations import remove_clip_annotation, upsert_clip_annotation +from core.tracking import track_centers_for_jobs _SELVA_CATEGORIES = ["", "Human", "Animal", "Vehicle", "Tool", "Music", "Nature", "Sport", "Other"] -# --------------------------------------------------------------------------- -# Subject tracking (YOLO-based, optional) -# --------------------------------------------------------------------------- - -_yolo_model = None - - -def _get_yolo(): - """Lazy-load YOLOv8-nano. Returns None if ultralytics is not installed.""" - global _yolo_model - if _yolo_model is None: - try: - from ultralytics import YOLO - _yolo_model = YOLO("yolov8n.pt") - _log("YOLO model loaded") - except ImportError: - _log("ultralytics not installed — tracking disabled") - return None - except Exception as e: - _log(f"YOLO load failed: {e}") - return None - return _yolo_model - - -def extract_frame_cv(video_path: str, time: float): - """Extract a single frame as a numpy array (BGR) via ffmpeg → temp PNG → cv2.""" - try: - import cv2 - import numpy as np - except ImportError: - return None - fd, tmp = tempfile.mkstemp(suffix=".png") - os.close(fd) - try: - cmd = [_bin("ffmpeg"), "-y", "-ss", str(time), "-i", video_path, - "-frames:v", "1", tmp] - result = subprocess.run(cmd, capture_output=True, timeout=10) - if result.returncode != 0: - return None - return cv2.imread(tmp) - except Exception: - return None - finally: - if os.path.exists(tmp): - os.unlink(tmp) - - -def detect_subject_center( - video_path: str, time: float, target_cls: int | None, last_x: float, last_y: float, -) -> tuple[int | None, float, float] | None: - """Detect objects at *time* and return (class_id, norm_x, norm_y) of the - best match to (target_cls, last_x, last_y). Returns None on failure.""" - model = _get_yolo() - if model is None: - return None - frame = extract_frame_cv(video_path, time) - if frame is None: - return None - results = model(frame, verbose=False) - if not results or len(results[0].boxes) == 0: - return None - h, w = frame.shape[:2] - dets = [] - for box in results[0].boxes: - x1, y1, x2, y2 = box.xyxy[0].tolist() - cls = int(box.cls[0]) - cx = (x1 + x2) / 2 / w - cy = (y1 + y2) / 2 / h - dets.append((cls, cx, cy)) - # Prefer same class, nearest to last known position. - def score(d): - cls_penalty = 0 if (target_cls is None or d[0] == target_cls) else 1.0 - dist = (d[1] - last_x) ** 2 + (d[2] - last_y) ** 2 - return cls_penalty + dist - best = min(dets, key=score) - return best - - -def track_centers_for_jobs( - video_path: str, cursor: float, crop_center: float, - starts: list[float], -) -> list[float]: - """Run detection at the cursor (to identify the target) then at each start - time. Returns a list of horizontal crop centers (one per start).""" - ref = detect_subject_center(video_path, cursor, None, crop_center, 0.5) - if ref is None: - _log("Tracking: no detection at cursor, using fixed center") - return [crop_center] * len(starts) - target_cls, last_x, last_y = ref - _log(f"Tracking: target class={target_cls} at ({last_x:.2f}, {last_y:.2f})") - centers = [] - for t in starts: - det = detect_subject_center(video_path, t, target_cls, last_x, last_y) - if det is not None: - _, cx, cy = det - _log(f" t={t:.2f}s → center={cx:.3f}") - centers.append(cx) - last_x, last_y = cx, cy - else: - _log(f" t={t:.2f}s → lost, reusing {last_x:.3f}") - centers.append(last_x) - return centers - - -class ProcessedDB: - _SCHEMA_VERSION = 3 # bump when schema changes - - def __init__(self, db_path: str | None = None): - if db_path is None: - db_path = str(Path.home() / ".8cut.db") - self._path = db_path - try: - self._con = sqlite3.connect(db_path, check_same_thread=False) - self._migrate() - self._enabled = True - _log(f"DB opened: {db_path}") - except Exception as e: - _log(f"DB unavailable: {e}") - self._con = None - self._enabled = False - - def _migrate(self) -> None: - """Create table if missing, then add any new columns for old DBs.""" - cols = { - row[1] - for row in self._con.execute("PRAGMA table_info(processed)").fetchall() - } - if not cols: - # Fresh DB — create from scratch - self._con.execute( - "CREATE TABLE IF NOT EXISTS processed (" - " id INTEGER PRIMARY KEY AUTOINCREMENT," - " filename TEXT NOT NULL," - " start_time REAL NOT NULL," - " output_path TEXT NOT NULL," - " label TEXT NOT NULL DEFAULT ''," - " category TEXT NOT NULL DEFAULT ''," - " short_side INTEGER DEFAULT 512," - " portrait_ratio TEXT NOT NULL DEFAULT ''," - " crop_center REAL NOT NULL DEFAULT 0.5," - " format TEXT NOT NULL DEFAULT 'MP4'," - " clip_count INTEGER NOT NULL DEFAULT 3," - " spread REAL NOT NULL DEFAULT 3.0," - " profile TEXT NOT NULL DEFAULT 'default'," - " processed_at TEXT NOT NULL" - ")" - ) - else: - # Add missing columns to legacy tables - new_cols = { - "label": "TEXT NOT NULL DEFAULT ''", - "category": "TEXT NOT NULL DEFAULT ''", - "short_side": "INTEGER DEFAULT 512", - "portrait_ratio": "TEXT NOT NULL DEFAULT ''", - "crop_center": "REAL NOT NULL DEFAULT 0.5", - "format": "TEXT NOT NULL DEFAULT 'MP4'", - "clip_count": "INTEGER NOT NULL DEFAULT 3", - "spread": "REAL NOT NULL DEFAULT 3.0", - "profile": "TEXT NOT NULL DEFAULT 'default'", - } - for col, typedef in new_cols.items(): - if col not in cols: - self._con.execute( - f"ALTER TABLE processed ADD COLUMN {col} {typedef}" - ) - self._con.execute( - "CREATE INDEX IF NOT EXISTS idx_filename ON processed(filename)" - ) - self._con.execute( - "CREATE TABLE IF NOT EXISTS hidden_files (" - " filename TEXT NOT NULL," - " profile TEXT NOT NULL DEFAULT 'default'," - " PRIMARY KEY (filename, profile)" - ")" - ) - self._con.commit() - - def add(self, filename: str, start_time: float, output_path: str, - label: str = "", category: str = "", - short_side: int | None = None, portrait_ratio: str = "", - crop_center: float = 0.5, fmt: str = "MP4", - clip_count: int = 3, spread: float = 3.0, - profile: str = "default") -> None: - if not self._enabled: - return - self._con.execute( - "INSERT INTO processed" - " (filename, start_time, output_path, label, category," - " short_side, portrait_ratio, crop_center, format," - " clip_count, spread, profile, processed_at)" - " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - (filename, start_time, output_path, label, category, - short_side, portrait_ratio, crop_center, fmt, - clip_count, spread, profile, - datetime.now(timezone.utc).isoformat()), - ) - self._con.commit() - - def get_labels(self) -> list[str]: - """Return distinct non-empty labels ordered by most recently used.""" - if not self._enabled: - return [] - rows = self._con.execute( - "SELECT DISTINCT label FROM processed" - " WHERE label != '' ORDER BY processed_at DESC" - ).fetchall() - # Deduplicate while preserving order (DISTINCT on processed_at DESC - # may return duplicates if the same label was used multiple times). - seen: set[str] = set() - result = [] - for (lbl,) in rows: - if lbl not in seen: - seen.add(lbl) - result.append(lbl) - return result - - def get_by_output_path(self, output_path: str) -> dict | None: - """Return config dict for an output_path, or None.""" - if not self._enabled: - return None - self._con.row_factory = sqlite3.Row - row = self._con.execute( - "SELECT label, category, short_side, portrait_ratio, crop_center, format," - " clip_count, spread" - " FROM processed WHERE output_path = ?", - (output_path,), - ).fetchone() - self._con.row_factory = None - return dict(row) if row else None - - def delete_by_output_path(self, output_path: str) -> None: - if not self._enabled: - return - self._con.execute("DELETE FROM processed WHERE output_path = ?", (output_path,)) - self._con.commit() - - def get_group(self, output_path: str) -> list[str]: - """Return all output_paths sharing the same (filename, start_time) as *output_path*.""" - if not self._enabled: - return [] - row = self._con.execute( - "SELECT filename, start_time FROM processed WHERE output_path = ?", - (output_path,), - ).fetchone() - if not row: - return [] - rows = self._con.execute( - "SELECT output_path FROM processed" - " WHERE filename = ? AND start_time = ? ORDER BY output_path", - (row[0], row[1]), - ).fetchall() - return [r[0] for r in rows] - - def delete_group(self, output_path: str) -> list[str]: - """Delete all rows sharing the same (filename, start_time) as *output_path*. - Returns list of deleted output_paths.""" - if not self._enabled: - return [] - row = self._con.execute( - "SELECT filename, start_time FROM processed WHERE output_path = ?", - (output_path,), - ).fetchone() - if not row: - return [] - filename, start_time = row - paths = [r[0] for r in self._con.execute( - "SELECT output_path FROM processed WHERE filename = ? AND start_time = ?", - (filename, start_time), - ).fetchall()] - self._con.execute( - "DELETE FROM processed WHERE filename = ? AND start_time = ?", - (filename, start_time), - ) - self._con.commit() - return paths - - def _get_markers_for(self, match: str, profile: str = "default") -> list[tuple[float, int, str]]: - rows = self._con.execute( - "SELECT start_time, output_path FROM processed" - " WHERE filename = ? AND profile = ? ORDER BY start_time", - (match, profile), - ).fetchall() - # Deduplicate by start_time — batch exports share the same cursor. - seen_times: dict[float, tuple[float, int, str]] = {} - n = 0 - for t, p in rows: - if t not in seen_times: - n += 1 - seen_times[t] = (t, n, p) - return list(seen_times.values()) - - def get_markers(self, filename: str, profile: str = "default") -> list[tuple[float, int, str]]: - """Return [(start_time, marker_number, output_path), ...] for exact - filename match, sorted by start_time. Empty list if no match.""" - if not self._enabled: - return [] - return self._get_markers_for(filename, profile) - - def get_profiles(self) -> list[str]: - """Return distinct profile names, ordered alphabetically.""" - if not self._enabled: - return [] - rows = self._con.execute( - "SELECT DISTINCT profile FROM processed ORDER BY profile" - ).fetchall() - return [r[0] for r in rows] - - def hide_file(self, filename: str, profile: str = "default") -> None: - if not self._enabled: - return - self._con.execute( - "INSERT OR IGNORE INTO hidden_files (filename, profile) VALUES (?, ?)", - (filename, profile), - ) - self._con.commit() - - def unhide_file(self, filename: str, profile: str = "default") -> None: - if not self._enabled: - return - self._con.execute( - "DELETE FROM hidden_files WHERE filename = ? AND profile = ?", - (filename, profile), - ) - self._con.commit() - - def get_hidden_files(self, profile: str = "default") -> set[str]: - if not self._enabled: - return set() - rows = self._con.execute( - "SELECT filename FROM hidden_files WHERE profile = ?", (profile,) - ).fetchall() - return {r[0] for r in rows} - - class _DBWorker(QThread): """Runs ProcessedDB fuzzy-match lookup off the main thread.""" result = pyqtSignal(str, object, list) # (queried_filename, match|None, markers)