refactor: import shared logic from core/ instead of inline definitions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-16 13:43:44 +02:00
parent 56920a5247
commit fbbfa6fdce
+8 -594
View File
@@ -4,15 +4,10 @@ locale.setlocale(locale.LC_NUMERIC, "C") # required by libmpv before any import
import sys import sys
import os import os
import re
import json
import random import random
import shutil import shutil
import sqlite3
import subprocess import subprocess
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from PyQt6.QtWidgets import ( from PyQt6.QtWidgets import (
@@ -32,599 +27,18 @@ elif sys.platform == "darwin" and getattr(sys, "frozen", False):
os.environ.setdefault("DYLD_LIBRARY_PATH", str(Path(sys._MEIPASS))) os.environ.setdefault("DYLD_LIBRARY_PATH", str(Path(sys._MEIPASS)))
import mpv import mpv
from core.paths import _bin, _log, build_export_path, build_sequence_dir, format_time
def _frozen_path() -> Path: from core.ffmpeg import (
"""Return the directory containing bundled binaries in a PyInstaller build.""" _RATIOS, resolve_keyframe, apply_keyframes_to_jobs,
if getattr(sys, "frozen", False): build_ffmpeg_command, build_audio_extract_command, detect_hw_encoders,
return Path(sys._MEIPASS) )
return Path(__file__).parent from core.db import ProcessedDB
from core.annotations import remove_clip_annotation, upsert_clip_annotation
from core.tracking import track_centers_for_jobs
def _bin(name: str) -> str:
"""Resolve a binary name (e.g. 'ffmpeg') to its full path in frozen builds."""
p = _frozen_path() / name
if p.exists():
return str(p)
return name # fall back to PATH
def _log(*args) -> None:
"""Print a timestamped log line to stderr."""
ts = datetime.now().strftime("%H:%M:%S")
print(f"[8-cut {ts}]", *args, file=sys.stderr)
def build_export_path(folder: str, basename: str, counter: int, sub: int | None = None) -> str:
group = f"{basename}_{counter:03d}"
name = f"{group}_{sub}" if sub is not None else group
return os.path.join(folder, group, name + ".mp4")
def build_sequence_dir(folder: str, basename: str, counter: int, sub: int | None = None) -> str:
group = f"{basename}_{counter:03d}"
name = f"{group}_{sub}" if sub is not None else group
return os.path.join(folder, group, name)
def format_time(seconds: float) -> str:
m = int(seconds // 60)
# Floor-truncate to 1 dp (not round) — prevents "X:60.0" rollover when
# seconds is e.g. 59.95. This means display may lag true position by up to 0.1s.
s = int(seconds % 60 * 10) / 10
return f"{m}:{s:04.1f}"
def resolve_keyframe(
keyframes: list[tuple[float, float, str | None, bool, bool]],
t: float,
tolerance: float = 0.05,
) -> tuple[float, float, str | None, bool, bool] | None:
"""Return the latest keyframe at or before *t*, or None."""
result = None
for kf in keyframes:
if kf[0] <= t + tolerance:
result = kf
else:
break
return result
def apply_keyframes_to_jobs(
jobs: list[tuple[float, str, str | None, float]],
keyframes: list[tuple[float, float, str | None, bool, bool]],
base_center: float,
base_ratio: str | None,
base_rand_p: bool,
base_rand_s: bool,
) -> list[tuple[float, str, str | None, float, bool, bool]]:
"""Resolve each job's crop state from keyframes, returning widened tuples.
Returns list of (start, path, ratio, center, rand_portrait, rand_square).
"""
result = []
for s, o, _r, _c in jobs:
kf = resolve_keyframe(keyframes, s)
if kf is not None:
_, center, ratio, rp, rs = kf
else:
center, ratio, rp, rs = base_center, base_ratio, base_rand_p, base_rand_s
result.append((s, o, ratio, center, rp, rs))
return result
def build_ffmpeg_command(
input_path: str, start: float, output_path: str,
short_side: int | None = None,
portrait_ratio: str | None = None,
crop_center: float = 0.5,
image_sequence: bool = False,
encoder: str = "libx264",
) -> list[str]:
# -ss before -i: fast input-seeking. Safe here because we always re-encode,
# so there is no keyframe-alignment issue from pre-input seek.
# Image sequences always use libwebp, so skip HW encoder setup.
use_hw_vaapi = encoder == "h264_vaapi" and not image_sequence
cmd = [_bin("ffmpeg"), "-y"]
# VAAPI needs a device for hardware context.
if use_hw_vaapi:
cmd += ["-hwaccel", "vaapi", "-hwaccel_output_format", "vaapi",
"-vaapi_device", "/dev/dri/renderD128"]
cmd += [
"-threads", "0",
"-ss", str(start),
"-i", input_path,
"-t", "8",
]
filters: list[str] = []
if portrait_ratio is not None:
filters.append(_portrait_crop_filter(portrait_ratio, crop_center))
if short_side is not None:
# Scale so the shorter dimension equals short_side.
# if(lt(iw,ih),...) → portrait output: fix width; landscape: fix height.
# -2 keeps aspect ratio with even-pixel rounding (encoder requirement).
filters.append(
f"scale='if(lt(iw,ih),{short_side},-2)':'if(lt(iw,ih),-2,{short_side})':flags=lanczos"
)
# VAAPI: decoded frames are GPU surfaces. CPU filters (crop/scale) need
# hwdownload first, then re-upload for the HW encoder.
if use_hw_vaapi:
if filters:
filters.insert(0, "hwdownload")
filters.insert(1, "format=nv12")
filters.append("format=nv12")
filters.append("hwupload")
if filters:
cmd += ["-vf", ",".join(filters)]
if image_sequence:
cmd += [
"-an",
"-c:v", "libwebp",
"-quality", "92",
"-compression_level", "1",
os.path.join(output_path, "frame_%04d.webp"),
]
else:
cmd += ["-c:v", encoder, "-c:a", "pcm_s16le", output_path]
return cmd
def build_audio_extract_command(input_path: str, start: float, sequence_dir: str) -> list[str]:
"""Return an ffmpeg command that extracts audio to <sequence_dir>.wav."""
audio_path = sequence_dir + ".wav"
return [
_bin("ffmpeg"), "-y",
"-ss", str(start),
"-i", input_path,
"-t", "8",
"-vn",
"-c:a", "pcm_s16le",
audio_path,
]
def build_annotation_json_path(folder: str) -> str:
return os.path.join(folder, "dataset.json")
def remove_clip_annotation(folder: str, clip_path: str) -> None:
"""Remove the entry for *clip_path* from <folder>/dataset.json if present."""
json_path = build_annotation_json_path(folder)
if not os.path.exists(json_path):
return
abs_path = os.path.abspath(clip_path)
with open(json_path, "r", encoding="utf-8") as f:
try:
entries = json.load(f)
except (json.JSONDecodeError, ValueError):
return
entries = [e for e in entries if e.get("path") != abs_path]
with open(json_path, "w", encoding="utf-8") as f:
json.dump(entries, f, indent=2, ensure_ascii=False)
f.write("\n")
def upsert_clip_annotation(folder: str, clip_path: str, label: str) -> None:
"""Insert or update one entry in <folder>/dataset.json.
Each entry stores a path relative to *folder* and the sound label.
Matches on ``path``; if an entry for the same clip already exists it is
replaced (overwrite-export case). Nothing is written when *label* is
empty.
"""
if not label.strip():
return
os.makedirs(folder, exist_ok=True)
json_path = build_annotation_json_path(folder)
entries: list[dict] = []
if os.path.exists(json_path):
with open(json_path, "r", encoding="utf-8") as f:
try:
entries = json.load(f)
except (json.JSONDecodeError, ValueError):
entries = []
abs_path = os.path.abspath(clip_path)
entry: dict = {"path": abs_path, "label": label}
for i, e in enumerate(entries):
if e.get("path") == abs_path:
entries[i] = entry
break
else:
entries.append(entry)
with open(json_path, "w", encoding="utf-8") as f:
json.dump(entries, f, indent=2, ensure_ascii=False)
f.write("\n")
def detect_hw_encoders() -> list[str]:
"""Probe ffmpeg for available H.264 hardware encoders.
Returns a list like ["h264_nvenc", "h264_vaapi", ...].
Only includes encoders that ffmpeg reports as available.
"""
_HW_ENCODERS = ["h264_nvenc", "h264_vaapi", "h264_qsv", "h264_amf", "h264_videotoolbox"]
try:
result = subprocess.run(
[_bin("ffmpeg"), "-hide_banner", "-encoders"],
capture_output=True, text=True, timeout=5,
)
if result.returncode != 0:
return []
output = result.stdout
except Exception:
return []
available = []
for enc in _HW_ENCODERS:
if re.search(rf'\b{enc}\b', output):
available.append(enc)
if available:
_log(f"HW encoders detected: {', '.join(available)}")
else:
_log("No HW encoders detected — GPU export unavailable")
return available
_RATIOS: dict[str, tuple[int, int]] = {
"9:16": (9, 16),
"4:5": (4, 5),
"1:1": (1, 1),
}
def _portrait_crop_filter(ratio: str, crop_center: float) -> str:
"""Return an ffmpeg crop= filter expression for the given portrait ratio.
Uses ffmpeg expression syntax so source dimensions are resolved at runtime.
Commas inside min()/max() are escaped with \\, to prevent ffmpeg's
filtergraph parser from treating them as filter-chain separators.
"""
num, den = _RATIOS[ratio]
cw = f"ih*{num}/{den}"
x = f"max(0\\,min((iw-{cw})*{crop_center}\\,iw-{cw}))"
return f"crop={cw}:ih:{x}:0"
_SELVA_CATEGORIES = ["", "Human", "Animal", "Vehicle", "Tool", "Music", "Nature", "Sport", "Other"] _SELVA_CATEGORIES = ["", "Human", "Animal", "Vehicle", "Tool", "Music", "Nature", "Sport", "Other"]
# ---------------------------------------------------------------------------
# Subject tracking (YOLO-based, optional)
# ---------------------------------------------------------------------------
_yolo_model = None
def _get_yolo():
"""Lazy-load YOLOv8-nano. Returns None if ultralytics is not installed."""
global _yolo_model
if _yolo_model is None:
try:
from ultralytics import YOLO
_yolo_model = YOLO("yolov8n.pt")
_log("YOLO model loaded")
except ImportError:
_log("ultralytics not installed — tracking disabled")
return None
except Exception as e:
_log(f"YOLO load failed: {e}")
return None
return _yolo_model
def extract_frame_cv(video_path: str, time: float):
"""Extract a single frame as a numpy array (BGR) via ffmpeg → temp PNG → cv2."""
try:
import cv2
import numpy as np
except ImportError:
return None
fd, tmp = tempfile.mkstemp(suffix=".png")
os.close(fd)
try:
cmd = [_bin("ffmpeg"), "-y", "-ss", str(time), "-i", video_path,
"-frames:v", "1", tmp]
result = subprocess.run(cmd, capture_output=True, timeout=10)
if result.returncode != 0:
return None
return cv2.imread(tmp)
except Exception:
return None
finally:
if os.path.exists(tmp):
os.unlink(tmp)
def detect_subject_center(
video_path: str, time: float, target_cls: int | None, last_x: float, last_y: float,
) -> tuple[int | None, float, float] | None:
"""Detect objects at *time* and return (class_id, norm_x, norm_y) of the
best match to (target_cls, last_x, last_y). Returns None on failure."""
model = _get_yolo()
if model is None:
return None
frame = extract_frame_cv(video_path, time)
if frame is None:
return None
results = model(frame, verbose=False)
if not results or len(results[0].boxes) == 0:
return None
h, w = frame.shape[:2]
dets = []
for box in results[0].boxes:
x1, y1, x2, y2 = box.xyxy[0].tolist()
cls = int(box.cls[0])
cx = (x1 + x2) / 2 / w
cy = (y1 + y2) / 2 / h
dets.append((cls, cx, cy))
# Prefer same class, nearest to last known position.
def score(d):
cls_penalty = 0 if (target_cls is None or d[0] == target_cls) else 1.0
dist = (d[1] - last_x) ** 2 + (d[2] - last_y) ** 2
return cls_penalty + dist
best = min(dets, key=score)
return best
def track_centers_for_jobs(
video_path: str, cursor: float, crop_center: float,
starts: list[float],
) -> list[float]:
"""Run detection at the cursor (to identify the target) then at each start
time. Returns a list of horizontal crop centers (one per start)."""
ref = detect_subject_center(video_path, cursor, None, crop_center, 0.5)
if ref is None:
_log("Tracking: no detection at cursor, using fixed center")
return [crop_center] * len(starts)
target_cls, last_x, last_y = ref
_log(f"Tracking: target class={target_cls} at ({last_x:.2f}, {last_y:.2f})")
centers = []
for t in starts:
det = detect_subject_center(video_path, t, target_cls, last_x, last_y)
if det is not None:
_, cx, cy = det
_log(f" t={t:.2f}s → center={cx:.3f}")
centers.append(cx)
last_x, last_y = cx, cy
else:
_log(f" t={t:.2f}s → lost, reusing {last_x:.3f}")
centers.append(last_x)
return centers
class ProcessedDB:
_SCHEMA_VERSION = 3 # bump when schema changes
def __init__(self, db_path: str | None = None):
if db_path is None:
db_path = str(Path.home() / ".8cut.db")
self._path = db_path
try:
self._con = sqlite3.connect(db_path, check_same_thread=False)
self._migrate()
self._enabled = True
_log(f"DB opened: {db_path}")
except Exception as e:
_log(f"DB unavailable: {e}")
self._con = None
self._enabled = False
def _migrate(self) -> None:
"""Create table if missing, then add any new columns for old DBs."""
cols = {
row[1]
for row in self._con.execute("PRAGMA table_info(processed)").fetchall()
}
if not cols:
# Fresh DB — create from scratch
self._con.execute(
"CREATE TABLE IF NOT EXISTS processed ("
" id INTEGER PRIMARY KEY AUTOINCREMENT,"
" filename TEXT NOT NULL,"
" start_time REAL NOT NULL,"
" output_path TEXT NOT NULL,"
" label TEXT NOT NULL DEFAULT '',"
" category TEXT NOT NULL DEFAULT '',"
" short_side INTEGER DEFAULT 512,"
" portrait_ratio TEXT NOT NULL DEFAULT '',"
" crop_center REAL NOT NULL DEFAULT 0.5,"
" format TEXT NOT NULL DEFAULT 'MP4',"
" clip_count INTEGER NOT NULL DEFAULT 3,"
" spread REAL NOT NULL DEFAULT 3.0,"
" profile TEXT NOT NULL DEFAULT 'default',"
" processed_at TEXT NOT NULL"
")"
)
else:
# Add missing columns to legacy tables
new_cols = {
"label": "TEXT NOT NULL DEFAULT ''",
"category": "TEXT NOT NULL DEFAULT ''",
"short_side": "INTEGER DEFAULT 512",
"portrait_ratio": "TEXT NOT NULL DEFAULT ''",
"crop_center": "REAL NOT NULL DEFAULT 0.5",
"format": "TEXT NOT NULL DEFAULT 'MP4'",
"clip_count": "INTEGER NOT NULL DEFAULT 3",
"spread": "REAL NOT NULL DEFAULT 3.0",
"profile": "TEXT NOT NULL DEFAULT 'default'",
}
for col, typedef in new_cols.items():
if col not in cols:
self._con.execute(
f"ALTER TABLE processed ADD COLUMN {col} {typedef}"
)
self._con.execute(
"CREATE INDEX IF NOT EXISTS idx_filename ON processed(filename)"
)
self._con.execute(
"CREATE TABLE IF NOT EXISTS hidden_files ("
" filename TEXT NOT NULL,"
" profile TEXT NOT NULL DEFAULT 'default',"
" PRIMARY KEY (filename, profile)"
")"
)
self._con.commit()
def add(self, filename: str, start_time: float, output_path: str,
label: str = "", category: str = "",
short_side: int | None = None, portrait_ratio: str = "",
crop_center: float = 0.5, fmt: str = "MP4",
clip_count: int = 3, spread: float = 3.0,
profile: str = "default") -> None:
if not self._enabled:
return
self._con.execute(
"INSERT INTO processed"
" (filename, start_time, output_path, label, category,"
" short_side, portrait_ratio, crop_center, format,"
" clip_count, spread, profile, processed_at)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(filename, start_time, output_path, label, category,
short_side, portrait_ratio, crop_center, fmt,
clip_count, spread, profile,
datetime.now(timezone.utc).isoformat()),
)
self._con.commit()
def get_labels(self) -> list[str]:
"""Return distinct non-empty labels ordered by most recently used."""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT DISTINCT label FROM processed"
" WHERE label != '' ORDER BY processed_at DESC"
).fetchall()
# Deduplicate while preserving order (DISTINCT on processed_at DESC
# may return duplicates if the same label was used multiple times).
seen: set[str] = set()
result = []
for (lbl,) in rows:
if lbl not in seen:
seen.add(lbl)
result.append(lbl)
return result
def get_by_output_path(self, output_path: str) -> dict | None:
"""Return config dict for an output_path, or None."""
if not self._enabled:
return None
self._con.row_factory = sqlite3.Row
row = self._con.execute(
"SELECT label, category, short_side, portrait_ratio, crop_center, format,"
" clip_count, spread"
" FROM processed WHERE output_path = ?",
(output_path,),
).fetchone()
self._con.row_factory = None
return dict(row) if row else None
def delete_by_output_path(self, output_path: str) -> None:
if not self._enabled:
return
self._con.execute("DELETE FROM processed WHERE output_path = ?", (output_path,))
self._con.commit()
def get_group(self, output_path: str) -> list[str]:
"""Return all output_paths sharing the same (filename, start_time) as *output_path*."""
if not self._enabled:
return []
row = self._con.execute(
"SELECT filename, start_time FROM processed WHERE output_path = ?",
(output_path,),
).fetchone()
if not row:
return []
rows = self._con.execute(
"SELECT output_path FROM processed"
" WHERE filename = ? AND start_time = ? ORDER BY output_path",
(row[0], row[1]),
).fetchall()
return [r[0] for r in rows]
def delete_group(self, output_path: str) -> list[str]:
"""Delete all rows sharing the same (filename, start_time) as *output_path*.
Returns list of deleted output_paths."""
if not self._enabled:
return []
row = self._con.execute(
"SELECT filename, start_time FROM processed WHERE output_path = ?",
(output_path,),
).fetchone()
if not row:
return []
filename, start_time = row
paths = [r[0] for r in self._con.execute(
"SELECT output_path FROM processed WHERE filename = ? AND start_time = ?",
(filename, start_time),
).fetchall()]
self._con.execute(
"DELETE FROM processed WHERE filename = ? AND start_time = ?",
(filename, start_time),
)
self._con.commit()
return paths
def _get_markers_for(self, match: str, profile: str = "default") -> list[tuple[float, int, str]]:
rows = self._con.execute(
"SELECT start_time, output_path FROM processed"
" WHERE filename = ? AND profile = ? ORDER BY start_time",
(match, profile),
).fetchall()
# Deduplicate by start_time — batch exports share the same cursor.
seen_times: dict[float, tuple[float, int, str]] = {}
n = 0
for t, p in rows:
if t not in seen_times:
n += 1
seen_times[t] = (t, n, p)
return list(seen_times.values())
def get_markers(self, filename: str, profile: str = "default") -> list[tuple[float, int, str]]:
"""Return [(start_time, marker_number, output_path), ...] for exact
filename match, sorted by start_time. Empty list if no match."""
if not self._enabled:
return []
return self._get_markers_for(filename, profile)
def get_profiles(self) -> list[str]:
"""Return distinct profile names, ordered alphabetically."""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT DISTINCT profile FROM processed ORDER BY profile"
).fetchall()
return [r[0] for r in rows]
def hide_file(self, filename: str, profile: str = "default") -> None:
if not self._enabled:
return
self._con.execute(
"INSERT OR IGNORE INTO hidden_files (filename, profile) VALUES (?, ?)",
(filename, profile),
)
self._con.commit()
def unhide_file(self, filename: str, profile: str = "default") -> None:
if not self._enabled:
return
self._con.execute(
"DELETE FROM hidden_files WHERE filename = ? AND profile = ?",
(filename, profile),
)
self._con.commit()
def get_hidden_files(self, profile: str = "default") -> set[str]:
if not self._enabled:
return set()
rows = self._con.execute(
"SELECT filename FROM hidden_files WHERE profile = ?", (profile,)
).fetchall()
return {r[0] for r in rows}
class _DBWorker(QThread): class _DBWorker(QThread):
"""Runs ProcessedDB fuzzy-match lookup off the main thread.""" """Runs ProcessedDB fuzzy-match lookup off the main thread."""
result = pyqtSignal(str, object, list) # (queried_filename, match|None, markers) result = pyqtSignal(str, object, list) # (queried_filename, match|None, markers)