92774216d4
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
487 lines
17 KiB
Python
487 lines
17 KiB
Python
import tempfile, os, json
|
|
from main import build_export_path, format_time, build_ffmpeg_command, build_sequence_dir, build_audio_extract_command, resolve_keyframe, apply_keyframes_to_jobs
|
|
from core.annotations import build_annotation_json_path, upsert_clip_annotation
|
|
from main import ProcessedDB
|
|
|
|
|
|
def test_build_export_path_first():
    """Counter 1 is rendered as a zero-padded ``_001`` suffix with ``.mp4``."""
    exported = build_export_path("/out", "clip", 1)
    assert exported == "/out/clip_001.mp4"
|
|
|
|
def test_build_export_path_counter():
    """A two-digit counter is still padded to three digits."""
    exported = build_export_path("/out", "clip", 42)
    assert exported == "/out/clip_042.mp4"
|
|
|
|
def test_build_export_path_deep_counter():
    """A three-digit counter and a different base name are used verbatim."""
    exported = build_export_path("/out", "shot", 999)
    assert exported == "/out/shot_999.mp4"
|
|
|
|
def test_build_export_path_sub():
    """The ``sub`` index is appended after the counter, including ``sub=0``."""
    expected_by_sub = {
        0: "/out/clip_001_0.mp4",
        2: "/out/clip_001_2.mp4",
    }
    for sub_index, expected in expected_by_sub.items():
        assert build_export_path("/out", "clip", 1, sub=sub_index) == expected
|
|
|
|
def test_build_sequence_dir_sub():
    """Sequence directories carry the ``sub`` index as a trailing suffix."""
    expected_by_sub = {
        0: "/out/clip_001_0",
        1: "/out/clip_001_1",
    }
    for sub_index, expected in expected_by_sub.items():
        assert build_sequence_dir("/out", "clip", 1, sub=sub_index) == expected
|
|
|
|
def test_format_time_seconds():
    """Zero seconds is rendered as ``0:00.0``."""
    rendered = format_time(0.0)
    assert rendered == "0:00.0"
|
|
|
|
def test_format_time_minutes():
    """75.3 seconds is rendered with a minute part: ``1:15.2``.

    NOTE(review): the ``.2`` suggests tenths are truncated, not rounded
    (75.3 is not exactly representable as a float) -- confirm against
    ``format_time``.
    """
    rendered = format_time(75.3)
    assert rendered == "1:15.2"
|
|
|
|
def test_format_time_rounding():
    """61.05 seconds is rendered as ``1:01.0`` (the tenths digit is not bumped to 1)."""
    rendered = format_time(61.05)
    assert rendered == "1:01.0"
|
|
|
|
def test_format_time_no_sixty_rollover():
    """59.95 seconds stays at ``0:59.9`` and never shows a ``60.0`` seconds field."""
    rendered = format_time(59.95)
    assert rendered == "0:59.9"
|
|
|
|
|
|
def test_ffmpeg_command_no_resize():
    """With no resize requested the command has the basic flags and no ``-vf``."""
    destination = "/out/clip_001.mp4"
    argv = build_ffmpeg_command("/in/video.mp4", 12.5, destination)

    assert argv[0] == "ffmpeg"
    # Overwrite flag, seek flag with the start value, duration flag with "8".
    for token in ("-y", "-ss", str(12.5), "-t", "8"):
        assert token in argv
    assert argv[-1] == destination
    assert "-vf" not in argv
|
|
|
|
def test_ffmpeg_command_with_resize():
    """``short_side`` adds a ``-vf`` filter that mentions ``scale`` and the size."""
    destination = "/out/clip_001.mp4"
    argv = build_ffmpeg_command("/in/video.mp4", 0.0, destination, short_side=256)

    assert "-vf" in argv
    # The filter expression is the argument right after the -vf flag.
    filter_graph = argv[argv.index("-vf") + 1]
    for fragment in ("256", "scale"):
        assert fragment in filter_graph
    assert argv[-1] == destination
|
|
|
|
|
|
# --- ProcessedDB ---
|
|
|
|
def test_db_add_and_get_markers():
    """One added row is returned by ``get_markers`` with its start time first."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_path = tmp.name
    try:
        store = ProcessedDB(db_path)
        store.add("video.mp4", 12.5, "/out/clip_001.mp4")
        found = store.get_markers("video.mp4")
        assert len(found) == 1
        first_marker = found[0]
        assert first_marker[0] == 12.5
    finally:
        # delete=False above, so the temp file has to be removed by hand.
        os.unlink(db_path)
|
|
|
|
def test_db_exact_match_only():
    """Lookups match on the exact filename; a near-identical name finds nothing."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_path = tmp.name
    try:
        store = ProcessedDB(db_path)
        store.add("episode_s01e01_2160p.mkv", 0.0, "/out/ep_001.mp4")

        # A similar but different filename must not match.
        near_miss = store.get_markers("episode_s01e01_1080p.mkv")
        assert near_miss == []

        # The exact filename matches the single stored row.
        exact = store.get_markers("episode_s01e01_2160p.mkv")
        assert len(exact) == 1
    finally:
        os.unlink(db_path)
|
|
|
|
def test_db_no_match():
    """Querying an unrelated filename returns an empty list."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_path = tmp.name
    try:
        store = ProcessedDB(db_path)
        store.add("alpha.mp4", 0.0, "/out/alpha_001.mp4")
        unrelated = store.get_markers("completely_different.mp4")
        assert unrelated == []
    finally:
        os.unlink(db_path)
|
|
|
|
def test_db_disabled_survives_bad_path():
    """With a path in a missing directory, ``add`` does not raise and lookups are empty."""
    store = ProcessedDB("/no/such/directory/8cut.db")
    store.add("x.mp4", 0.0, "/out/x_001.mp4")  # must not raise
    found = store.get_markers("x.mp4")
    assert found == []
|
|
|
|
def test_db_get_markers_returns_sorted():
    """Markers come back ordered by start time, whatever the insertion order."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_path = tmp.name
    try:
        store = ProcessedDB(db_path)
        # Deliberately inserted out of order.
        inserted = (
            (30.0, "/out/clip_002.mp4"),
            (10.0, "/out/clip_001.mp4"),
            (50.0, "/out/clip_003.mp4"),
        )
        for start, clip_path in inserted:
            store.add("video.mp4", start, clip_path)

        ordered = store.get_markers("video.mp4")
        assert len(ordered) == 3
        expected = (
            (10.0, 1, "/out/clip_001.mp4"),
            (30.0, 2, "/out/clip_002.mp4"),
            (50.0, 3, "/out/clip_003.mp4"),
        )
        for position, want in enumerate(expected):
            assert ordered[position] == want
    finally:
        os.unlink(db_path)
|
|
|
|
def test_db_get_markers_no_match():
    """A freshly created database has no markers for any filename."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_path = tmp.name
    try:
        store = ProcessedDB(db_path)
        found = store.get_markers("nothing.mp4")
        assert found == []
    finally:
        os.unlink(db_path)
|
|
|
|
def test_db_get_markers_disabled():
    """``get_markers`` returns ``[]`` when the database path is in a missing directory."""
    store = ProcessedDB("/no/such/directory/8cut.db")
    found = store.get_markers("x.mp4")
    assert found == []
|
|
|
|
def test_ffmpeg_command_portrait_only():
    """A portrait ratio without resize yields a crop filter and no scale filter."""
    destination = "/out/clip.mp4"
    portrait_options = {"portrait_ratio": "9:16", "crop_center": 0.5}
    argv = build_ffmpeg_command("/in/video.mp4", 0.0, destination, **portrait_options)

    assert "-vf" in argv
    filter_graph = argv[argv.index("-vf") + 1]
    assert "crop" in filter_graph
    assert "9" in filter_graph
    assert "scale" not in filter_graph
    assert argv[-1] == destination
|
|
|
|
def test_ffmpeg_command_portrait_and_resize():
    """With both portrait and resize, ``crop`` comes before ``scale`` in the filter."""
    destination = "/out/clip.mp4"
    options = {"short_side": 256, "portrait_ratio": "9:16", "crop_center": 0.5}
    argv = build_ffmpeg_command("/in/video.mp4", 0.0, destination, **options)

    assert "-vf" in argv
    filter_graph = argv[argv.index("-vf") + 1]
    for fragment in ("crop", "scale"):
        assert fragment in filter_graph
    # Ordering matters: the crop must be applied before the scale.
    crop_at = filter_graph.index("crop")
    scale_at = filter_graph.index("scale")
    assert crop_at < scale_at
    assert argv[-1] == destination
|
|
|
|
def test_ffmpeg_command_portrait_off():
    """With neither portrait nor resize options there is no ``-vf`` argument."""
    argv = build_ffmpeg_command("/in/video.mp4", 0.0, "/out/clip.mp4")
    has_filter = "-vf" in argv
    assert not has_filter
|
|
|
|
# --- build_audio_extract_command ---
|
|
|
|
def test_audio_extract_output_path():
    """The output argument is the given base path with ``.wav`` appended."""
    argv = build_audio_extract_command("/in/v.mp4", 0.0, "/out/clip_001")
    output_arg = argv[-1]
    assert output_arg == "/out/clip_001.wav"
|
|
|
|
def test_audio_extract_no_video():
    """The audio extraction command contains the ``-vn`` flag."""
    argv = build_audio_extract_command("/in/v.mp4", 0.0, "/out/clip_001")
    has_no_video_flag = "-vn" in argv
    assert has_no_video_flag
|
|
|
|
def test_audio_extract_lossless_codec():
    """The audio codec argument following ``-c:a`` is ``pcm_s16le``."""
    argv = build_audio_extract_command("/in/v.mp4", 0.0, "/out/clip_001")
    assert "-c:a" in argv
    codec = argv[argv.index("-c:a") + 1]
    assert codec == "pcm_s16le"
|
|
|
|
def test_audio_extract_timing():
    """``-ss`` is followed by the start time and ``-t`` by ``"8"``."""
    argv = build_audio_extract_command("/in/v.mp4", 12.5, "/out/clip_001")
    expected_value_after = (("-ss", "12.5"), ("-t", "8"))
    for flag, value in expected_value_after:
        assert flag in argv
        assert argv[argv.index(flag) + 1] == value
|
|
|
|
|
|
def test_build_sequence_dir_basic():
    """Counter 1 gives a ``_001`` directory name with no file extension."""
    directory = build_sequence_dir("/out", "clip", 1)
    assert directory == "/out/clip_001"
|
|
|
|
def test_build_sequence_dir_counter():
    """A two-digit counter is zero-padded to three digits in the directory name."""
    directory = build_sequence_dir("/out", "clip", 42)
    assert directory == "/out/clip_042"
|
|
|
|
def test_ffmpeg_command_image_sequence():
    """Image-sequence mode uses libwebp, a quality flag and a frame pattern output."""
    argv = build_ffmpeg_command("/in/v.mp4", 0.0, "/out/seq_001", image_sequence=True)

    assert "-c:v" in argv
    video_codec = argv[argv.index("-c:v") + 1]
    assert video_codec == "libwebp"
    assert "-quality" in argv
    # The output is a numbered-frame pattern inside the sequence directory.
    assert argv[-1] == "/out/seq_001/frame_%04d.webp"
|
|
|
|
def test_ffmpeg_command_image_sequence_with_resize():
    """Image-sequence mode still applies a scale filter when ``short_side`` is set."""
    argv = build_ffmpeg_command(
        "/in/v.mp4",
        0.0,
        "/out/seq_001",
        image_sequence=True,
        short_side=256,
    )

    assert "-vf" in argv
    filter_graph = argv[argv.index("-vf") + 1]
    assert "scale" in filter_graph
    assert argv[-1] == "/out/seq_001/frame_%04d.webp"
|
|
|
|
def test_ffmpeg_command_image_sequence_no_audio():
    """Image-sequence mode passes ``-an`` and no audio codec arguments."""
    argv = build_ffmpeg_command("/in/v.mp4", 0.0, "/out/seq_001", image_sequence=True)

    assert "-an" in argv
    for audio_token in ("-c:a", "aac"):
        assert audio_token not in argv
|
|
|
|
|
|
def test_annotation_json_path():
    """The annotation file is ``dataset.json`` inside the given folder."""
    json_path = build_annotation_json_path("/out")
    assert json_path == "/out/dataset.json"
|
|
|
|
def test_upsert_creates_file():
    """The first upsert writes ``dataset.json`` with one entry holding label and path."""
    with tempfile.TemporaryDirectory() as workdir:
        clip_path = os.path.join(workdir, "clip_001.mp4")
        upsert_clip_annotation(workdir, clip_path, "dog barking")

        dataset_path = os.path.join(workdir, "dataset.json")
        with open(dataset_path) as handle:
            records = json.load(handle)

        assert len(records) == 1
        only_record = records[0]
        assert only_record["label"] == "dog barking"
        assert only_record["path"] == clip_path
|
|
|
|
def test_upsert_appends_new_clips():
    """Upserting two different clip paths leaves two entries in the dataset."""
    with tempfile.TemporaryDirectory() as workdir:
        labelled_clips = (
            ("clip_001.mp4", "dog barking"),
            ("clip_002.mp4", "cat meowing"),
        )
        for clip_name, label in labelled_clips:
            upsert_clip_annotation(workdir, os.path.join(workdir, clip_name), label)

        dataset_path = os.path.join(workdir, "dataset.json")
        with open(dataset_path) as handle:
            records = json.load(handle)

        assert len(records) == 2
|
|
|
|
def test_upsert_replaces_existing():
    """Upserting the same clip path twice keeps one entry with the latest label."""
    with tempfile.TemporaryDirectory() as workdir:
        clip_path = os.path.join(workdir, "clip_001.mp4")
        for label in ("dog barking", "cat meowing"):
            upsert_clip_annotation(workdir, clip_path, label)

        dataset_path = os.path.join(workdir, "dataset.json")
        with open(dataset_path) as handle:
            records = json.load(handle)

        assert len(records) == 1
        assert records[0]["label"] == "cat meowing"
|
|
|
|
def test_upsert_empty_label_skips():
    """An empty label results in no ``dataset.json`` being written."""
    with tempfile.TemporaryDirectory() as workdir:
        clip_path = os.path.join(workdir, "clip_001.mp4")
        upsert_clip_annotation(workdir, clip_path, "")

        dataset_path = os.path.join(workdir, "dataset.json")
        assert not os.path.exists(dataset_path)
|
|
|
|
def test_upsert_missing_folder_creates_it():
    """Upserting into a folder that does not exist yet still produces ``dataset.json``."""
    with tempfile.TemporaryDirectory() as workdir:
        # Two levels that do not exist before the call.
        missing_folder = os.path.join(workdir, "subdir", "deep")
        clip_path = os.path.join(missing_folder, "clip_001.mp4")
        upsert_clip_annotation(missing_folder, clip_path, "dog barking")

        dataset_path = os.path.join(missing_folder, "dataset.json")
        assert os.path.exists(dataset_path)
|
|
|
|
def test_db_stores_label_and_category():
    """``label`` and ``category`` passed to ``add`` are stored in the ``processed`` table."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_path = tmp.name
    try:
        store = ProcessedDB(db_path)
        store.add(
            "video.mp4",
            0.0,
            "/out/clip_001.mp4",
            label="dog barking",
            category="Animal",
        )
        # Read the raw row through the underlying connection to check the columns.
        cursor = store._con.execute(
            "SELECT label, category FROM processed WHERE filename = ?", ("video.mp4",)
        )
        stored = cursor.fetchone()
        assert stored == ("dog barking", "Animal")
    finally:
        os.unlink(db_path)
|
|
|
|
|
|
def test_db_get_group_returns_all_sub_clips():
    """``get_group`` on one sub-clip returns every clip added at the same start time."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_path = tmp.name
    try:
        store = ProcessedDB(db_path)
        sub_clips = (
            "/out/vid_001/clip_001_0.mp4",
            "/out/vid_001/clip_001_1.mp4",
            "/out/vid_001/clip_001_2.mp4",
        )
        for clip_path in sub_clips:
            store.add("video.mp4", 10.0, clip_path)

        members = store.get_group("/out/vid_001/clip_001_0.mp4")
        assert len(members) == 3
        assert "/out/vid_001/clip_001_0.mp4" in members
        assert "/out/vid_001/clip_001_2.mp4" in members
    finally:
        os.unlink(db_path)
|
|
|
|
|
|
def test_db_get_group_isolates_by_start_time():
    """A clip added at a different start time is not part of the returned group."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_path = tmp.name
    try:
        store = ProcessedDB(db_path)
        rows = (
            (10.0, "/out/vid_001/clip_001_0.mp4"),
            (10.0, "/out/vid_001/clip_001_1.mp4"),
            (30.0, "/out/vid_001/clip_002_0.mp4"),
        )
        for start, clip_path in rows:
            store.add("video.mp4", start, clip_path)

        members = store.get_group("/out/vid_001/clip_001_0.mp4")
        assert len(members) == 2
    finally:
        os.unlink(db_path)
|
|
|
|
|
|
def test_db_delete_group_removes_all():
    """``delete_group`` removes both clips at 10.0 and leaves the 30.0 marker."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_path = tmp.name
    try:
        store = ProcessedDB(db_path)
        rows = (
            (10.0, "/out/vid_001/clip_001_0.mp4"),
            (10.0, "/out/vid_001/clip_001_1.mp4"),
            (30.0, "/out/vid_001/clip_002_0.mp4"),
        )
        for start, clip_path in rows:
            store.add("video.mp4", start, clip_path)

        removed = store.delete_group("/out/vid_001/clip_001_0.mp4")
        assert len(removed) == 2

        # clip_002 should still exist
        remaining = store.get_markers("video.mp4")
        assert len(remaining) == 1
        survivor = remaining[0]
        assert survivor[0] == 30.0
    finally:
        os.unlink(db_path)
|
|
|
|
|
|
def test_db_get_group_disabled():
    """``get_group`` returns ``[]`` when the database path is in a missing directory."""
    store = ProcessedDB("/no/such/directory/8cut.db")
    members = store.get_group("/out/clip_001.mp4")
    assert members == []
|
|
|
|
|
|
def test_db_delete_group_disabled():
    """``delete_group`` returns ``[]`` when the database path is in a missing directory."""
    store = ProcessedDB("/no/such/directory/8cut.db")
    removed = store.delete_group("/out/clip_001.mp4")
    assert removed == []
|
|
|
|
|
|
# --- Profiles ---
|
|
|
|
def test_db_markers_isolated_by_profile():
    """Markers for the same file are only visible under the profile they were added to."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_path = tmp.name
    try:
        store = ProcessedDB(db_path)
        store.add("video.mp4", 10.0, "/out/a_001.mp4", profile="landscape")
        store.add("video.mp4", 20.0, "/out/b_001.mp4", profile="portrait")

        landscape_markers = store.get_markers("video.mp4", profile="landscape")
        portrait_markers = store.get_markers("video.mp4", profile="portrait")

        assert len(landscape_markers) == 1
        assert landscape_markers[0][0] == 10.0
        assert len(portrait_markers) == 1
        assert portrait_markers[0][0] == 20.0
    finally:
        os.unlink(db_path)
|
|
|
|
|
|
def test_db_get_profiles():
    """``get_profiles`` starts empty, then lists each profile once in sorted order."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_path = tmp.name
    try:
        store = ProcessedDB(db_path)
        assert store.get_profiles() == []

        # "beta" is used twice and added before "alpha" on purpose.
        rows = (
            ("a.mp4", "/out/a.mp4", "beta"),
            ("b.mp4", "/out/b.mp4", "alpha"),
            ("c.mp4", "/out/c.mp4", "beta"),
        )
        for filename, clip_path, profile_name in rows:
            store.add(filename, 0.0, clip_path, profile=profile_name)

        listed = store.get_profiles()
        assert listed == ["alpha", "beta"]
    finally:
        os.unlink(db_path)
|
|
|
|
|
|
def test_db_get_profiles_disabled():
    """``get_profiles`` returns ``[]`` when the database path is in a missing directory."""
    store = ProcessedDB("/no/such/directory/8cut.db")
    listed = store.get_profiles()
    assert listed == []
|
|
|
|
|
|
def test_db_default_profile_backward_compat():
    """Calls that omit ``profile`` keep working and are filed under ``'default'``."""
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
        db_path = tmp.name
    try:
        store = ProcessedDB(db_path)
        store.add("video.mp4", 5.0, "/out/clip.mp4")

        found = store.get_markers("video.mp4")  # no profile arg
        assert len(found) == 1
        assert found[0][0] == 5.0

        listed = store.get_profiles()
        assert listed == ["default"]
    finally:
        os.unlink(db_path)
|
|
|
|
|
|
# --- resolve_keyframe ---
|
|
|
|
def test_resolve_keyframe_empty():
    """With no keyframes at all the result is ``None``."""
    resolved = resolve_keyframe([], 5.0)
    assert resolved is None
|
|
|
|
def test_resolve_keyframe_before_first():
    """A query time earlier than the only keyframe resolves to ``None``."""
    keyframes = [(3.0, 0.5, None, False, False)]
    resolved = resolve_keyframe(keyframes, 1.0)
    assert resolved is None
|
|
|
|
def test_resolve_keyframe_exact():
    """A query time equal to a keyframe's time returns that keyframe unchanged."""
    keyframes = [(2.0, 0.3, "9:16", True, False)]
    resolved = resolve_keyframe(keyframes, 2.0)
    assert resolved == (2.0, 0.3, "9:16", True, False)
|
|
|
|
def test_resolve_keyframe_between():
    """A query time between two keyframes resolves to the earlier one."""
    earlier = (1.0, 0.2, None, False, False)
    later = (5.0, 0.8, "1:1", False, True)
    resolved = resolve_keyframe([earlier, later], 3.0)
    assert resolved == (1.0, 0.2, None, False, False)
|
|
|
|
def test_resolve_keyframe_after_last():
    """A query time past the final keyframe resolves to that final keyframe."""
    earlier = (1.0, 0.2, None, False, False)
    later = (5.0, 0.8, "1:1", False, True)
    resolved = resolve_keyframe([earlier, later], 10.0)
    assert resolved == (5.0, 0.8, "1:1", False, True)
|
|
|
|
def test_resolve_keyframe_tolerance():
    """A query at 3.96 still resolves to the keyframe at 4.0."""
    keyframes = [(4.0, 0.5, None, True, True)]
    resolved = resolve_keyframe(keyframes, 3.96)
    assert resolved == (4.0, 0.5, None, True, True)
|
|
|
|
|
|
# --- apply_keyframes_to_jobs ---
|
|
|
|
def test_apply_keyframes_no_keyframes():
    """With no keyframes every job keeps its values and gains the base rand flags."""
    pending_jobs = [(0.0, "/out/a", None, 0.5), (3.0, "/out/b", None, 0.5)]
    applied = apply_keyframes_to_jobs(
        pending_jobs,
        [],
        base_center=0.5,
        base_ratio=None,
        base_rand_p=True,
        base_rand_s=False,
    )
    expected = [
        (0.0, "/out/a", None, 0.5, True, False),
        (3.0, "/out/b", None, 0.5, True, False),
    ]
    assert applied == expected
|
|
|
|
def test_apply_keyframes_with_keyframes():
    """Each job takes its settings from the keyframe in effect at its start time."""
    keyframes = [
        (0.0, 0.3, "9:16", True, False),
        (4.0, 0.7, None, False, True),
    ]
    pending_jobs = [
        (0.0, "/out/a", None, 0.5),
        (3.0, "/out/b", None, 0.5),
        (6.0, "/out/c", None, 0.5),
    ]
    applied = apply_keyframes_to_jobs(
        pending_jobs,
        keyframes,
        base_center=0.5,
        base_ratio=None,
        base_rand_p=False,
        base_rand_s=False,
    )
    # Jobs at 0.0 and 3.0 fall under the first keyframe; the job at 6.0 under the second.
    expected = [
        (0.0, "/out/a", "9:16", 0.3, True, False),
        (3.0, "/out/b", "9:16", 0.3, True, False),
        (6.0, "/out/c", None, 0.7, False, True),
    ]
    assert applied == expected
|
|
|
|
def test_apply_keyframes_before_first_uses_base():
    """A job that starts before the first keyframe gets the base settings."""
    keyframes = [(5.0, 0.8, "1:1", False, True)]
    pending_jobs = [(1.0, "/out/a", None, 0.5)]
    applied = apply_keyframes_to_jobs(
        pending_jobs,
        keyframes,
        base_center=0.5,
        base_ratio="4:5",
        base_rand_p=True,
        base_rand_s=False,
    )
    expected = [(1.0, "/out/a", "4:5", 0.5, True, False)]
    assert applied == expected
|
|
|
|
|
|
# --- LTX-2 legal-frame math (core/ltx2.py) ---
|
|
|
|
from core.ltx2 import is_legal_frames, nearest_legal_frames, frames_for_duration, duration_for_frames, legal_frames
|
|
|
|
def test_ltx2_is_legal():
    """201, 9 and 25 are legal frame counts; 200 and 8 are not."""
    for legal_count in (201, 9, 25):
        assert is_legal_frames(legal_count)
    for illegal_count in (200, 8):
        assert not is_legal_frames(illegal_count)
|
|
|
|
def test_ltx2_nearest():
    """Frame counts snap to the nearest legal value, with 9 as the minimum."""
    snapped_by_input = (
        (200, 201),  # 200 -> nearest 8k+1
        (196, 193),
        (5, 9),  # floor at 9
    )
    for requested, snapped in snapped_by_input:
        assert nearest_legal_frames(requested) == snapped
|
|
|
|
def test_ltx2_duration_roundtrip():
    """201 frames at 25 fps is 201/25 seconds, and 8.0 s at 25 fps maps to 201 frames."""
    seconds = duration_for_frames(201, 25)
    assert seconds == 201 / 25

    frame_count = frames_for_duration(8.0, 25)
    assert frame_count == 201  # 200 -> 201
|
|
|
|
def test_ltx2_legal_series():
    """The legal counts from 9 to 33 inclusive are 9, 17, 25 and 33."""
    series = legal_frames(min_f=9, max_f=33)
    expected = [9, 17, 25, 33]
    assert series == expected
|
|
|
|
|
|
# --- LTX-2 ffmpeg params (target_fps, snap32, frames) ---
|
|
|
|
def test_ffmpeg_ltx2_fps_and_frames():
    """``target_fps`` and ``frames`` set ``-r``, ``-frames:v`` and an ``fps=`` filter."""
    argv = build_ffmpeg_command(
        "/in/v.mp4",
        0.0,
        "/out/c.mp4",
        short_side=512,
        target_fps=25,
        frames=201,
    )

    assert "-r" in argv
    assert argv[argv.index("-r") + 1] == "25"
    assert "-frames:v" in argv
    assert argv[argv.index("-frames:v") + 1] == "201"

    filter_graph = argv[argv.index("-vf") + 1]
    assert "fps=25" in filter_graph
|
|
|
|
def test_ffmpeg_ltx2_snap32_crop():
    """``snap32=True`` adds a crop that truncates width and height to multiples of 32."""
    argv = build_ffmpeg_command(
        "/in/v.mp4",
        0.0,
        "/out/c.mp4",
        short_side=512,
        snap32=True,
    )
    filter_graph = argv[argv.index("-vf") + 1]
    assert "crop=trunc(iw/32)*32:trunc(ih/32)*32" in filter_graph
|
|
|
|
def test_ffmpeg_foley_unchanged():
    """Without LTX-2 options there is no ``-r``, no ``-frames:v`` and no snap crop."""
    argv = build_ffmpeg_command("/in/v.mp4", 0.0, "/out/c.mp4", short_side=256)

    for ltx2_flag in ("-r", "-frames:v"):
        assert ltx2_flag not in argv

    filter_graph = argv[argv.index("-vf") + 1]
    assert "crop=trunc" not in filter_graph
|
|
|