feat: ProcessedDB and _normalize_filename with tests
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,10 @@
|
||||
import sys
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
import subprocess
|
||||
from datetime import datetime
|
||||
from difflib import SequenceMatcher
|
||||
from pathlib import Path
|
||||
|
||||
from PyQt6.QtWidgets import (
|
||||
@@ -39,6 +43,62 @@ def build_ffmpeg_command(input_path: str, start: float, output_path: str) -> lis
|
||||
]
|
||||
|
||||
|
||||
def _normalize_filename(filename: str) -> str:
    """Reduce a filename to a canonical key for fuzzy comparison.

    The extension is dropped and the name lowercased; common
    resolution/codec/source tags (``1080p``, ``x264``, ``webrip``, ...) are
    removed when they are not adjacent to another letter or digit; finally
    every run of whitespace, underscores, hyphens, or dots collapses to a
    single ``_`` and leading/trailing underscores are trimmed.
    """
    stem, _ext = os.path.splitext(filename)
    lowered = stem.lower()

    # Lookarounds keep tags embedded in longer words (e.g. "x1080p") intact.
    tag_pattern = (
        r'(?<![a-z0-9])(2160p?|4k|8k|1080p?|720p?|480p?|360p?|240p?'
        r'|hdr|sdr|x264|x265|h264|h265|hevc|avc'
        r'|blu[-_.]?ray|webrip|web[-_.]dl|dvdrip|hdtv)(?![a-z0-9])'
    )
    without_tags = re.compile(tag_pattern, re.IGNORECASE).sub('', lowered)

    collapsed = re.sub(r'[\s_\-\.]+', '_', without_tags)
    return collapsed.strip('_')
|
||||
|
||||
|
||||
class ProcessedDB:
|
||||
def __init__(self, db_path: str | None = None):
|
||||
if db_path is None:
|
||||
db_path = str(Path.home() / ".8cut.db")
|
||||
try:
|
||||
self._con = sqlite3.connect(db_path)
|
||||
self._con.execute(
|
||||
"CREATE TABLE IF NOT EXISTS processed "
|
||||
"(filename TEXT NOT NULL, processed_at TEXT NOT NULL)"
|
||||
)
|
||||
self._con.commit()
|
||||
self._enabled = True
|
||||
except Exception as e:
|
||||
print(f"8-cut: DB unavailable: {e}", file=sys.stderr)
|
||||
self._con = None
|
||||
self._enabled = False
|
||||
|
||||
def add(self, filename: str) -> None:
|
||||
if not self._enabled:
|
||||
return
|
||||
self._con.execute(
|
||||
"INSERT INTO processed (filename, processed_at) VALUES (?, ?)",
|
||||
(filename, datetime.utcnow().isoformat()),
|
||||
)
|
||||
self._con.commit()
|
||||
|
||||
def find_similar(self, filename: str) -> str | None:
|
||||
if not self._enabled:
|
||||
return None
|
||||
rows = self._con.execute(
|
||||
"SELECT DISTINCT filename FROM processed"
|
||||
).fetchall()
|
||||
norm_new = _normalize_filename(filename)
|
||||
best_ratio, best_match = 0.0, None
|
||||
for (stored,) in rows:
|
||||
ratio = SequenceMatcher(
|
||||
None, norm_new, _normalize_filename(stored)
|
||||
).ratio()
|
||||
if ratio >= 0.75 and ratio > best_ratio:
|
||||
best_ratio, best_match = ratio, stored
|
||||
return best_match
|
||||
|
||||
|
||||
class ExportWorker(QThread):
|
||||
finished = pyqtSignal(str) # output path
|
||||
error = pyqtSignal(str) # error message
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import tempfile, os
|
||||
from main import build_export_path, format_time, build_ffmpeg_command
|
||||
from main import _normalize_filename, ProcessedDB
|
||||
|
||||
|
||||
def test_build_export_path_first():
|
||||
@@ -32,3 +34,62 @@ def test_ffmpeg_command():
|
||||
assert "-t" in cmd
|
||||
assert "8" in cmd
|
||||
assert cmd[-1] == "/out/clip_001.mp4"
|
||||
|
||||
|
||||
# --- _normalize_filename ---
|
||||
|
||||
def test_normalize_strips_extension():
    """The file extension is dropped from the normalized name."""
    normalized = _normalize_filename("clip.mp4")
    assert normalized == "clip"
|
||||
|
||||
def test_normalize_strips_resolution():
    """A trailing ``2160p`` tag is removed together with its separator."""
    normalized = _normalize_filename("clip_2160p.mp4")
    assert normalized == "clip"
|
||||
|
||||
def test_normalize_strips_1080p():
    """A trailing ``1080p`` tag is removed regardless of the extension."""
    normalized = _normalize_filename("clip_1080p.mkv")
    assert normalized == "clip"
|
||||
|
||||
def test_normalize_strips_multiple_tags():
    """Several consecutive tags (resolution + HDR) are all removed."""
    normalized = _normalize_filename("show_1080p_HDR.mkv")
    assert normalized == "show"
|
||||
|
||||
def test_normalize_lowercases():
    """Mixed-case names are lowercased and an upper-case tag is still stripped."""
    normalized = _normalize_filename("MyVideo_4K.mp4")
    assert normalized == "myvideo"
|
||||
|
||||
def test_normalize_collapses_separators():
    """Runs of underscores/hyphens collapse to a single underscore."""
    normalized = _normalize_filename("my__video--2160p.mp4")
    assert normalized == "my_video"
|
||||
|
||||
|
||||
# --- ProcessedDB ---
|
||||
|
||||
def test_db_add_and_find_exact():
    """A filename that was added is returned verbatim when looked up again."""
    # Reserve a unique on-disk path; the handle is closed before sqlite opens it.
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_file = tmp.name
    try:
        store = ProcessedDB(db_file)
        store.add("video.mp4")
        match = store.find_similar("video.mp4")
        assert match == "video.mp4"
    finally:
        os.unlink(db_file)
|
||||
|
||||
def test_db_find_similar_resolution_variant():
    """The same episode at a different resolution is reported as similar."""
    # Reserve a unique on-disk path; the handle is closed before sqlite opens it.
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_file = tmp.name
    try:
        store = ProcessedDB(db_file)
        store.add("episode_s01e01_2160p.mkv")
        match = store.find_similar("episode_s01e01_1080p.mkv")
        assert match == "episode_s01e01_2160p.mkv"
    finally:
        os.unlink(db_file)
|
||||
|
||||
def test_db_find_similar_no_match():
    """An unrelated filename yields no match."""
    # Reserve a unique on-disk path; the handle is closed before sqlite opens it.
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_file = tmp.name
    try:
        store = ProcessedDB(db_file)
        store.add("alpha.mp4")
        match = store.find_similar("completely_different_zzzz.mp4")
        assert match is None
    finally:
        os.unlink(db_file)
|
||||
|
||||
def test_db_disabled_survives_bad_path():
    """An unopenable database path disables the store instead of raising."""
    store = ProcessedDB("/no/such/directory/8cut.db")
    # Writing to a disabled store must be a silent no-op.
    store.add("x.mp4")
    # Lookups on a disabled store report "nothing found".
    match = store.find_similar("x.mp4")
    assert match is None
|
||||
|
||||
Reference in New Issue
Block a user