import tempfile, os, json from main import build_export_path, format_time, build_ffmpeg_command, build_sequence_dir, build_audio_extract_command, resolve_keyframe, apply_keyframes_to_jobs from core.annotations import build_annotation_json_path, upsert_clip_annotation from main import ProcessedDB def test_build_export_path_first(): assert build_export_path("/out", "clip", 1) == "/out/clip_001.mp4" def test_build_export_path_counter(): assert build_export_path("/out", "clip", 42) == "/out/clip_042.mp4" def test_build_export_path_deep_counter(): assert build_export_path("/out", "shot", 999) == "/out/shot_999.mp4" def test_build_export_path_sub(): assert build_export_path("/out", "clip", 1, sub=0) == "/out/clip_001_0.mp4" assert build_export_path("/out", "clip", 1, sub=2) == "/out/clip_001_2.mp4" def test_build_sequence_dir_sub(): assert build_sequence_dir("/out", "clip", 1, sub=0) == "/out/clip_001_0" assert build_sequence_dir("/out", "clip", 1, sub=1) == "/out/clip_001_1" def test_format_time_seconds(): assert format_time(0.0) == "0:00.0" def test_format_time_minutes(): assert format_time(75.3) == "1:15.2" def test_format_time_rounding(): assert format_time(61.05) == "1:01.0" def test_format_time_no_sixty_rollover(): assert format_time(59.95) == "0:59.9" def test_ffmpeg_command_no_resize(): cmd = build_ffmpeg_command("/in/video.mp4", 12.5, "/out/clip_001.mp4") assert cmd[0] == "ffmpeg" assert "-y" in cmd assert "-ss" in cmd assert str(12.5) in cmd assert "-t" in cmd assert "8" in cmd assert cmd[-1] == "/out/clip_001.mp4" assert "-vf" not in cmd def test_ffmpeg_command_with_resize(): cmd = build_ffmpeg_command("/in/video.mp4", 0.0, "/out/clip_001.mp4", short_side=256) assert "-vf" in cmd vf_value = cmd[cmd.index("-vf") + 1] assert "256" in vf_value assert "scale" in vf_value assert cmd[-1] == "/out/clip_001.mp4" # --- ProcessedDB --- def test_db_add_and_get_markers(): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: path = f.name try: db = ProcessedDB(path) db.add("video.mp4", 12.5, "/out/clip_001.mp4") markers = db.get_markers("video.mp4") assert len(markers) == 1 assert markers[0][0] == 12.5 finally: os.unlink(path) def test_db_exact_match_only(): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: path = f.name try: db = ProcessedDB(path) db.add("episode_s01e01_2160p.mkv", 0.0, "/out/ep_001.mp4") # Different filename — no match even if similar assert db.get_markers("episode_s01e01_1080p.mkv") == [] # Exact filename — match assert len(db.get_markers("episode_s01e01_2160p.mkv")) == 1 finally: os.unlink(path) def test_db_no_match(): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: path = f.name try: db = ProcessedDB(path) db.add("alpha.mp4", 0.0, "/out/alpha_001.mp4") assert db.get_markers("completely_different.mp4") == [] finally: os.unlink(path) def test_db_disabled_survives_bad_path(): db = ProcessedDB("/no/such/directory/8cut.db") db.add("x.mp4", 0.0, "/out/x_001.mp4") # must not raise assert db.get_markers("x.mp4") == [] def test_db_get_markers_returns_sorted(): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: path = f.name try: db = ProcessedDB(path) db.add("video.mp4", 30.0, "/out/clip_002.mp4") db.add("video.mp4", 10.0, "/out/clip_001.mp4") db.add("video.mp4", 50.0, "/out/clip_003.mp4") markers = db.get_markers("video.mp4") assert len(markers) == 3 assert markers[0] == (10.0, 1, "/out/clip_001.mp4") assert markers[1] == (30.0, 2, "/out/clip_002.mp4") assert markers[2] == (50.0, 3, "/out/clip_003.mp4") finally: os.unlink(path) def test_db_get_markers_no_match(): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: path = f.name try: db = ProcessedDB(path) markers = db.get_markers("nothing.mp4") assert markers == [] finally: os.unlink(path) def test_db_get_markers_disabled(): db = ProcessedDB("/no/such/directory/8cut.db") assert db.get_markers("x.mp4") == [] def test_ffmpeg_command_portrait_only(): cmd = build_ffmpeg_command( "/in/video.mp4", 0.0, "/out/clip.mp4", portrait_ratio="9:16", crop_center=0.5, ) assert "-vf" in cmd vf = cmd[cmd.index("-vf") + 1] assert "crop" in vf assert "9" in vf assert "scale" not in vf assert cmd[-1] == "/out/clip.mp4" def test_ffmpeg_command_portrait_and_resize(): cmd = build_ffmpeg_command( "/in/video.mp4", 0.0, "/out/clip.mp4", short_side=256, portrait_ratio="9:16", crop_center=0.5, ) assert "-vf" in cmd vf = cmd[cmd.index("-vf") + 1] assert "crop" in vf assert "scale" in vf assert vf.index("crop") < vf.index("scale") assert cmd[-1] == "/out/clip.mp4" def test_ffmpeg_command_portrait_off(): cmd = build_ffmpeg_command("/in/video.mp4", 0.0, "/out/clip.mp4") assert "-vf" not in cmd # --- build_audio_extract_command --- def test_audio_extract_output_path(): cmd = build_audio_extract_command("/in/v.mp4", 0.0, "/out/clip_001") assert cmd[-1] == "/out/clip_001.wav" def test_audio_extract_no_video(): cmd = build_audio_extract_command("/in/v.mp4", 0.0, "/out/clip_001") assert "-vn" in cmd def test_audio_extract_lossless_codec(): cmd = build_audio_extract_command("/in/v.mp4", 0.0, "/out/clip_001") assert "-c:a" in cmd assert cmd[cmd.index("-c:a") + 1] == "pcm_s16le" def test_audio_extract_timing(): cmd = build_audio_extract_command("/in/v.mp4", 12.5, "/out/clip_001") assert "-ss" in cmd assert cmd[cmd.index("-ss") + 1] == "12.5" assert "-t" in cmd assert cmd[cmd.index("-t") + 1] == "8" def test_build_sequence_dir_basic(): assert build_sequence_dir("/out", "clip", 1) == "/out/clip_001" def test_build_sequence_dir_counter(): assert build_sequence_dir("/out", "clip", 42) == "/out/clip_042" def test_ffmpeg_command_image_sequence(): cmd = build_ffmpeg_command("/in/v.mp4", 0.0, "/out/seq_001", image_sequence=True) assert "-c:v" in cmd assert cmd[cmd.index("-c:v") + 1] == "libwebp" assert "-quality" in cmd assert cmd[-1] == "/out/seq_001/frame_%04d.webp" def test_ffmpeg_command_image_sequence_with_resize(): cmd = build_ffmpeg_command("/in/v.mp4", 0.0, "/out/seq_001", image_sequence=True, short_side=256) assert "-vf" in cmd vf = cmd[cmd.index("-vf") + 1] assert "scale" in vf assert cmd[-1] == "/out/seq_001/frame_%04d.webp" def test_ffmpeg_command_image_sequence_no_audio(): cmd = build_ffmpeg_command("/in/v.mp4", 0.0, "/out/seq_001", image_sequence=True) assert "-an" in cmd assert "-c:a" not in cmd assert "aac" not in cmd def test_annotation_json_path(): assert build_annotation_json_path("/out") == "/out/dataset.json" def test_upsert_creates_file(): with tempfile.TemporaryDirectory() as d: clip = os.path.join(d, "clip_001.mp4") upsert_clip_annotation(d, clip, "dog barking") with open(os.path.join(d, "dataset.json")) as f: entries = json.load(f) assert len(entries) == 1 assert entries[0]["label"] == "dog barking" assert entries[0]["path"] == clip def test_upsert_appends_new_clips(): with tempfile.TemporaryDirectory() as d: upsert_clip_annotation(d, os.path.join(d, "clip_001.mp4"), "dog barking") upsert_clip_annotation(d, os.path.join(d, "clip_002.mp4"), "cat meowing") with open(os.path.join(d, "dataset.json")) as f: entries = json.load(f) assert len(entries) == 2 def test_upsert_replaces_existing(): with tempfile.TemporaryDirectory() as d: clip = os.path.join(d, "clip_001.mp4") upsert_clip_annotation(d, clip, "dog barking") upsert_clip_annotation(d, clip, "cat meowing") with open(os.path.join(d, "dataset.json")) as f: entries = json.load(f) assert len(entries) == 1 assert entries[0]["label"] == "cat meowing" def test_upsert_empty_label_skips(): with tempfile.TemporaryDirectory() as d: upsert_clip_annotation(d, os.path.join(d, "clip_001.mp4"), "") assert not os.path.exists(os.path.join(d, "dataset.json")) def test_upsert_missing_folder_creates_it(): with tempfile.TemporaryDirectory() as d: nested = os.path.join(d, "subdir", "deep") upsert_clip_annotation(nested, os.path.join(nested, "clip_001.mp4"), "dog barking") assert os.path.exists(os.path.join(nested, "dataset.json")) def test_db_stores_label_and_category(): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: path = f.name try: db = ProcessedDB(path) db.add("video.mp4", 0.0, "/out/clip_001.mp4", label="dog barking", category="Animal") row = db._con.execute( "SELECT label, category FROM processed WHERE filename = ?", ("video.mp4",) ).fetchone() assert row == ("dog barking", "Animal") finally: os.unlink(path) def test_db_get_group_returns_all_sub_clips(): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: path = f.name try: db = ProcessedDB(path) db.add("video.mp4", 10.0, "/out/vid_001/clip_001_0.mp4") db.add("video.mp4", 10.0, "/out/vid_001/clip_001_1.mp4") db.add("video.mp4", 10.0, "/out/vid_001/clip_001_2.mp4") group = db.get_group("/out/vid_001/clip_001_0.mp4") assert len(group) == 3 assert "/out/vid_001/clip_001_0.mp4" in group assert "/out/vid_001/clip_001_2.mp4" in group finally: os.unlink(path) def test_db_get_group_isolates_by_start_time(): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: path = f.name try: db = ProcessedDB(path) db.add("video.mp4", 10.0, "/out/vid_001/clip_001_0.mp4") db.add("video.mp4", 10.0, "/out/vid_001/clip_001_1.mp4") db.add("video.mp4", 30.0, "/out/vid_001/clip_002_0.mp4") group = db.get_group("/out/vid_001/clip_001_0.mp4") assert len(group) == 2 finally: os.unlink(path) def test_db_delete_group_removes_all(): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: path = f.name try: db = ProcessedDB(path) db.add("video.mp4", 10.0, "/out/vid_001/clip_001_0.mp4") db.add("video.mp4", 10.0, "/out/vid_001/clip_001_1.mp4") db.add("video.mp4", 30.0, "/out/vid_001/clip_002_0.mp4") deleted = db.delete_group("/out/vid_001/clip_001_0.mp4") assert len(deleted) == 2 # clip_002 should still exist markers = db.get_markers("video.mp4") assert len(markers) == 1 assert markers[0][0] == 30.0 finally: os.unlink(path) def test_db_get_group_disabled(): db = ProcessedDB("/no/such/directory/8cut.db") assert db.get_group("/out/clip_001.mp4") == [] def test_db_delete_group_disabled(): db = ProcessedDB("/no/such/directory/8cut.db") assert db.delete_group("/out/clip_001.mp4") == [] # --- Profiles --- def test_db_markers_isolated_by_profile(): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: path = f.name try: db = ProcessedDB(path) db.add("video.mp4", 10.0, "/out/a_001.mp4", profile="landscape") db.add("video.mp4", 20.0, "/out/b_001.mp4", profile="portrait") land = db.get_markers("video.mp4", profile="landscape") port = db.get_markers("video.mp4", profile="portrait") assert len(land) == 1 assert land[0][0] == 10.0 assert len(port) == 1 assert port[0][0] == 20.0 finally: os.unlink(path) def test_db_get_profiles(): with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: path = f.name try: db = ProcessedDB(path) assert db.get_profiles() == [] db.add("a.mp4", 0.0, "/out/a.mp4", profile="beta") db.add("b.mp4", 0.0, "/out/b.mp4", profile="alpha") db.add("c.mp4", 0.0, "/out/c.mp4", profile="beta") profiles = db.get_profiles() assert profiles == ["alpha", "beta"] finally: os.unlink(path) def test_db_get_profiles_disabled(): db = ProcessedDB("/no/such/directory/8cut.db") assert db.get_profiles() == [] def test_db_default_profile_backward_compat(): """Existing tests pass without explicit profile — defaults to 'default'.""" with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: path = f.name try: db = ProcessedDB(path) db.add("video.mp4", 5.0, "/out/clip.mp4") markers = db.get_markers("video.mp4") # no profile arg assert len(markers) == 1 assert markers[0][0] == 5.0 assert db.get_profiles() == ["default"] finally: os.unlink(path) # --- resolve_keyframe --- def test_resolve_keyframe_empty(): assert resolve_keyframe([], 5.0) is None def test_resolve_keyframe_before_first(): kfs = [(3.0, 0.5, None, False, False)] assert resolve_keyframe(kfs, 1.0) is None def test_resolve_keyframe_exact(): kfs = [(2.0, 0.3, "9:16", True, False)] assert resolve_keyframe(kfs, 2.0) == (2.0, 0.3, "9:16", True, False) def test_resolve_keyframe_between(): kfs = [ (1.0, 0.2, None, False, False), (5.0, 0.8, "1:1", False, True), ] assert resolve_keyframe(kfs, 3.0) == (1.0, 0.2, None, False, False) def test_resolve_keyframe_after_last(): kfs = [ (1.0, 0.2, None, False, False), (5.0, 0.8, "1:1", False, True), ] assert resolve_keyframe(kfs, 10.0) == (5.0, 0.8, "1:1", False, True) def test_resolve_keyframe_tolerance(): kfs = [(4.0, 0.5, None, True, True)] assert resolve_keyframe(kfs, 3.96) == (4.0, 0.5, None, True, True) # --- apply_keyframes_to_jobs --- def test_apply_keyframes_no_keyframes(): jobs = [(0.0, "/out/a", None, 0.5), (3.0, "/out/b", None, 0.5)] result = apply_keyframes_to_jobs(jobs, [], base_center=0.5, base_ratio=None, base_rand_p=True, base_rand_s=False) assert result == [ (0.0, "/out/a", None, 0.5, True, False), (3.0, "/out/b", None, 0.5, True, False), ] def test_apply_keyframes_with_keyframes(): kfs = [ (0.0, 0.3, "9:16", True, False), (4.0, 0.7, None, False, True), ] jobs = [ (0.0, "/out/a", None, 0.5), (3.0, "/out/b", None, 0.5), (6.0, "/out/c", None, 0.5), ] result = apply_keyframes_to_jobs(jobs, kfs, base_center=0.5, base_ratio=None, base_rand_p=False, base_rand_s=False) assert result == [ (0.0, "/out/a", "9:16", 0.3, True, False), (3.0, "/out/b", "9:16", 0.3, True, False), (6.0, "/out/c", None, 0.7, False, True), ] def test_apply_keyframes_before_first_uses_base(): kfs = [(5.0, 0.8, "1:1", False, True)] jobs = [(1.0, "/out/a", None, 0.5)] result = apply_keyframes_to_jobs(jobs, kfs, base_center=0.5, base_ratio="4:5", base_rand_p=True, base_rand_s=False) assert result == [(1.0, "/out/a", "4:5", 0.5, True, False)] # --- LTX-2 legal-frame math (core/ltx2.py) --- from core.ltx2 import is_legal_frames, nearest_legal_frames, frames_for_duration, duration_for_frames, legal_frames def test_ltx2_is_legal(): assert is_legal_frames(201) and is_legal_frames(9) and is_legal_frames(25) assert not is_legal_frames(200) and not is_legal_frames(8) def test_ltx2_nearest(): assert nearest_legal_frames(200) == 201 # 200 -> nearest 8k+1 assert nearest_legal_frames(196) == 193 assert nearest_legal_frames(5) == 9 # floor at 9 def test_ltx2_duration_roundtrip(): assert duration_for_frames(201, 25) == 201 / 25 assert frames_for_duration(8.0, 25) == 201 # 200 -> 201 def test_ltx2_legal_series(): s = legal_frames(min_f=9, max_f=33) assert s == [9, 17, 25, 33]