Compare commits


25 Commits

Author SHA1 Message Date
a6461a2ce8 Handle missing source folders gracefully with placeholder display
Missing folders now show in red with (MISSING) tag instead of silently
disappearing. Auto-save always persists session settings (folder order,
trims, transitions) even when folders are missing, so the session
layout survives for later replacement via right-click > Replace Folder.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 16:34:11 +01:00
7c2acd326f Fix file list not updating when folder type changes
_set_folder_type was missing a _refresh_files() call, so changing a
folder between TRANSITION and MAIN didn't rebuild the file list until
a manual reorder.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 18:14:31 +01:00
30911e894a Sync file removal with trim slider for edge selections
Removing a contiguous run of files from the start or end of a sequence now
adjusts the trim handles instead of tracking the files as individual
removals, so dragging the slider back restores them. Removals from the
middle of a sequence are still tracked separately.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 17:55:45 +01:00
dd426b958b Fix trim label not reflecting removed files count
The trim label now shows removed file count alongside trim range,
e.g. "Frames 1-81 of 81 (78 included, 3 removed)" instead of
always showing the full disk count.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 17:49:07 +01:00
8dda4f56a0 Add session lock feature and fix format conversion in transition export
- Add locked column to sessions with toggle in restore dialog, preventing
  accidental deletion of important sessions (padlock icon, DB-level protection)
- Fix transition export copying source files as-is when output format differs
  (e.g. webp sources now convert to png when png format is selected)
- Fix ON CONFLICT clause in save_per_transition_settings to match UNIQUE constraint

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 22:05:31 +01:00
793ba4243d Add cleanup unused videos feature to remove unreferenced video folders
Adds a right-click context menu on the empty area of the source list with a
"Cleanup Unused Videos..." option. It scans type_of_video/ directories for video
folders not in the current session and shows a dialog with checkboxes to delete
them or move them to .trash/.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 12:36:10 +01:00
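The scan described above can be sketched in a few lines of pathlib. This is an illustrative sketch only: the function names, the flat `videos_root` layout, and the `.trash/` handling are assumptions, not the app's actual code.

```python
import shutil
from pathlib import Path

def find_unused_video_folders(videos_root: Path, session_folders: set[Path]) -> list[Path]:
    """Return subfolders of videos_root that the current session does not
    reference. Sketch of the cleanup scan; names and layout are assumed."""
    used = {p.resolve() for p in session_folders}
    return [d for d in sorted(videos_root.iterdir())
            if d.is_dir() and d.resolve() not in used]

def move_to_trash(folder: Path, trash_dir: Path) -> Path:
    """Move a folder into a trash directory instead of deleting it outright."""
    trash_dir.mkdir(parents=True, exist_ok=True)
    dest = trash_dir / folder.name
    shutil.move(str(folder), str(dest))
    return dest
```

Moving to `.trash/` rather than deleting keeps the operation reversible, matching the dialog's delete-or-move choice.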
15c00e5fd2 Fix transition overlap duplicates, trim slider sync, and drag performance
- Make each transition boundary symmetric (left_overlap controls MAIN→TRANS,
  right_overlap controls TRANS→MAIN) so frame indices map 1:1 with no repeats
- Track committed frames per folder to cap overlaps and prevent over-allocation
- Fix float truncation in frame mapping (int→round) that caused off-by-one dupes
- Sync trim slider to follow frame selection in Sequence Order / With Transitions
- Defer expensive file list rebuild to mouse release for smooth trim slider drag
- Apply trim settings to transition folders in both display and export paths
- Refresh trim slider after session restore to show correct file counts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 21:18:50 +01:00
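The int→round change above comes down to float frame offsets that land just under an integer boundary; a minimal, self-contained illustration:

```python
# A frame offset computed in floating point can land just below the
# intended integer. int() then truncates to the previous frame index
# (duplicating the frame already emitted there), while round() recovers
# the intended index.
offset = (1 - 0.9) * 10   # 0.9999999999999998, intended value: 1.0

truncated = int(offset)   # 0, off by one
rounded = round(offset)   # 1, correct
```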
2694a8cba3 Add split/merge sequence feature with fid-based folder tracking
Refactor folder settings from path-keyed to fid-keyed dicts so the same
physical folder can appear multiple times with independent trim, type,
and transition settings.  This enables splitting a MAIN folder into two
sub-sequences at an arbitrary frame boundary using complementary trim
ranges, and merging them back.

- Add split from file list context menu ("Split Sequence After This
  Frame") and source list context menu ("Split Sequence..." dialog)
- Add "Merge with Next" to undo splits on adjacent same-path entries
- Sub-sequences share the base sequence number (seq01-1, seq01-2) with
  continuous file indices so subsequent sequences are not renumbered
- Session save/restore handles duplicate paths via folder_order; restore
  falls back to _refresh_files when split entries are detected
- Export copy_matches now compares file contents when size matches but
  mtime differs, preventing false negatives on re-export

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 23:27:13 +01:00
6e2d6148af Fix OF preset default, content splitter drag lag, and sequence table column resize
- Stop restoring OF preset from session so widget default (Max) always applies
- Add minimum widths and size policies to content_splitter children
- Change sequence_table Main Frame column from Stretch to Interactive

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 15:22:10 +01:00
2599265410 Fix splitter drag acceleration by removing stretch factors
setStretchFactor on QSplitter interferes with manual handle dragging —
Qt applies stretch redistribution after each mouse move, compounding
the delta and creating an accelerating effect. Replaced with size
policies (Expanding on the right panel), which achieve the same
window-resize behavior without fighting the splitter drag.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 14:14:32 +01:00
1de641d756 Fix timeline seconds label overlapping frame number column
The right-side time label was drawn at the viewport edge, overlapping
the # column when frame numbers had 3+ digits. Now drawn at the right
edge of column 1 (before the # column starts).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 14:11:46 +01:00
9f4b1e17f6 Prevent splitter panels from collapsing to zero
Both splitters now have setChildrenCollapsible(False) so neither side
can be dragged to zero width. Right panel gets minWidth 400 to stay
usable when the source panel is expanded.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 14:08:36 +01:00
cd8878c91b Fix source panel splitter drag: remove maxWidth clamp, widen handles
The source panel had maxWidth(400) which barely let the splitter move
beyond its starting position. Removed the cap, lowered minWidth from
250 to 150, and widened both splitter handles to 5px for easier grab.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 14:06:35 +01:00
f706328e4d Fix column resize: switch from ResizeToContents to Interactive
ResizeToContents columns can't be dragged — they snap back to their
computed width, making resize attempts feel broken. Changed to
Interactive mode with explicit initial widths and minimum section
size so columns are freely draggable while still having sensible
defaults.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 14:04:22 +01:00
893132f110 Add multi-select delete to Restore Session dialog
- Session list now supports extended selection (Shift+click for range,
  Ctrl+click for individual)
- "Delete Selected" button removes chosen sessions with confirmation
- List refreshes in-place after deletion so you can keep cleaning up
- Added delete_sessions() batch method to database (single transaction)
- Simplified delete_session() to rely on ON DELETE CASCADE

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 13:55:37 +01:00
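The batch-delete-in-one-transaction idea above can be sketched with sqlite3. Table and column names here are assumptions; the point is `executemany` inside a single transaction, relying on ON DELETE CASCADE for child rows.

```python
import sqlite3

def delete_sessions(conn: sqlite3.Connection, session_ids: list[int]) -> None:
    """Delete a batch of sessions in a single transaction.

    Illustrative sketch; child rows are expected to disappear via
    ON DELETE CASCADE, which SQLite only honors when foreign-key
    enforcement is switched on.
    """
    conn.execute("PRAGMA foreign_keys = ON")
    with conn:  # one transaction for the whole batch
        conn.executemany(
            "DELETE FROM sessions WHERE id = ?",
            [(sid,) for sid in session_ids],
        )
```

Using the connection as a context manager commits once on success and rolls the whole batch back on error, so a partial deletion never persists.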
00e393f7b9 Fix export crash: batch DB inserts and let event loop breathe
The export loop was opening a new DB connection per file AND starving
the Qt event loop, causing the progress bar to freeze then jump and
the app to crash during large copy exports.

Fixes:
- All record_symlink calls in both export paths now collect records
  and batch-insert in a single transaction at the end
- Added explicit QApplication.processEvents() in export loops
- Throttled progress label updates to every 10 files (text rendering
  was adding overhead on every iteration)
- Moved shutil import out of inner loops

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 13:53:09 +01:00
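The collect-then-batch-insert pattern with throttled progress updates can be sketched as below. All names are assumptions; the real export loop additionally calls `QApplication.processEvents()` each iteration, which is omitted here to keep the sketch GUI-free.

```python
import sqlite3

def export_with_batched_inserts(conn, records, update_label=lambda text: None):
    """Collect symlink records during the copy loop, update the progress
    label only every 10 files, and insert everything in one transaction
    at the end instead of opening a connection per file."""
    pending = []
    for i, (name, target) in enumerate(records, start=1):
        pending.append((name, target))   # collect, don't insert yet
        if i % 10 == 0:                  # throttled label update
            update_label(f"{i}/{len(records)} files")
    with conn:                           # single batched transaction
        conn.executemany(
            "INSERT INTO symlinks (name, target) VALUES (?, ?)",
            pending,
        )
```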
2c4778e598 Add Replace Folder option to preserve edits when swapping a clip
Right-click any folder in the source list → "Replace Folder..." opens
a file dialog. The new folder takes the old one's position and inherits
all settings: type override, trim, per-transition overlap, removed
files, and direct transition config.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 12:56:01 +01:00
0efc6e153f Fix session save freeze and restore losing removed files
Save freeze fix:
- Added record_symlinks_batch() that inserts all symlinks in a single
  DB transaction instead of opening a new connection per file
- _save_session and _auto_save_session now use batch inserts
- With 1700 files this goes from 1700 connection cycles to 1

Removed files fix:
- _restore_files_from_session now filters by _removed_files so
  individually deleted files stay removed even when restoring from
  session data that pre-dates the removal

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 12:06:01 +01:00
c03ee0665b Fix session save/restore losing folders, add frame column and UI improvements
Session restore fixes:
- AUTO-typed folders now default to MAIN on restore instead of using
  position-based index%2, which silently flipped half the folders to
  TRANSITION when restoring legacy sessions
- All restored folders get explicit type overrides so no folder relies
  on position-based typing after restore
- TRANSITION folders with symlink data are auto-recovered as MAIN
  (catches incorrectly saved types from older export paths)
- Export Sequence path now saves with save_effective_types=True,
  preventing folder type loss
- Removed redundant trim-only save that used unresolved paths
- Auto-save guards against overwriting sessions with empty file lists

UI improvements:
- Added 4th "Frame" column to Sequence Order tab showing overall
  frame number (1-based)
- Last frame of each sequence is bold for visual clarity
- Fixed column resizing (ResizeToContents + Stretch) to prevent
  column collapse bugs
- Save Session dialog now reports main + transition folder counts
- Default optical flow preset changed to Max

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 11:55:48 +01:00
82a1c2ff9f Fix export range, transition frame counting, session restore, and video encoding
- Fix export range not covering TRANSITION folder middle frames: range max
  was based on MAIN-only file count, causing blends at sequence end to be
  silently skipped. Now uses full sequence frame count from preview table.
- Fix preview table not counting TRANSITION middle frames: these frames are
  output as symlinks in export but were shown without sequence numbers in
  preview. Now displayed as [T] entries with proper output_seq numbering.
- Fix session restore path resolution: all folder paths now .resolve()'d on
  save and restored with _resolve_lookup() fallback for both raw and resolved
  forms. Fixes folder order corruption on restore.
- Fix legacy session restore: detect pre-migration sessions (all folder_order=0)
  and fall back to symlink-derived ordering with get_all_folder_settings().
- Fix ffmpeg concat demuxer duration format: use decimal instead of fraction.
- Fix QProgressDialog false cancellation from autoReset at max value.
- Fix Export with Transitions skipping TRANSITION folders entirely while
  preview processed them, causing cutoff at blend boundaries.
- Fix Encode Video Only not finding transition-exported files in trans_dest.
- Add video encoding module (core/video.py) with concat demuxer support.
- Add direct_transition_settings DB table and persistence.
- Add sticky folder types on reorder and placeholder transition slots.
- Add blend-skipped-range counter to export completion dialog.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 01:35:59 +01:00
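The concat demuxer fix above refers to ffmpeg's list-file syntax, whose `duration` directive takes decimal seconds, not a fraction like `1/24`. A sketch of a valid list (filenames and the 24 fps duration are assumptions):

```text
file 'seq_00000.png'
duration 0.0416667
file 'seq_00001.png'
duration 0.0416667
file 'seq_00002.png'
duration 0.0416667
```

Such a list would typically be consumed with something like `ffmpeg -f concat -safe 0 -i list.txt out.mp4`.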
78a1c8b795 Robust export with progress bar, file removal persistence, copy mode
- Rewrite _export_sequence with QProgressDialog, per-file error handling,
  cancel support, and continuous seq_00000 naming
- Add folder progress labels to _process_with_transitions
- Extend cleanup_old_links to remove film_temp_*.png temporaries
- Add copy-files checkbox for Docker/remote destinations
- Persist individually removed files across sessions (removed_files table)
- Recover file removals from export history for older sessions
- Save effective folder types in transition exports for reliable restore

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 21:13:49 +01:00
5defd664ed film 2026-02-05 14:59:03 +01:00
e58dc27dce Make FILM the default for direct interpolation
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 11:55:06 +01:00
fb67ad5683 flow 2026-02-03 23:58:00 +01:00
8c1de543d3 doc 2026-02-03 23:31:41 +01:00
10 changed files with 5723 additions and 460 deletions


@@ -13,18 +13,20 @@ A PyQt6 application for creating sequenced symlinks from image folders with adva
- Per-folder trim settings (exclude frames from start/end)
### Cross-Dissolve Transitions
-Smooth blending between folder boundaries with three blend methods:
+Smooth blending between folder boundaries with four blend methods:
| Method | Description | Quality | Speed |
|--------|-------------|---------|-------|
| **Cross-Dissolve** | Simple alpha blend | Good | Fastest |
| **Optical Flow** | Motion-compensated blend using OpenCV Farneback | Better | Medium |
-| **RIFE (AI)** | Neural network frame interpolation | Best | Fast (GPU) |
+| **RIFE (ncnn)** | Neural network interpolation via rife-ncnn-vulkan | Best | Fast (GPU) |
+| **RIFE (Practical)** | PyTorch-based Practical-RIFE (v4.25/v4.26) | Best | Medium (GPU) |
- **Asymmetric overlap**: Set different frame counts for each side of a transition
- **Blend curves**: Linear, Ease In, Ease Out, Ease In/Out
- **Output formats**: PNG, JPEG (with quality), WebP (lossless with method setting)
- **RIFE auto-download**: Automatically downloads rife-ncnn-vulkan binary
- **Practical-RIFE models**: Auto-downloads from Google Drive on first use
### Preview
- **Video Preview**: Play video files from source folders
@@ -54,11 +56,24 @@ Smooth blending between folder boundaries with three blend methods:
pip install PyQt6 Pillow numpy opencv-python
```
### RIFE (Optional)
For AI-powered frame interpolation, the app can auto-download [rife-ncnn-vulkan](https://github.com/nihui/rife-ncnn-vulkan) or you can install it manually:
- Select **RIFE (AI)** as the blend method
- Click **Download** to fetch the latest release
**Note:** Practical-RIFE creates its own isolated venv with PyTorch. The `gdown` package is installed automatically for downloading models from Google Drive.
### RIFE ncnn (Optional)
For AI-powered frame interpolation using Vulkan GPU acceleration:
- Select **RIFE (ncnn)** as the blend method
- Click **Download** to auto-fetch [rife-ncnn-vulkan](https://github.com/nihui/rife-ncnn-vulkan)
- Or specify a custom binary path
- Models: rife-v4.6, rife-v4.15-lite, etc.
### Practical-RIFE (Optional)
For PyTorch-based frame interpolation with latest models:
- Select **RIFE (Practical)** as the blend method
- Click **Setup PyTorch** to create an isolated venv with PyTorch (~2GB)
- Models auto-download from Google Drive on first use
- Available models: v4.26, v4.25, v4.22, v4.20, v4.18, v4.15
- Optional ensemble mode for higher quality (slower)
The venv is stored at `~/.cache/video-montage-linker/venv-rife/`
## Usage
@@ -98,7 +113,8 @@ video-montage-linker/
├── core/
│ ├── models.py # Enums, dataclasses
│ ├── database.py # SQLite session management
-│ ├── blender.py # Image blending, RIFE downloader
+│ ├── blender.py # Image blending, RIFE downloader, Practical-RIFE env
+│ ├── rife_worker.py # Practical-RIFE inference (runs in isolated venv)
│ └── manager.py # Symlink operations
└── ui/
├── widgets.py # TrimSlider, custom widgets


@@ -4,8 +4,12 @@ from .models import (
BlendCurve,
BlendMethod,
FolderType,
DirectInterpolationMethod,
TransitionSettings,
PerTransitionSettings,
DirectTransitionSettings,
VideoPreset,
VIDEO_PRESETS,
BlendResult,
TransitionSpec,
LinkResult,
@@ -19,15 +23,20 @@ from .models import (
DatabaseError,
)
from .database import DatabaseManager
-from .blender import ImageBlender, TransitionGenerator, RifeDownloader, PracticalRifeEnv
+from .blender import ImageBlender, TransitionGenerator, RifeDownloader, PracticalRifeEnv, FilmEnv, OPTICAL_FLOW_PRESETS
from .manager import SymlinkManager
from .video import encode_image_sequence, encode_from_file_list, find_ffmpeg
__all__ = [
'BlendCurve',
'BlendMethod',
'FolderType',
'DirectInterpolationMethod',
'TransitionSettings',
'PerTransitionSettings',
'DirectTransitionSettings',
'VideoPreset',
'VIDEO_PRESETS',
'BlendResult',
'TransitionSpec',
'LinkResult',
@@ -44,5 +53,10 @@ __all__ = [
'TransitionGenerator',
'RifeDownloader',
'PracticalRifeEnv',
'FilmEnv',
'SymlinkManager',
'OPTICAL_FLOW_PRESETS',
'encode_image_sequence',
'encode_from_file_list',
'find_ffmpeg',
]


@@ -23,6 +23,8 @@ from .models import (
PerTransitionSettings,
BlendResult,
TransitionSpec,
DirectInterpolationMethod,
DirectTransitionSettings,
)
@@ -31,6 +33,14 @@ CACHE_DIR = Path.home() / '.cache' / 'video-montage-linker'
RIFE_GITHUB_API = 'https://api.github.com/repos/nihui/rife-ncnn-vulkan/releases/latest'
PRACTICAL_RIFE_VENV_DIR = CACHE_DIR / 'venv-rife'
# Optical flow presets
OPTICAL_FLOW_PRESETS = {
'fast': {'levels': 2, 'winsize': 11, 'iterations': 2, 'poly_n': 5, 'poly_sigma': 1.1},
'balanced': {'levels': 3, 'winsize': 15, 'iterations': 3, 'poly_n': 5, 'poly_sigma': 1.2},
'quality': {'levels': 5, 'winsize': 21, 'iterations': 5, 'poly_n': 7, 'poly_sigma': 1.5},
'max': {'levels': 7, 'winsize': 31, 'iterations': 10, 'poly_n': 7, 'poly_sigma': 1.5},
}
class PracticalRifeEnv:
"""Manages isolated Python environment for Practical-RIFE."""
@@ -243,6 +253,230 @@ class PracticalRifeEnv:
return False, str(e)
class FilmEnv:
"""Manages FILM frame interpolation using shared venv with RIFE."""
VENV_DIR = PRACTICAL_RIFE_VENV_DIR # Share venv with RIFE
MODEL_CACHE_DIR = CACHE_DIR / 'film'
MODEL_FILENAME = 'film_net_fp32.pt'
MODEL_URL = 'https://github.com/dajes/frame-interpolation-pytorch/releases/download/v1.0.2/film_net_fp32.pt'
# Keep REPO_DIR for backward compat (but unused now - model is downloaded directly)
REPO_DIR = CACHE_DIR / 'frame-interpolation-pytorch'
@classmethod
def get_venv_python(cls) -> Optional[Path]:
"""Get path to venv Python executable."""
if cls.VENV_DIR.exists():
if sys.platform == 'win32':
return cls.VENV_DIR / 'Scripts' / 'python.exe'
return cls.VENV_DIR / 'bin' / 'python'
return None
@classmethod
def get_model_path(cls) -> Path:
"""Get path to the FILM TorchScript model."""
return cls.MODEL_CACHE_DIR / cls.MODEL_FILENAME
@classmethod
def is_setup(cls) -> bool:
"""Check if venv exists and FILM model is downloaded."""
python = cls.get_venv_python()
if not python or not python.exists():
return False
# Check if model is downloaded
return cls.get_model_path().exists()
@classmethod
def setup_film(cls, progress_callback=None, cancelled_check=None) -> bool:
"""Download FILM model and ensure venv is ready.
Args:
progress_callback: Optional callback(message, percent) for progress.
cancelled_check: Optional callable that returns True if cancelled.
Returns:
True if setup was successful.
"""
python = cls.get_venv_python()
if not python or not python.exists():
# Need to set up base venv first via PracticalRifeEnv
return False
try:
model_path = cls.get_model_path()
if not model_path.exists():
if progress_callback:
progress_callback("Downloading FILM model (~380MB)...", 30)
if cancelled_check and cancelled_check():
return False
# Download the pre-trained TorchScript model
cls.MODEL_CACHE_DIR.mkdir(parents=True, exist_ok=True)
urllib.request.urlretrieve(cls.MODEL_URL, model_path)
if progress_callback:
progress_callback("FILM setup complete!", 100)
return cls.is_setup()
except Exception as e:
print(f"[FILM] Setup error: {e}", file=sys.stderr)
return False
@classmethod
def get_worker_script(cls) -> Path:
"""Get path to the FILM worker script."""
return Path(__file__).parent / 'film_worker.py'
@classmethod
def run_interpolation(
cls,
img_a_path: Path,
img_b_path: Path,
output_path: Path,
t: float
) -> tuple[bool, str]:
"""Run FILM interpolation via subprocess in venv.
Args:
img_a_path: Path to first input image.
img_b_path: Path to second input image.
output_path: Path to output image.
t: Timestep for interpolation (0.0 to 1.0).
Returns:
Tuple of (success, error_message).
"""
python = cls.get_venv_python()
if not python or not python.exists():
return False, "venv python not found"
script = cls.get_worker_script()
if not script.exists():
return False, f"worker script not found: {script}"
cmd = [
str(python), str(script),
'--input0', str(img_a_path),
'--input1', str(img_b_path),
'--output', str(output_path),
'--timestep', str(t),
'--repo-dir', str(cls.REPO_DIR),
'--model-dir', str(cls.MODEL_CACHE_DIR)
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=180 # 3 minute timeout per frame (FILM is slower)
)
if result.returncode == 0 and output_path.exists():
return True, ""
else:
error = result.stderr.strip() if result.stderr else f"returncode={result.returncode}"
return False, error
except subprocess.TimeoutExpired:
return False, "timeout (180s)"
except Exception as e:
return False, str(e)
@classmethod
def run_batch_interpolation(
cls,
img_a_path: Path,
img_b_path: Path,
output_dir: Path,
frame_count: int,
output_pattern: str = 'frame_{:04d}.png'
) -> tuple[bool, str, list[Path]]:
"""Run FILM batch interpolation via subprocess in venv.
Generates all frames at once using FILM's recursive approach,
which produces better results than generating frames independently.
Args:
img_a_path: Path to first input image.
img_b_path: Path to second input image.
output_dir: Directory to save output frames.
frame_count: Number of frames to generate.
output_pattern: Filename pattern for output frames.
Returns:
Tuple of (success, error_message, list_of_output_paths).
"""
python = cls.get_venv_python()
if not python or not python.exists():
return False, "venv python not found", []
script = cls.get_worker_script()
if not script.exists():
return False, f"worker script not found: {script}", []
output_dir.mkdir(parents=True, exist_ok=True)
cmd = [
str(python), str(script),
'--input0', str(img_a_path),
'--input1', str(img_b_path),
'--output-dir', str(output_dir),
'--frame-count', str(frame_count),
'--output-pattern', output_pattern,
'--repo-dir', str(cls.REPO_DIR),
'--model-dir', str(cls.MODEL_CACHE_DIR)
]
try:
# Longer timeout for batch - scale with frame count
timeout = max(300, frame_count * 30) # At least 5 min, +30s per frame
print(f"[FILM] Running batch interpolation: {frame_count} frames", file=sys.stderr)
print(f"[FILM] Command: {' '.join(cmd)}", file=sys.stderr)
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout
)
# Collect output paths
output_paths = [
output_dir / output_pattern.format(i)
for i in range(frame_count)
]
existing_paths = [p for p in output_paths if p.exists()]
if result.returncode == 0 and len(existing_paths) == frame_count:
print(f"[FILM] Success: generated {len(existing_paths)} frames", file=sys.stderr)
return True, "", output_paths
else:
# Combine stdout and stderr for better error reporting
error_parts = []
if result.returncode != 0:
error_parts.append(f"returncode={result.returncode}")
if result.stdout and result.stdout.strip():
error_parts.append(f"stdout: {result.stdout.strip()}")
if result.stderr and result.stderr.strip():
error_parts.append(f"stderr: {result.stderr.strip()}")
if len(existing_paths) != frame_count:
error_parts.append(f"expected {frame_count} frames, got {len(existing_paths)}")
error = "; ".join(error_parts) if error_parts else "unknown error"
print(f"[FILM] Failed: {error}", file=sys.stderr)
return False, error, existing_paths
except subprocess.TimeoutExpired:
print(f"[FILM] Timeout after {timeout}s", file=sys.stderr)
return False, f"timeout ({timeout}s)", []
except Exception as e:
print(f"[FILM] Exception: {e}", file=sys.stderr)
return False, str(e), []
class RifeDownloader:
"""Handles automatic download and caching of rife-ncnn-vulkan binary."""
@@ -541,7 +775,16 @@ class ImageBlender:
return Image.blend(frames[lower_idx], frames[upper_idx], frac)
@staticmethod
-def optical_flow_blend(img_a: Image.Image, img_b: Image.Image, t: float) -> Image.Image:
+def optical_flow_blend(
+img_a: Image.Image,
+img_b: Image.Image,
+t: float,
+levels: int = 3,
+winsize: int = 15,
+iterations: int = 3,
+poly_n: int = 5,
+poly_sigma: float = 1.2
+) -> Image.Image:
"""Blend using OpenCV optical flow for motion compensation.
Uses Farneback dense optical flow to warp frames and reduce ghosting
@@ -551,6 +794,11 @@ class ImageBlender:
img_a: First PIL Image (source frame).
img_b: Second PIL Image (target frame).
t: Interpolation factor 0.0 (100% A) to 1.0 (100% B).
levels: Pyramid levels for optical flow (1-7).
winsize: Window size for optical flow (5-51, odd).
iterations: Number of iterations (1-10).
poly_n: Polynomial neighborhood size (5 or 7).
poly_sigma: Gaussian sigma for polynomial expansion (0.5-2.0).
Returns:
Motion-compensated blended PIL Image.
@@ -571,11 +819,11 @@ class ImageBlender:
flow = cv2.calcOpticalFlowFarneback(
gray_a, gray_b, None,
pyr_scale=0.5,
-levels=3,
-winsize=15,
-iterations=3,
-poly_n=5,
-poly_sigma=1.2,
+levels=levels,
+winsize=winsize,
+iterations=iterations,
+poly_n=poly_n,
+poly_sigma=poly_sigma,
flags=0
)
@@ -802,6 +1050,56 @@ class ImageBlender:
# Fall back to ncnn RIFE or optical flow
return ImageBlender.rife_blend(img_a, img_b, t)
@staticmethod
def film_blend(
img_a: Image.Image,
img_b: Image.Image,
t: float
) -> Image.Image:
"""Blend using FILM for large motion interpolation.
FILM (Frame Interpolation for Large Motion) is Google Research's
high-quality frame interpolation model, better for large motion.
Args:
img_a: First PIL Image (source frame).
img_b: Second PIL Image (target frame).
t: Interpolation factor 0.0 (100% A) to 1.0 (100% B).
Returns:
AI-interpolated blended PIL Image.
"""
if not FilmEnv.is_setup():
print("[FILM] Not set up, falling back to Practical-RIFE", file=sys.stderr)
return ImageBlender.practical_rife_blend(img_a, img_b, t)
try:
with tempfile.TemporaryDirectory() as tmpdir:
tmp = Path(tmpdir)
input_a = tmp / 'a.png'
input_b = tmp / 'b.png'
output_file = tmp / 'out.png'
# Save input images
img_a.convert('RGB').save(input_a)
img_b.convert('RGB').save(input_b)
# Run FILM via subprocess
success, error_msg = FilmEnv.run_interpolation(
input_a, input_b, output_file, t
)
if success and output_file.exists():
return Image.open(output_file).copy()
else:
print(f"[FILM] Interpolation failed: {error_msg}, falling back to Practical-RIFE", file=sys.stderr)
except Exception as e:
print(f"[FILM] Exception: {e}, falling back to Practical-RIFE", file=sys.stderr)
# Fall back to Practical-RIFE
return ImageBlender.practical_rife_blend(img_a, img_b, t)
@staticmethod
def blend_images(
img_a_path: Path,
@@ -817,7 +1115,12 @@ class ImageBlender:
rife_uhd: bool = False,
rife_tta: bool = False,
practical_rife_model: str = 'v4.25',
-practical_rife_ensemble: bool = False
+practical_rife_ensemble: bool = False,
+of_levels: int = 3,
+of_winsize: int = 15,
+of_iterations: int = 3,
+of_poly_n: int = 5,
+of_poly_sigma: float = 1.2
) -> BlendResult:
"""Blend two images together.
@@ -836,6 +1139,11 @@ class ImageBlender:
rife_tta: Enable RIFE ncnn TTA mode.
practical_rife_model: Practical-RIFE model version (e.g., 'v4.25').
practical_rife_ensemble: Enable Practical-RIFE ensemble mode.
of_levels: Optical flow pyramid levels (1-7).
of_winsize: Optical flow window size (5-51, odd).
of_iterations: Optical flow iterations (1-10).
of_poly_n: Optical flow polynomial neighborhood (5 or 7).
of_poly_sigma: Optical flow gaussian sigma (0.5-2.0).
Returns:
BlendResult with operation status.
@@ -856,7 +1164,14 @@ class ImageBlender:
# Blend images using selected method
if blend_method == BlendMethod.OPTICAL_FLOW:
-blended = ImageBlender.optical_flow_blend(img_a, img_b, factor)
+blended = ImageBlender.optical_flow_blend(
+img_a, img_b, factor,
+levels=of_levels,
+winsize=of_winsize,
+iterations=of_iterations,
+poly_n=of_poly_n,
+poly_sigma=of_poly_sigma
+)
elif blend_method == BlendMethod.RIFE:
blended = ImageBlender.rife_blend(
img_a, img_b, factor, rife_binary_path, True, rife_model, rife_uhd, rife_tta
@@ -922,7 +1237,12 @@ class ImageBlender:
rife_uhd: bool = False,
rife_tta: bool = False,
practical_rife_model: str = 'v4.25',
-practical_rife_ensemble: bool = False
+practical_rife_ensemble: bool = False,
+of_levels: int = 3,
+of_winsize: int = 15,
+of_iterations: int = 3,
+of_poly_n: int = 5,
+of_poly_sigma: float = 1.2
) -> BlendResult:
"""Blend two PIL Image objects together.
@@ -941,6 +1261,11 @@ class ImageBlender:
rife_tta: Enable RIFE ncnn TTA mode.
practical_rife_model: Practical-RIFE model version (e.g., 'v4.25').
practical_rife_ensemble: Enable Practical-RIFE ensemble mode.
of_levels: Optical flow pyramid levels (1-7).
of_winsize: Optical flow window size (5-51, odd).
of_iterations: Optical flow iterations (1-10).
of_poly_n: Optical flow polynomial neighborhood (5 or 7).
of_poly_sigma: Optical flow gaussian sigma (0.5-2.0).
Returns:
BlendResult with operation status.
@@ -958,7 +1283,14 @@ class ImageBlender:
# Blend images using selected method
if blend_method == BlendMethod.OPTICAL_FLOW:
-blended = ImageBlender.optical_flow_blend(img_a, img_b, factor)
+blended = ImageBlender.optical_flow_blend(
+img_a, img_b, factor,
+levels=of_levels,
+winsize=of_winsize,
+iterations=of_iterations,
+poly_n=of_poly_n,
+poly_sigma=of_poly_sigma
+)
elif blend_method == BlendMethod.RIFE:
blended = ImageBlender.rife_blend(
img_a, img_b, factor, rife_binary_path, True, rife_model, rife_uhd, rife_tta
@@ -1024,21 +1356,19 @@ class TransitionGenerator:
def get_folder_type(
self,
index: int,
-overrides: Optional[dict[Path, FolderType]] = None,
-folder: Optional[Path] = None
+overrides: Optional[dict[int, FolderType]] = None,
) -> FolderType:
"""Determine folder type based on position or override.
Args:
index: 0-based position of folder in list.
-overrides: Optional dict of folder path to FolderType overrides.
-folder: The folder path for checking overrides.
+overrides: Optional dict of position index to FolderType overrides.
Returns:
-FolderType.MAIN for odd positions (1, 3, 5...), TRANSITION for even.
+FolderType.MAIN for even positions (0, 2, 4...), TRANSITION for odd.
"""
-if overrides and folder and folder in overrides:
-override = overrides[folder]
+if overrides and index in overrides:
+override = overrides[index]
if override != FolderType.AUTO:
return override
@@ -1048,9 +1378,9 @@ class TransitionGenerator:
def identify_transition_boundaries(
self,
folders: list[Path],
-files_by_folder: dict[Path, list[str]],
-folder_overrides: Optional[dict[Path, FolderType]] = None,
-per_transition_settings: Optional[dict[Path, PerTransitionSettings]] = None
+files_by_idx: dict[int, list[str]],
+folder_overrides: Optional[dict[int, FolderType]] = None,
+per_transition_settings: Optional[dict[int, PerTransitionSettings]] = None
) -> list[TransitionSpec]:
"""Identify boundaries where transitions should occur.
@@ -1059,9 +1389,9 @@ class TransitionGenerator:
Args:
folders: List of folders in order.
-files_by_folder: Dict mapping folders to their file lists.
-folder_overrides: Optional folder type overrides.
-per_transition_settings: Optional per-transition overlap settings.
+files_by_idx: Dict mapping position index to file lists.
+folder_overrides: Optional position-index-keyed folder type overrides.
+per_transition_settings: Optional position-index-keyed per-transition overlap settings.
Returns:
List of TransitionSpec objects describing each transition.
@@ -1071,47 +1401,67 @@ class TransitionGenerator:
transitions = []
cumulative_idx = 0
-folder_start_indices = {}
+folder_start_indices: dict[int, int] = {}
-# Calculate start indices for each folder
-for folder in folders:
-folder_start_indices[folder] = cumulative_idx
-cumulative_idx += len(files_by_folder.get(folder, []))
+# Calculate start indices for each folder position
+for i in range(len(folders)):
+folder_start_indices[i] = cumulative_idx
+cumulative_idx += len(files_by_idx.get(i, []))
+# Track how many files are committed from each folder's start and end
+# so overlaps never exceed available frames.
+committed_from_start: dict[int, int] = {} # folder idx → frames used from start
+committed_from_end: dict[int, int] = {} # folder idx → frames used from end
# Look for transition boundaries (MAIN->TRANSITION and TRANSITION->MAIN)
for i in range(len(folders) - 1):
folder_a = folders[i]
folder_b = folders[i + 1]
type_a = self.get_folder_type(i, folder_overrides)
type_b = self.get_folder_type(i + 1, folder_overrides)
# Create transition when types differ (MAIN->TRANS or TRANS->MAIN)
if type_a != type_b:
files_a = files_by_idx.get(i, [])
files_b = files_by_idx.get(i + 1, [])
if not files_a or not files_b:
continue
# Get per-transition overlap settings from the TRANSITION folder
# (could be at position i or i+1 depending on boundary direction)
pts_key = i if type_a == FolderType.TRANSITION else i + 1
if per_transition_settings and pts_key in per_transition_settings:
pts = per_transition_settings[pts_key]
if type_a == FolderType.TRANSITION:
# TRANS→MAIN boundary: symmetric overlap taken from right_overlap
left_overlap = pts.right_overlap
right_overlap = pts.right_overlap
else:
# MAIN→TRANS boundary: symmetric overlap taken from left_overlap
left_overlap = pts.left_overlap
right_overlap = pts.left_overlap
else:
# Use default of 16 for both
left_overlap = 16
right_overlap = 16
# Cap overlaps by available files, accounting for frames
# already committed to a prior boundary on the same folder.
# Keep both sides equal (symmetric) after capping.
avail_a = len(files_a) - committed_from_start.get(i, 0)
avail_b = len(files_b) - committed_from_end.get(i + 1, 0)
capped = min(left_overlap, right_overlap, avail_a, avail_b)
left_overlap = capped
right_overlap = capped
if left_overlap < 1 or right_overlap < 1:
continue
committed_from_end[i] = committed_from_end.get(i, 0) + left_overlap
committed_from_start[i + 1] = committed_from_start.get(i + 1, 0) + right_overlap
transitions.append(TransitionSpec(
main_folder=folder_a,
trans_folder=folder_b,
@@ -1119,8 +1469,10 @@ class TransitionGenerator:
trans_files=files_b,
left_overlap=left_overlap,
right_overlap=right_overlap,
main_start_idx=folder_start_indices[i],
trans_start_idx=folder_start_indices[i + 1],
main_folder_idx=i,
trans_folder_idx=i + 1,
))
return transitions
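The capping logic above is easy to verify with hypothetical numbers (the 20-frame and 10-frame folders and the 6 committed frames are invented for illustration, not from the diff):

```python
# Hypothetical: folder A has 20 files, 6 already committed to an earlier
# boundary at its start; folder B has 10 files, none committed yet.
left_overlap, right_overlap = 16, 16
avail_a = 20 - 6   # len(files_a) - committed_from_start
avail_b = 10 - 0   # len(files_b) - committed_from_end

# Keep both sides equal (symmetric) after capping
capped = min(left_overlap, right_overlap, avail_a, avail_b)
left_overlap = right_overlap = capped
```

Here the 16-frame request shrinks to 10, the most both folders can supply without double-spending frames already promised to a neighbouring boundary.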
@@ -1130,7 +1482,7 @@ class TransitionGenerator:
spec: TransitionSpec,
dest: Path,
folder_idx_main: int,
base_seq_num: int
) -> list[BlendResult]:
"""Generate blended frames for an asymmetric transition.
@@ -1141,8 +1493,8 @@ class TransitionGenerator:
Args:
spec: TransitionSpec describing the transition.
dest: Destination directory for blended frames.
folder_idx_main: Folder index (unused, kept for compatibility).
base_seq_num: Starting sequence number for continuous naming.
Returns:
List of BlendResult objects.
@@ -1197,8 +1549,8 @@ class TransitionGenerator:
# Generate output filename
ext = f".{self.settings.output_format.lower()}"
seq_num = base_seq_num + i
output_name = f"seq_{seq_num:05d}{ext}"
output_path = dest / output_name
result = self.blender.blend_images_pil(
@@ -1215,7 +1567,12 @@ class TransitionGenerator:
self.settings.rife_uhd,
self.settings.rife_tta,
self.settings.practical_rife_model,
self.settings.practical_rife_ensemble,
self.settings.of_levels,
self.settings.of_winsize,
self.settings.of_iterations,
self.settings.of_poly_n,
self.settings.of_poly_sigma
)
results.append(result)
@@ -1232,7 +1589,7 @@ class TransitionGenerator:
spec: TransitionSpec,
dest: Path,
folder_idx_main: int,
base_seq_num: int
) -> list[BlendResult]:
"""Generate blended frames for a transition.
@@ -1241,13 +1598,249 @@ class TransitionGenerator:
Args:
spec: TransitionSpec describing the transition.
dest: Destination directory for blended frames.
folder_idx_main: Folder index (unused, kept for compatibility).
base_seq_num: Starting sequence number for continuous naming.
Returns:
List of BlendResult objects.
"""
# Use asymmetric blend for all cases (handles symmetric too)
return self.generate_asymmetric_blend_frames(
spec, dest, folder_idx_main, base_seq_num
)
def generate_direct_interpolation_frames(
self,
img_a_path: Path,
img_b_path: Path,
frame_count: int,
method: DirectInterpolationMethod,
dest: Path,
folder_idx: int,
base_seq_num: int,
practical_rife_model: str = 'v4.25',
practical_rife_ensemble: bool = False
) -> list[BlendResult]:
"""Generate AI-interpolated frames between two images.
Used for direct transitions between MAIN sequences without
a transition folder.
For FILM: Uses batch mode to generate all frames at once (better quality).
For RIFE: Generates frames one at a time (RIFE handles arbitrary timesteps well).
Args:
img_a_path: Path to last frame of first sequence.
img_b_path: Path to first frame of second sequence.
frame_count: Number of interpolated frames to generate.
method: Interpolation method (RIFE or FILM).
dest: Destination directory for generated frames.
folder_idx: Folder index (unused, kept for compatibility).
base_seq_num: Starting sequence number for continuous naming.
practical_rife_model: Practical-RIFE model version.
practical_rife_ensemble: Enable Practical-RIFE ensemble mode.
Returns:
List of BlendResult objects.
"""
results = []
dest.mkdir(parents=True, exist_ok=True)
# For FILM, use batch mode to generate all frames at once
if method == DirectInterpolationMethod.FILM and FilmEnv.is_setup():
return self._generate_film_frames_batch(
img_a_path, img_b_path, frame_count, dest, base_seq_num
)
# For RIFE (or FILM fallback), generate frames one at a time
# Load source images
img_a = Image.open(img_a_path)
img_b = Image.open(img_b_path)
# Handle different sizes - resize B to match A
if img_a.size != img_b.size:
img_b = img_b.resize(img_a.size, Image.Resampling.LANCZOS)
# Normalize to RGBA
if img_a.mode != 'RGBA':
img_a = img_a.convert('RGBA')
if img_b.mode != 'RGBA':
img_b = img_b.convert('RGBA')
for i in range(frame_count):
# Evenly space t values between 0 and 1 (exclusive)
t = (i + 1) / (frame_count + 1)
# Generate interpolated frame
if method == DirectInterpolationMethod.FILM:
blended = ImageBlender.film_blend(img_a, img_b, t)
else: # RIFE
blended = ImageBlender.practical_rife_blend(
img_a, img_b, t,
practical_rife_model, practical_rife_ensemble
)
# Generate output filename
ext = f".{self.settings.output_format.lower()}"
seq_num = base_seq_num + i
output_name = f"seq_{seq_num:05d}{ext}"
output_path = dest / output_name
# Save the blended frame
try:
# Convert back to RGB if saving to JPEG
if self.settings.output_format.lower() in ('jpg', 'jpeg'):
blended = blended.convert('RGB')
# Save with appropriate options
save_kwargs = {}
if self.settings.output_format.lower() in ('jpg', 'jpeg'):
save_kwargs['quality'] = self.settings.output_quality
elif self.settings.output_format.lower() == 'webp':
save_kwargs['lossless'] = True
save_kwargs['method'] = self.settings.webp_method
elif self.settings.output_format.lower() == 'png':
save_kwargs['compress_level'] = 6
blended.save(output_path, **save_kwargs)
results.append(BlendResult(
output_path=output_path,
source_a=img_a_path,
source_b=img_b_path,
blend_factor=t,
success=True
))
except Exception as e:
results.append(BlendResult(
output_path=output_path,
source_a=img_a_path,
source_b=img_b_path,
blend_factor=t,
success=False,
error=str(e)
))
# Close loaded images
img_a.close()
img_b.close()
return results
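The timestep spacing and continuous naming used above can be verified in isolation (the frame count and base sequence number are arbitrary examples):

```python
frame_count = 3
# Evenly spaced t values, exclusive of 0 and 1: the endpoints are the
# source frames themselves, so only interior timesteps are generated.
ts = [(i + 1) / (frame_count + 1) for i in range(frame_count)]

base_seq_num = 42
# Continuous seq_NNNNN naming, independent of which folder produced the frame
names = [f"seq_{base_seq_num + i:05d}.png" for i in range(frame_count)]
```

Three frames land at t = 0.25, 0.5, 0.75 and are named seq_00042.png through seq_00044.png.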
def _generate_film_frames_batch(
self,
img_a_path: Path,
img_b_path: Path,
frame_count: int,
dest: Path,
base_seq_num: int
) -> list[BlendResult]:
"""Generate FILM frames using batch mode for better quality.
FILM works best when generating all frames at once using its
recursive approach, rather than generating arbitrary timesteps.
Args:
img_a_path: Path to last frame of first sequence.
img_b_path: Path to first frame of second sequence.
frame_count: Number of interpolated frames to generate.
dest: Destination directory for generated frames.
base_seq_num: Starting sequence number for continuous naming.
Returns:
List of BlendResult objects.
"""
results = []
# Generate frames using FILM batch mode
# Use a temp pattern, then rename to final names
temp_pattern = 'film_temp_{:04d}.png'
success, error, temp_paths = FilmEnv.run_batch_interpolation(
img_a_path,
img_b_path,
dest,
frame_count,
temp_pattern
)
if not success:
# Return error results for all frames
for i in range(frame_count):
t = (i + 1) / (frame_count + 1)
ext = f".{self.settings.output_format.lower()}"
seq_num = base_seq_num + i
output_name = f"seq_{seq_num:05d}{ext}"
output_path = dest / output_name
results.append(BlendResult(
output_path=output_path,
source_a=img_a_path,
source_b=img_b_path,
blend_factor=t,
success=False,
error=error
))
return results
# Rename temp files to final names and convert format if needed
for i, temp_path in enumerate(temp_paths):
t = (i + 1) / (frame_count + 1)
ext = f".{self.settings.output_format.lower()}"
seq_num = base_seq_num + i
output_name = f"seq_{seq_num:05d}{ext}"
output_path = dest / output_name
try:
if temp_path.exists():
# Load the temp frame
frame = Image.open(temp_path)
# Convert format if needed
if self.settings.output_format.lower() in ('jpg', 'jpeg'):
frame = frame.convert('RGB')
# Save with appropriate options
save_kwargs = {}
if self.settings.output_format.lower() in ('jpg', 'jpeg'):
save_kwargs['quality'] = self.settings.output_quality
elif self.settings.output_format.lower() == 'webp':
save_kwargs['lossless'] = True
save_kwargs['method'] = self.settings.webp_method
elif self.settings.output_format.lower() == 'png':
save_kwargs['compress_level'] = 6
frame.save(output_path, **save_kwargs)
frame.close()
# Remove temp file if different from output
if temp_path != output_path:
temp_path.unlink(missing_ok=True)
results.append(BlendResult(
output_path=output_path,
source_a=img_a_path,
source_b=img_b_path,
blend_factor=t,
success=True
))
else:
results.append(BlendResult(
output_path=output_path,
source_a=img_a_path,
source_b=img_b_path,
blend_factor=t,
success=False,
error=f"Temp file not found: {temp_path}"
))
except Exception as e:
results.append(BlendResult(
output_path=output_path,
source_a=img_a_path,
source_b=img_b_path,
blend_factor=t,
success=False,
error=str(e)
))
return results


@@ -39,7 +39,8 @@ class DatabaseManager:
CREATE TABLE IF NOT EXISTS symlink_sessions (
id INTEGER PRIMARY KEY,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
destination TEXT NOT NULL,
name TEXT DEFAULT NULL
);
CREATE TABLE IF NOT EXISTS symlinks (
@@ -82,6 +83,24 @@ class DatabaseManager:
right_overlap INTEGER DEFAULT 16,
UNIQUE(session_id, trans_folder)
);
CREATE TABLE IF NOT EXISTS removed_files (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
source_folder TEXT NOT NULL,
filename TEXT NOT NULL,
UNIQUE(session_id, source_folder, filename)
);
CREATE TABLE IF NOT EXISTS direct_transition_settings (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
after_folder TEXT NOT NULL,
frame_count INTEGER DEFAULT 16,
method TEXT DEFAULT 'film',
enabled INTEGER DEFAULT 1,
UNIQUE(session_id, after_folder)
);
""")
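The `ON DELETE CASCADE` references in the schema only fire when foreign keys are enabled on the connection, which is why `_connect` sets the pragma. A minimal in-memory check, with the tables trimmed to the relevant columns:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("PRAGMA foreign_keys = ON")  # off by default in Python's sqlite3
conn.executescript("""
CREATE TABLE symlink_sessions (
    id INTEGER PRIMARY KEY,
    destination TEXT NOT NULL
);
CREATE TABLE removed_files (
    id INTEGER PRIMARY KEY,
    session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
    source_folder TEXT NOT NULL,
    filename TEXT NOT NULL,
    UNIQUE(session_id, source_folder, filename)
);
""")
conn.execute("INSERT INTO symlink_sessions (id, destination) VALUES (1, '/out')")
conn.execute(
    "INSERT INTO removed_files (session_id, source_folder, filename) VALUES (1, '/src', 'a.png')"
)
conn.execute("DELETE FROM symlink_sessions WHERE id = 1")
orphans = conn.execute("SELECT COUNT(*) FROM removed_files").fetchone()[0]
```

Deleting the session row cascades to the child table, leaving no orphaned rows.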
# Migration: add folder_type column if it doesn't exist
@@ -114,20 +133,168 @@ class DatabaseManager:
except sqlite3.OperationalError:
conn.execute("ALTER TABLE transition_settings ADD COLUMN rife_binary_path TEXT")
# Migration: add folder_order column if it doesn't exist
try:
conn.execute("SELECT folder_order FROM sequence_trim_settings LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE sequence_trim_settings ADD COLUMN folder_order INTEGER DEFAULT 0")
# Migration: add name column to symlink_sessions if it doesn't exist
try:
conn.execute("SELECT name FROM symlink_sessions LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE symlink_sessions ADD COLUMN name TEXT DEFAULT NULL")
# Migration: widen UNIQUE constraints to allow duplicate folder paths per session.
# sequence_trim_settings: UNIQUE(session_id, source_folder) → UNIQUE(session_id, folder_order)
self._migrate_unique_constraint(
conn, 'sequence_trim_settings',
"""CREATE TABLE sequence_trim_settings_new (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
source_folder TEXT NOT NULL,
trim_start INTEGER DEFAULT 0,
trim_end INTEGER DEFAULT 0,
folder_type TEXT DEFAULT 'auto',
folder_order INTEGER DEFAULT 0,
UNIQUE(session_id, folder_order)
)""",
'session_id, source_folder, trim_start, trim_end, folder_type, folder_order',
)
# per_transition_settings: add folder_order, widen UNIQUE
try:
conn.execute("SELECT folder_order FROM per_transition_settings LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE per_transition_settings ADD COLUMN folder_order INTEGER DEFAULT 0")
self._migrate_unique_constraint(
conn, 'per_transition_settings',
"""CREATE TABLE per_transition_settings_new (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
trans_folder TEXT NOT NULL,
left_overlap INTEGER DEFAULT 16,
right_overlap INTEGER DEFAULT 16,
folder_order INTEGER DEFAULT 0,
UNIQUE(session_id, trans_folder, folder_order)
)""",
'session_id, trans_folder, left_overlap, right_overlap, folder_order',
)
# removed_files: add folder_order, widen UNIQUE
try:
conn.execute("SELECT folder_order FROM removed_files LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE removed_files ADD COLUMN folder_order INTEGER DEFAULT 0")
self._migrate_unique_constraint(
conn, 'removed_files',
"""CREATE TABLE removed_files_new (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
source_folder TEXT NOT NULL,
filename TEXT NOT NULL,
folder_order INTEGER DEFAULT 0,
UNIQUE(session_id, source_folder, filename, folder_order)
)""",
'session_id, source_folder, filename, folder_order',
)
# direct_transition_settings: add folder_order, widen UNIQUE
try:
conn.execute("SELECT folder_order FROM direct_transition_settings LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE direct_transition_settings ADD COLUMN folder_order INTEGER DEFAULT 0")
self._migrate_unique_constraint(
conn, 'direct_transition_settings',
"""CREATE TABLE direct_transition_settings_new (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
after_folder TEXT NOT NULL,
frame_count INTEGER DEFAULT 16,
method TEXT DEFAULT 'film',
enabled INTEGER DEFAULT 1,
folder_order INTEGER DEFAULT 0,
UNIQUE(session_id, after_folder, folder_order)
)""",
'session_id, after_folder, frame_count, method, enabled, folder_order',
)
# Migration: add locked column to symlink_sessions
try:
conn.execute("SELECT locked FROM symlink_sessions LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE symlink_sessions ADD COLUMN locked INTEGER DEFAULT 0")
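The SELECT-then-ALTER probes repeated above follow one pattern, which could be factored into a small helper. This refactor is a sketch, not code from the diff:

```python
import sqlite3

def add_column_if_missing(conn: sqlite3.Connection, table: str, column: str, ddl: str) -> None:
    """ALTER TABLE only when the column is absent (SQLite has no ADD COLUMN IF NOT EXISTS)."""
    try:
        conn.execute(f"SELECT {column} FROM {table} LIMIT 1")
    except sqlite3.OperationalError:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {ddl}")

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE symlink_sessions (id INTEGER PRIMARY KEY)")
add_column_if_missing(conn, 'symlink_sessions', 'locked', 'INTEGER DEFAULT 0')
add_column_if_missing(conn, 'symlink_sessions', 'locked', 'INTEGER DEFAULT 0')  # idempotent
cols = [row[1] for row in conn.execute("PRAGMA table_info(symlink_sessions)")]
```

The probe query costs one failed statement at worst, which keeps every startup migration idempotent.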
# Migration: remove overlap_frames from transition_settings (now per-transition)
# We'll keep it for backward compatibility but won't use it
@staticmethod
def _migrate_unique_constraint(
conn: sqlite3.Connection,
table: str,
create_new_sql: str,
columns: str,
) -> None:
"""Recreate a table with a new UNIQUE constraint if needed.
Probes whether two rows identical except for folder_order can coexist.
If the UNIQUE constraint rejects them, the old constraint is too
narrow and the table must be recreated.
"""
new_table = f"{table}_new"
probe_cols = [c.strip() for c in columns.split(',') if c.strip() not in ('session_id', 'folder_order')]
col_list = ', '.join(probe_cols)
placeholders = ', '.join('?' for _ in probe_cols)
probe_vals = ['__test__'] * len(probe_cols)
try:
# Probe: insert two rows that differ only in folder_order. Fill every
# non-id column so a NOT NULL column without a default (e.g.
# removed_files.filename) cannot raise a spurious IntegrityError and
# force a needless recreate on every startup.
conn.execute(f"INSERT INTO {table} (session_id, {col_list}, folder_order) VALUES (-999, {placeholders}, 1)", probe_vals)
conn.execute(f"INSERT INTO {table} (session_id, {col_list}, folder_order) VALUES (-999, {placeholders}, 2)", probe_vals)
# Clean up probe rows
conn.execute(f"DELETE FROM {table} WHERE session_id = -999")
# The constraint already allows duplicates; no migration needed
return
except sqlite3.IntegrityError:
# Old constraint is too narrow; recreate below
conn.execute(f"DELETE FROM {table} WHERE session_id = -999")
except sqlite3.OperationalError:
# Column might not exist yet or other issue; try migration anyway
conn.execute(f"DELETE FROM {table} WHERE session_id = -999")
try:
conn.execute(f"DROP TABLE IF EXISTS {new_table}")
conn.execute(create_new_sql)
conn.execute(f"INSERT INTO {new_table} ({columns}) SELECT {columns} FROM {table}")
conn.execute(f"DROP TABLE {table}")
conn.execute(f"ALTER TABLE {new_table} RENAME TO {table}")
except (sqlite3.OperationalError, sqlite3.IntegrityError):
# Clean up failed migration attempt
try:
conn.execute(f"DROP TABLE IF EXISTS {new_table}")
except sqlite3.OperationalError:
pass
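SQLite cannot widen a UNIQUE constraint in place, so the helper uses the create-copy-drop-rename dance. The same steps in isolation, on a hypothetical table `t`:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    "CREATE TABLE t (id INTEGER PRIMARY KEY, session_id INTEGER, folder TEXT,"
    " UNIQUE(session_id, folder))"
)
conn.execute("INSERT INTO t (session_id, folder) VALUES (1, '/a')")

# Recreate with folder_order added to the UNIQUE, carrying existing data across
conn.execute(
    "CREATE TABLE t_new (id INTEGER PRIMARY KEY, session_id INTEGER, folder TEXT,"
    " folder_order INTEGER DEFAULT 0, UNIQUE(session_id, folder, folder_order))"
)
conn.execute("INSERT INTO t_new (id, session_id, folder) SELECT id, session_id, folder FROM t")
conn.execute("DROP TABLE t")
conn.execute("ALTER TABLE t_new RENAME TO t")

# The same (session_id, folder) pair is now allowed when folder_order differs
conn.execute("INSERT INTO t (session_id, folder, folder_order) VALUES (1, '/a', 1)")
rows = conn.execute("SELECT session_id, folder, folder_order FROM t ORDER BY folder_order").fetchall()
```

After the rename, duplicate folder paths per session coexist as long as their positions differ.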
def clear_session_data(self, session_id: int) -> None:
"""Delete all data for a session (symlinks, settings, etc.) but keep the session row."""
try:
with self._connect() as conn:
for table in (
'symlinks', 'sequence_trim_settings', 'transition_settings',
'per_transition_settings', 'removed_files', 'direct_transition_settings',
):
conn.execute(f"DELETE FROM {table} WHERE session_id = ?", (session_id,))
except sqlite3.Error as e:
raise DatabaseError(f"Failed to clear session data: {e}") from e
def _connect(self) -> sqlite3.Connection:
"""Create a database connection with foreign keys enabled."""
conn = sqlite3.connect(self.db_path)
conn.execute("PRAGMA foreign_keys = ON")
return conn
def create_session(self, destination: str, name: Optional[str] = None) -> int:
"""Create a new linking session.
Args:
destination: The destination directory path.
name: Optional display name (e.g. "autosave").
Returns:
The ID of the created session.
@@ -138,8 +305,8 @@ class DatabaseManager:
try:
with self._connect() as conn:
cursor = conn.execute(
"INSERT INTO symlink_sessions (destination, name) VALUES (?, ?)",
(destination, name)
)
return cursor.lastrowid
except sqlite3.Error as e:
@@ -180,6 +347,31 @@ class DatabaseManager:
except sqlite3.Error as e:
raise DatabaseError(f"Failed to record symlink: {e}") from e
def record_symlinks_batch(
self,
session_id: int,
records: list[tuple[str, str, str, int]],
) -> None:
"""Record multiple symlinks in a single transaction.
Args:
session_id: The session these symlinks belong to.
records: List of (source, link, filename, seq) tuples.
Raises:
DatabaseError: If recording fails.
"""
try:
with self._connect() as conn:
conn.executemany(
"""INSERT INTO symlinks
(session_id, source_path, link_path, original_filename, sequence_number)
VALUES (?, ?, ?, ?, ?)""",
[(session_id, src, lnk, fname, seq) for src, lnk, fname, seq in records]
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to record symlinks: {e}") from e
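Batching through `executemany` keeps all inserts in one transaction instead of one commit per link. A self-contained sketch with the `symlinks` columns from this diff (paths and the session id are invented):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    "CREATE TABLE symlinks (session_id INTEGER, source_path TEXT, link_path TEXT,"
    " original_filename TEXT, sequence_number INTEGER)"
)
session_id = 7
records = [
    ("/src/a.png", "/out/seq_00001.png", "a.png", 1),
    ("/src/b.png", "/out/seq_00002.png", "b.png", 2),
]
# One executemany call replaces a loop of per-row INSERTs
conn.executemany(
    "INSERT INTO symlinks VALUES (?, ?, ?, ?, ?)",
    [(session_id, src, lnk, fname, seq) for src, lnk, fname, seq in records],
)
count = conn.execute("SELECT COUNT(*) FROM symlinks WHERE session_id = 7").fetchone()[0]
```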
def get_sessions(self) -> list[SessionRecord]:
"""List all sessions with link counts.
@@ -188,7 +380,8 @@ class DatabaseManager:
"""
with self._connect() as conn:
rows = conn.execute("""
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count,
s.name, COALESCE(s.locked, 0)
FROM symlink_sessions s
LEFT JOIN symlinks l ON s.id = l.session_id
GROUP BY s.id
@@ -200,7 +393,9 @@ class DatabaseManager:
id=row[0],
created_at=datetime.fromisoformat(row[1]),
destination=row[2],
link_count=row[3],
name=row[4],
locked=bool(row[5])
)
for row in rows
]
@@ -270,7 +465,7 @@ class DatabaseManager:
]
def delete_session(self, session_id: int) -> None:
"""Delete a session and all its related data (CASCADE handles child tables).
Args:
session_id: The session ID to delete.
@@ -280,11 +475,56 @@ class DatabaseManager:
"""
try:
with self._connect() as conn:
conn.execute("DELETE FROM symlink_sessions WHERE id = ?", (session_id,))
except sqlite3.Error as e:
raise DatabaseError(f"Failed to delete session: {e}") from e
def delete_sessions(self, session_ids: list[int]) -> None:
"""Delete multiple sessions in a single transaction.
Locked sessions are silently skipped.
Args:
session_ids: List of session IDs to delete.
Raises:
DatabaseError: If deletion fails.
"""
if not session_ids:
return
try:
with self._connect() as conn:
placeholders = ','.join('?' for _ in session_ids)
conn.execute(
f"DELETE FROM symlink_sessions WHERE id IN ({placeholders}) AND COALESCE(locked, 0) = 0",
session_ids
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to delete sessions: {e}") from e
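The `COALESCE(locked, 0) = 0` filter is what makes locked sessions survive a bulk delete, even for rows created before the `locked` migration (where the column is NULL). Verified on a minimal table:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE symlink_sessions (id INTEGER PRIMARY KEY, locked INTEGER DEFAULT 0)")
conn.executemany("INSERT INTO symlink_sessions VALUES (?, ?)", [(1, 0), (2, 1)])

session_ids = [1, 2]
placeholders = ','.join('?' for _ in session_ids)
# Locked sessions are silently skipped, matching delete_sessions
conn.execute(
    f"DELETE FROM symlink_sessions WHERE id IN ({placeholders}) AND COALESCE(locked, 0) = 0",
    session_ids,
)
remaining = [row[0] for row in conn.execute("SELECT id FROM symlink_sessions").fetchall()]
```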
def toggle_session_locked(self, session_id: int) -> bool:
"""Toggle the locked state of a session.
Returns:
The new locked state.
"""
try:
with self._connect() as conn:
row = conn.execute(
"SELECT COALESCE(locked, 0) FROM symlink_sessions WHERE id = ?",
(session_id,)
).fetchone()
if row is None:
raise DatabaseError(f"Session {session_id} not found")
new_val = 0 if row[0] else 1
conn.execute(
"UPDATE symlink_sessions SET locked = ? WHERE id = ?",
(new_val, session_id)
)
return bool(new_val)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to toggle session lock: {e}") from e
def get_sessions_by_destination(self, dest: str) -> list[SessionRecord]:
"""Get all sessions for a destination directory.
@@ -296,7 +536,8 @@ class DatabaseManager:
"""
with self._connect() as conn:
rows = conn.execute("""
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count,
s.name, COALESCE(s.locked, 0)
FROM symlink_sessions s
LEFT JOIN symlinks l ON s.id = l.session_id
WHERE s.destination = ?
@@ -309,7 +550,9 @@ class DatabaseManager:
id=row[0],
created_at=datetime.fromisoformat(row[1]),
destination=row[2],
link_count=row[3],
name=row[4],
locked=bool(row[5])
)
for row in rows
]
@@ -320,7 +563,8 @@ class DatabaseManager:
source_folder: str,
trim_start: int,
trim_end: int,
folder_type: FolderType = FolderType.AUTO,
folder_order: int = 0,
) -> None:
"""Save trim settings for a folder in a session.
@@ -330,6 +574,7 @@ class DatabaseManager:
trim_start: Number of images to trim from start.
trim_end: Number of images to trim from end.
folder_type: The folder type (auto, main, or transition).
folder_order: Position of this folder in source_folders list.
Raises:
DatabaseError: If saving fails.
@@ -338,13 +583,14 @@ class DatabaseManager:
with self._connect() as conn:
conn.execute(
"""INSERT INTO sequence_trim_settings
(session_id, source_folder, trim_start, trim_end, folder_type, folder_order)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(session_id, folder_order)
DO UPDATE SET source_folder=excluded.source_folder,
trim_start=excluded.trim_start,
trim_end=excluded.trim_end,
folder_type=excluded.folder_type""",
(session_id, source_folder, trim_start, trim_end, folder_type.value, folder_order)
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to save trim settings: {e}") from e
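With the conflict target moved to `(session_id, folder_order)`, writing a different folder into an already-occupied slot updates the row instead of inserting a second one. A trimmed-down check of the upsert (folder paths are invented):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    "CREATE TABLE sequence_trim_settings (session_id INTEGER, source_folder TEXT,"
    " trim_start INTEGER DEFAULT 0, folder_order INTEGER DEFAULT 0,"
    " UNIQUE(session_id, folder_order))"
)
sql = (
    "INSERT INTO sequence_trim_settings (session_id, source_folder, trim_start, folder_order)"
    " VALUES (?, ?, ?, ?)"
    " ON CONFLICT(session_id, folder_order)"
    " DO UPDATE SET source_folder=excluded.source_folder, trim_start=excluded.trim_start"
)
conn.execute(sql, (1, '/a', 0, 0))
conn.execute(sql, (1, '/b', 5, 0))  # same slot 0: replaces /a rather than erroring
row = conn.execute("SELECT source_folder, trim_start FROM sequence_trim_settings").fetchone()
```

This is what lets a Replace Folder action overwrite a slot's settings in place.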
@@ -396,6 +642,62 @@ class DatabaseManager:
return {row[0]: (row[1], row[2]) for row in rows}
def get_all_folder_settings(self, session_id: int) -> dict[str, tuple[int, int, FolderType]]:
"""Get all folder settings (trim + type) for a session, unordered.
Returns:
Dict mapping source_folder to (trim_start, trim_end, folder_type).
"""
with self._connect() as conn:
rows = conn.execute(
"""SELECT source_folder, trim_start, trim_end, folder_type
FROM sequence_trim_settings WHERE session_id = ?""",
(session_id,)
).fetchall()
result = {}
for row in rows:
try:
ft = FolderType(row[3]) if row[3] else FolderType.AUTO
except ValueError:
ft = FolderType.AUTO
result[row[0]] = (row[1], row[2], ft)
return result
def get_ordered_folders(self, session_id: int) -> list[tuple[str, FolderType, int, int]]:
"""Get all folders for a session in saved order.
Returns:
List of (source_folder, folder_type, trim_start, trim_end) sorted by folder_order.
Returns empty list if folder_order is not meaningful (all zeros from
pre-migration sessions), so the caller falls back to symlink-derived order.
"""
with self._connect() as conn:
rows = conn.execute(
"""SELECT source_folder, folder_type, trim_start, trim_end, folder_order
FROM sequence_trim_settings WHERE session_id = ?
ORDER BY folder_order""",
(session_id,)
).fetchall()
if not rows:
return []
# If all folder_order values are 0, this is a pre-migration session
# where the ordering is not meaningful — return empty to trigger
# the legacy symlink-derived ordering path.
if len(rows) > 1 and all(row[4] == 0 for row in rows):
return []
result = []
for row in rows:
try:
ft = FolderType(row[1]) if row[1] else FolderType.AUTO
except ValueError:
ft = FolderType.AUTO
result.append((row[0], ft, row[2], row[3]))
return result
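The all-zeros heuristic treats uniform `folder_order` values as pre-migration data whose saved order is meaningless (the folder paths in the sample rows are invented):

```python
# Rows as returned by the query:
# (source_folder, folder_type, trim_start, trim_end, folder_order)
legacy_rows = [('/a', 'main', 0, 0, 0), ('/b', 'auto', 0, 0, 0)]
migrated_rows = [('/a', 'main', 0, 0, 0), ('/b', 'auto', 0, 0, 1)]

def looks_legacy(rows: list[tuple]) -> bool:
    # More than one folder, all at order 0: ordering was never written
    return len(rows) > 1 and all(row[4] == 0 for row in rows)

legacy = looks_legacy(legacy_rows)      # fall back to symlink-derived order
migrated = looks_legacy(migrated_rows)  # trust the saved folder_order
```

A single-folder session is never flagged as legacy, since order 0 is correct for it either way.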
def save_transition_settings(
self,
session_id: int,
@@ -532,13 +834,15 @@ class DatabaseManager:
def save_per_transition_settings(
self,
session_id: int,
settings: PerTransitionSettings,
folder_order: int = 0,
) -> None:
"""Save per-transition overlap settings.
Args:
session_id: The session ID.
settings: PerTransitionSettings to save.
folder_order: Position of this folder in the source list.
Raises:
DatabaseError: If saving fails.
@@ -547,13 +851,13 @@ class DatabaseManager:
with self._connect() as conn:
conn.execute(
"""INSERT INTO per_transition_settings
(session_id, trans_folder, left_overlap, right_overlap, folder_order)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(session_id, trans_folder, folder_order)
DO UPDATE SET left_overlap=excluded.left_overlap,
right_overlap=excluded.right_overlap""",
(session_id, str(settings.trans_folder),
settings.left_overlap, settings.right_overlap, folder_order)
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to save per-transition settings: {e}") from e
@@ -590,27 +894,110 @@ class DatabaseManager:
def get_all_per_transition_settings(
self,
session_id: int
) -> list[tuple[str, int, int, int]]:
"""Get all per-transition settings for a session.
Args:
session_id: The session ID.
Returns:
List of (trans_folder, left_overlap, right_overlap, folder_order) tuples.
"""
with self._connect() as conn:
rows = conn.execute(
"""SELECT trans_folder, left_overlap, right_overlap, folder_order
FROM per_transition_settings WHERE session_id = ?
ORDER BY folder_order""",
(session_id,)
).fetchall()
return [(row[0], row[1], row[2], row[3]) for row in rows]
def save_removed_files(
self,
session_id: int,
source_folder: str,
filenames: list[str],
folder_order: int = 0,
) -> None:
"""Save removed files for a folder in a session.
Args:
session_id: The session ID.
source_folder: Path to the source folder.
filenames: List of removed filenames.
folder_order: Position of this folder in the source list.
"""
try:
with self._connect() as conn:
for filename in filenames:
conn.execute(
"""INSERT OR IGNORE INTO removed_files
(session_id, source_folder, filename, folder_order)
VALUES (?, ?, ?, ?)""",
(session_id, source_folder, filename, folder_order)
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to save removed files: {e}") from e
def get_removed_files(self, session_id: int) -> dict[int, set[str]]:
"""Get all removed files for a session, keyed by folder_order.
Args:
session_id: The session ID.
Returns:
Dict mapping folder_order to sets of removed filenames.
"""
with self._connect() as conn:
rows = conn.execute(
"SELECT source_folder, filename, folder_order FROM removed_files WHERE session_id = ?",
(session_id,)
).fetchall()
result: dict[int, set[str]] = {}
for folder, filename, folder_order in rows:
if folder_order not in result:
result[folder_order] = set()
result[folder_order].add(filename)
return result
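The grouping loop in `get_removed_files` is equivalent to a `setdefault` accumulation (the sample rows here are invented):

```python
# (source_folder, filename, folder_order) rows as fetched from removed_files
rows = [('/a', 'x.png', 0), ('/a', 'y.png', 0), ('/b', 'x.png', 1)]

removed: dict[int, set[str]] = {}
for _folder, filename, folder_order in rows:
    # One set of filenames per folder position
    removed.setdefault(folder_order, set()).add(filename)
```

Keying by `folder_order` instead of path keeps removals attached to the right slot even when the same source folder appears twice in a session.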
def save_direct_transition(
self,
session_id: int,
after_folder: str,
frame_count: int,
method: str,
enabled: bool,
folder_order: int = 0,
) -> None:
"""Save direct interpolation settings for a folder transition."""
try:
with self._connect() as conn:
conn.execute(
"""INSERT INTO direct_transition_settings
(session_id, after_folder, frame_count, method, enabled, folder_order)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(session_id, folder_order)
DO UPDATE SET after_folder=excluded.after_folder,
frame_count=excluded.frame_count,
method=excluded.method,
enabled=excluded.enabled""",
(session_id, after_folder, frame_count, method, 1 if enabled else 0, folder_order)
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to save direct transition: {e}") from e
def get_direct_transitions(self, session_id: int) -> list[tuple[str, int, str, bool, int]]:
"""Get direct interpolation settings for a session.
Returns:
List of (after_folder, frame_count, method, enabled, folder_order) tuples.
"""
with self._connect() as conn:
rows = conn.execute(
"SELECT after_folder, frame_count, method, enabled, folder_order "
"FROM direct_transition_settings WHERE session_id = ?",
(session_id,)
).fetchall()
return [(r[0], r[1], r[2], bool(r[3]), r[4]) for r in rows]

core/film_worker.py (new file)

@@ -0,0 +1,285 @@
#!/usr/bin/env python
"""FILM interpolation worker - runs in isolated venv with PyTorch.
This script is executed via subprocess from the main application.
It handles frame interpolation using Google Research's FILM model
(Frame Interpolation for Large Motion) via the frame-interpolation-pytorch repo.
FILM is better than RIFE for large motion and scene gaps, but slower.
Supports two modes:
1. Single frame: --output with --timestep
2. Batch mode: --output-dir with --frame-count (generates all frames at once)
"""
import argparse
import sys
import urllib.request
from pathlib import Path
import numpy as np
import torch
from PIL import Image
# Model download URL
FILM_MODEL_URL = "https://github.com/dajes/frame-interpolation-pytorch/releases/download/v1.0.2/film_net_fp32.pt"
FILM_MODEL_FILENAME = "film_net_fp32.pt"
def load_image(path: Path, device: torch.device) -> torch.Tensor:
"""Load image as tensor.
Args:
path: Path to image file.
device: Device to load tensor to.
Returns:
Image tensor (1, 3, H, W) normalized to [0, 1].
"""
img = Image.open(path).convert('RGB')
arr = np.array(img).astype(np.float32) / 255.0
tensor = torch.from_numpy(arr).permute(2, 0, 1).unsqueeze(0)
return tensor.to(device)
def save_image(tensor: torch.Tensor, path: Path) -> None:
"""Save tensor as image.
Args:
tensor: Image tensor (1, 3, H, W) or (3, H, W) normalized to [0, 1].
path: Output path.
"""
if tensor.dim() == 4:
tensor = tensor.squeeze(0)
arr = tensor.permute(1, 2, 0).cpu().numpy()
arr = (arr * 255).clip(0, 255).astype(np.uint8)
Image.fromarray(arr).save(path)
# Global model cache
_model_cache: dict = {}
def download_model(model_dir: Path) -> Path:
"""Download FILM model if not present.
Args:
model_dir: Directory to store the model.
Returns:
Path to the downloaded model file.
"""
model_dir.mkdir(parents=True, exist_ok=True)
model_path = model_dir / FILM_MODEL_FILENAME
if not model_path.exists():
print(f"Downloading FILM model to {model_path}...", file=sys.stderr)
urllib.request.urlretrieve(FILM_MODEL_URL, model_path)
print("Download complete.", file=sys.stderr)
return model_path
def get_model(model_dir: Path, device: torch.device):
"""Get or load FILM model (cached).
Args:
model_dir: Model cache directory (for model downloads).
device: Device to run on.
Returns:
FILM TorchScript model instance.
"""
cache_key = f"film_{device}"
if cache_key not in _model_cache:
# Download model if needed
model_path = download_model(model_dir)
# Load pre-trained TorchScript model
print(f"Loading FILM model from {model_path}...", file=sys.stderr)
model = torch.jit.load(str(model_path), map_location='cpu')
model.eval()
model.to(device)
_model_cache[cache_key] = model
print("Model loaded.", file=sys.stderr)
return _model_cache[cache_key]
@torch.no_grad()
def interpolate_single(model, img0: torch.Tensor, img1: torch.Tensor, t: float) -> torch.Tensor:
"""Perform single frame interpolation using FILM.
Args:
model: FILM TorchScript model instance.
img0: First frame tensor (1, 3, H, W) normalized to [0, 1].
img1: Second frame tensor (1, 3, H, W) normalized to [0, 1].
t: Interpolation timestep (0.0 to 1.0).
Returns:
Interpolated frame tensor.
"""
# FILM TorchScript model expects dt as tensor of shape (1, 1)
dt = img0.new_full((1, 1), t)
result = model(img0, img1, dt)
if isinstance(result, tuple):
result = result[0]
return result.clamp(0, 1)
@torch.no_grad()
def interpolate_batch(model, img0: torch.Tensor, img1: torch.Tensor, frame_count: int) -> list[torch.Tensor]:
"""Generate multiple interpolated frames using FILM's recursive approach.
FILM works best when generating frames recursively - it first generates
the middle frame, then fills in the gaps. This produces more consistent
results than generating arbitrary timesteps independently.
Args:
model: FILM model instance.
img0: First frame tensor (1, 3, H, W) normalized to [0, 1].
img1: Second frame tensor (1, 3, H, W) normalized to [0, 1].
frame_count: Number of frames to generate between img0 and img1.
Returns:
List of interpolated frame tensors in order.
"""
# Calculate timesteps for evenly spaced frames
timesteps = [(i + 1) / (frame_count + 1) for i in range(frame_count)]
# Try to use the model's batch/recursive interpolation if available
try:
# Some implementations have an interpolate_recursively method
if hasattr(model, 'interpolate_recursively'):
# This generates 2^n - 1 frames, so we need to handle arbitrary counts
results = model.interpolate_recursively(img0, img1, frame_count)
if len(results) >= frame_count:
return results[:frame_count]
except (AttributeError, TypeError):
pass
# Fall back to recursive binary interpolation for better quality
# This mimics FILM's natural recursive approach
frames = {} # timestep -> tensor
def recursive_interpolate(t_left: float, t_right: float, img_left: torch.Tensor, img_right: torch.Tensor, depth: int = 0):
"""Recursively interpolate to fill the gap."""
if depth > 10: # Prevent infinite recursion
return
t_mid = (t_left + t_right) / 2
# Check if we need a frame near t_mid
need_frame = False
for t in timesteps:
if t not in frames and abs(t - t_mid) < 0.5 / (frame_count + 1):
need_frame = True
break
if not need_frame:
# Check if any remaining timesteps are in this range
remaining = [t for t in timesteps if t not in frames and t_left < t < t_right]
if not remaining:
return
# Generate middle frame
mid_frame = interpolate_single(model, img_left, img_right, 0.5)
# Assign to nearest needed timestep
for t in timesteps:
if t not in frames and abs(t - t_mid) < 0.5 / (frame_count + 1):
frames[t] = mid_frame
break
# Recurse into left and right halves
recursive_interpolate(t_left, t_mid, img_left, mid_frame, depth + 1)
recursive_interpolate(t_mid, t_right, mid_frame, img_right, depth + 1)
# Start recursive interpolation
recursive_interpolate(0.0, 1.0, img0, img1)
# Fill any remaining timesteps with direct interpolation
for t in timesteps:
if t not in frames:
frames[t] = interpolate_single(model, img0, img1, t)
# Return frames in order
return [frames[t] for t in timesteps]
def main():
parser = argparse.ArgumentParser(description='FILM frame interpolation worker')
parser.add_argument('--input0', required=True, help='Path to first input image')
parser.add_argument('--input1', required=True, help='Path to second input image')
parser.add_argument('--output', help='Path to output image (single frame mode)')
parser.add_argument('--output-dir', help='Output directory (batch mode)')
parser.add_argument('--output-pattern', default='frame_{:04d}.png',
help='Output filename pattern for batch mode')
parser.add_argument('--timestep', type=float, default=0.5,
help='Interpolation timestep 0-1 (single frame mode)')
parser.add_argument('--frame-count', type=int,
help='Number of frames to generate (batch mode)')
parser.add_argument('--repo-dir', help='Unused (kept for backward compat)')
parser.add_argument('--model-dir', required=True, help='Model cache directory')
parser.add_argument('--device', default='cuda', choices=['cuda', 'cpu'], help='Device to use')
args = parser.parse_args()
# Validate arguments
batch_mode = args.output_dir is not None and args.frame_count is not None
single_mode = args.output is not None
if not batch_mode and not single_mode:
print("Error: Must specify either --output (single) or --output-dir + --frame-count (batch)",
file=sys.stderr)
return 1
try:
# Select device
if args.device == 'cuda' and torch.cuda.is_available():
device = torch.device('cuda')
else:
device = torch.device('cpu')
# Load model
model_dir = Path(args.model_dir)
model = get_model(model_dir, device)
# Load images
img0 = load_image(Path(args.input0), device)
img1 = load_image(Path(args.input1), device)
if batch_mode:
# Batch mode - generate all frames at once
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
print(f"Generating {args.frame_count} frames...", file=sys.stderr)
frames = interpolate_batch(model, img0, img1, args.frame_count)
for i, frame in enumerate(frames):
output_path = output_dir / args.output_pattern.format(i)
save_image(frame, output_path)
print(f"Saved {output_path.name}", file=sys.stderr)
print(f"Success: Generated {len(frames)} frames", file=sys.stderr)
else:
# Single frame mode
result = interpolate_single(model, img0, img1, args.timestep)
save_image(result, Path(args.output))
print("Success", file=sys.stderr)
return 0
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
return 1
if __name__ == '__main__':
sys.exit(main())
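Batch mode above spaces its interior frames with `(i + 1) / (frame_count + 1)`, so none of the generated frames coincide with the two inputs. A quick standalone check of that formula:

```python
def interior_timesteps(frame_count: int) -> list[float]:
    # Evenly spaced timesteps strictly between the endpoints 0.0 and 1.0,
    # matching interpolate_batch's spacing
    return [(i + 1) / (frame_count + 1) for i in range(frame_count)]

print(interior_timesteps(3))  # [0.25, 0.5, 0.75]
print(interior_timesteps(1))  # [0.5]
```

The main application presumably invokes the script along the lines of `python core/film_worker.py --input0 a.png --input1 b.png --output-dir out --frame-count 8 --model-dir ~/.cache/film` (illustrative paths, not taken from the diff).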


@@ -80,11 +80,12 @@ class SymlinkManager:
@staticmethod
def cleanup_old_links(directory: Path) -> int:
"""Remove existing seq* symlinks from a directory.
"""Remove existing seq* symlinks and temporary files from a directory.
Handles both old format (seq_0000) and new format (seq01_0000).
Also removes blended image files (not just symlinks) created by
cross-dissolve transitions.
Handles all naming formats:
- Old folder-indexed: seq01_0000.png
- Continuous: seq_00000.png
Also removes blended image files and film_temp_*.png temporaries.
Args:
directory: Directory to clean up.
@@ -96,31 +97,134 @@ class SymlinkManager:
CleanupError: If cleanup fails.
"""
removed = 0
seq_pattern = re.compile(r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE)
seq_pattern = re.compile(
r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE
)
temp_pattern = re.compile(
r'^film_temp_\d+\.png$', re.IGNORECASE
)
try:
for item in directory.iterdir():
# Match both old (seq_NNNN) and new (seqNN_NNNN) formats
should_remove = False
if item.name.startswith("seq"):
if item.is_symlink():
item.unlink()
removed += 1
should_remove = True
elif item.is_file() and seq_pattern.match(item.name):
# Also remove blended image files
item.unlink()
removed += 1
should_remove = True
elif item.is_file() and temp_pattern.match(item.name):
should_remove = True
if should_remove:
item.unlink()
removed += 1
except OSError as e:
raise CleanupError(f"Failed to clean up old links: {e}") from e
return removed
@staticmethod
def remove_orphan_files(directory: Path, keep_names: set[str]) -> int:
"""Remove seq* files and film_temp_* not in the keep set.
Same pattern matching as cleanup_old_links but skips filenames
present in keep_names.
Args:
directory: Directory to clean orphans from.
keep_names: Set of filenames to keep.
Returns:
Number of files removed.
Raises:
CleanupError: If removal fails.
"""
removed = 0
seq_pattern = re.compile(
r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE
)
temp_pattern = re.compile(
r'^film_temp_\d+\.png$', re.IGNORECASE
)
try:
for item in directory.iterdir():
if item.name in keep_names:
continue
should_remove = False
if item.name.startswith("seq"):
if item.is_symlink():
should_remove = True
elif item.is_file() and seq_pattern.match(item.name):
should_remove = True
elif item.is_file() and temp_pattern.match(item.name):
should_remove = True
if should_remove:
item.unlink()
removed += 1
except OSError as e:
raise CleanupError(f"Failed to remove orphan files: {e}") from e
return removed
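Both cleanup methods share the same two patterns; a standalone check that they cover the legacy folder-indexed names, the continuous names, and the FILM temporaries while leaving ordinary source files alone:

```python
import re

seq_pattern = re.compile(r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE)
temp_pattern = re.compile(r'^film_temp_\d+\.png$', re.IGNORECASE)

# seq\d*_ accepts both "seq01_" (folder-indexed) and "seq_" (continuous)
checks = ["seq01_0000.png", "seq_00000.png", "film_temp_0003.png", "source_0001.png"]
matches = [bool(seq_pattern.match(n) or temp_pattern.match(n)) for n in checks]
print(matches)  # [True, True, True, False]
```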
@staticmethod
def symlink_matches(link_path: Path, expected_source: Path) -> bool:
"""Check if existing symlink resolves to expected source."""
if not link_path.is_symlink():
return False
try:
return link_path.resolve() == expected_source.resolve()
except OSError:
return False
@staticmethod
def copy_matches(dest_path: Path, source_path: Path) -> bool:
"""Check if existing copy matches source.
Fast path: size + mtime comparison. If sizes match but mtimes
differ, falls back to comparing file contents so that a
re-export after touching (but not changing) the source is still
skipped, while a genuine content change is caught.
"""
if not dest_path.is_file() or dest_path.is_symlink():
return False
try:
src_stat = source_path.stat()
dst_stat = dest_path.stat()
if src_stat.st_size != dst_stat.st_size:
return False
# Fast path: identical mtime means the copy2 wrote this file
if abs(src_stat.st_mtime - dst_stat.st_mtime) < 2.0:
return True
# Size matches but mtime differs — compare contents
return SymlinkManager._files_equal(source_path, dest_path)
except OSError:
return False
@staticmethod
def _files_equal(a: Path, b: Path, chunk_size: int = 65536) -> bool:
"""Compare two files by reading in chunks."""
try:
with open(a, 'rb') as fa, open(b, 'rb') as fb:
while True:
ca = fa.read(chunk_size)
cb = fb.read(chunk_size)
if ca != cb:
return False
if not ca:
return True
except OSError:
return False
def create_sequence_links(
self,
sources: list[Path],
dest: Path,
files: list[tuple],
trim_settings: Optional[dict[Path, tuple[int, int]]] = None,
copy_files: bool = False,
) -> tuple[list[LinkResult], Optional[int]]:
"""Create sequenced symlinks from source files to destination.
"""Create sequenced symlinks or copies from source files to destination.
Args:
sources: List of source directories (for validation).
@@ -129,12 +233,12 @@ class SymlinkManager:
- (source_dir, filename) for CLI mode (uses global sequence)
- (source_dir, filename, folder_idx, file_idx) for GUI mode
trim_settings: Optional dict mapping folder paths to (trim_start, trim_end).
copy_files: If True, copy files instead of creating symlinks.
Returns:
Tuple of (list of LinkResult objects, session_id or None).
"""
self.validate_paths(sources, dest)
self.cleanup_old_links(dest)
session_id = None
if self.db:
@@ -165,6 +269,13 @@ class SymlinkManager:
expanded_files.append((source_dir, filename, folder_idx, file_idx))
files = expanded_files
# Build planned names for orphan removal
planned_names: set[str] = set()
for file_data in files:
_, fn, fi, fli = file_data
ext = Path(fn).suffix
planned_names.add(f"seq{fi + 1:02d}_{fli:04d}{ext}")
for i, file_data in enumerate(files):
source_dir, filename, folder_idx, file_idx = file_data
source_path = source_dir / filename
@@ -172,11 +283,25 @@ class SymlinkManager:
link_name = f"seq{folder_idx + 1:02d}_{file_idx:04d}{ext}"
link_path = dest / link_name
# Calculate relative path from destination to source
rel_source = Path(os.path.relpath(source_path.resolve(), dest.resolve()))
try:
link_path.symlink_to(rel_source)
# Check if existing file already matches
already_correct = False
if link_path.exists() or link_path.is_symlink():
if copy_files:
already_correct = self.copy_matches(link_path, source_path)
else:
already_correct = self.symlink_matches(link_path, source_path)
if not already_correct:
if link_path.exists() or link_path.is_symlink():
link_path.unlink()
if copy_files:
import shutil
shutil.copy2(source_path, link_path)
else:
rel_source = Path(os.path.relpath(source_path.resolve(), dest.resolve()))
link_path.symlink_to(rel_source)
if self.db and session_id:
self.db.record_symlink(
@@ -202,4 +327,10 @@ class SymlinkManager:
error=str(e)
))
# Remove orphan seq*/film_temp_* files not in the planned set
try:
self.remove_orphan_files(dest, planned_names)
except CleanupError:
pass
return results, session_id
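The size/mtime fast path with a byte-compare fallback can be exercised in isolation (function names here are illustrative, not the class's actual API):

```python
import os
import shutil
import tempfile
from pathlib import Path

def files_equal(a: Path, b: Path, chunk: int = 65536) -> bool:
    # Chunked byte comparison, as in _files_equal
    with open(a, "rb") as fa, open(b, "rb") as fb:
        while True:
            ca, cb = fa.read(chunk), fb.read(chunk)
            if ca != cb:
                return False
            if not ca:
                return True

def copy_is_current(src: Path, dst: Path, mtime_tol: float = 2.0) -> bool:
    if not dst.is_file() or dst.is_symlink():
        return False
    s, d = src.stat(), dst.stat()
    if s.st_size != d.st_size:
        return False
    if abs(s.st_mtime - d.st_mtime) < mtime_tol:
        return True  # copy2 preserved the mtime: trust it
    return files_equal(src, dst)  # touched-but-unchanged vs. really changed

tmp = Path(tempfile.mkdtemp())
src, dst = tmp / "a.png", tmp / "seq01_0000.png"
src.write_bytes(b"frame")
shutil.copy2(src, dst)
r1 = copy_is_current(src, dst)   # True via the mtime fast path
os.utime(dst, (0, 0))            # "touched": mtime differs, bytes still equal
r2 = copy_is_current(src, dst)   # True via the byte compare
dst.write_bytes(b"FRAME")        # same size, different content
os.utime(dst, (0, 0))
r3 = copy_is_current(src, dst)   # False
print(r1, r2, r3)
```

The 2-second tolerance mirrors the diff's choice and accommodates filesystems with coarse mtime resolution (e.g. FAT's 2-second granularity).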


@@ -32,6 +32,12 @@ class FolderType(Enum):
TRANSITION = 'transition'
class DirectInterpolationMethod(Enum):
"""Method for direct frame interpolation between sequences."""
RIFE = 'rife'
FILM = 'film'
# --- Data Classes ---
@dataclass
@@ -51,14 +57,54 @@ class TransitionSettings:
# Practical-RIFE settings
practical_rife_model: str = 'v4.25' # v4.25, v4.26, v4.22, etc.
practical_rife_ensemble: bool = False # Ensemble mode for better quality (slower)
# Optical flow settings
of_preset: str = 'balanced' # fast, balanced, quality, max
of_levels: int = 3 # pyramid levels (1-7)
of_winsize: int = 15 # window size (5-51, odd)
of_iterations: int = 3 # iterations (1-10)
of_poly_n: int = 5 # polynomial neighborhood (5 or 7)
of_poly_sigma: float = 1.2 # gaussian sigma (0.5-2.0)
@dataclass
class PerTransitionSettings:
"""Per-transition overlap settings for asymmetric cross-dissolves."""
"""Per-transition overlap settings for cross-dissolves."""
trans_folder: Path
left_overlap: int = 16 # frames from main folder end
right_overlap: int = 16 # frames from trans folder start
left_overlap: int = 16 # overlap count at left boundary (MAIN→TRANS)
right_overlap: int = 16 # overlap count at right boundary (TRANS→MAIN)
@dataclass
class DirectTransitionSettings:
"""Settings for direct AI interpolation between sequences (no transition folder)."""
after_folder: Path # The folder after which this transition occurs
frame_count: int = 16 # Number of interpolated frames to generate
method: DirectInterpolationMethod = DirectInterpolationMethod.FILM
enabled: bool = True
@dataclass
class VideoPreset:
"""Preset for video encoding via ffmpeg."""
label: str # Display name
container: str # 'mp4', 'webm', or 'ogv'
codec: str # ffmpeg codec: libx264, libx265, libvpx-vp9, libaom-av1
crf: int
pixel_format: str = 'yuv420p'
preset: str = 'medium' # x264/x265 speed preset
max_height: Optional[int] = None # Downscale filter
extra_args: list[str] = field(default_factory=list)
VIDEO_PRESETS: dict[str, VideoPreset] = {
'web_streaming': VideoPreset('Web Streaming', 'mp4', 'libx264', 23, preset='medium'),
'high_quality': VideoPreset('High Quality', 'mp4', 'libx264', 18, preset='slow'),
'archive': VideoPreset('Archive (H.265)', 'mp4', 'libx265', 18, preset='slow', extra_args=['-tag:v', 'hvc1']),
'social_media': VideoPreset('Social Media', 'mp4', 'libx264', 23, preset='fast', max_height=1080),
'fast_preview': VideoPreset('Fast Preview', 'mp4', 'libx264', 28, preset='ultrafast'),
'webm_vp9': VideoPreset('WebM VP9', 'webm', 'libvpx-vp9', 30, extra_args=['-b:v', '0']),
'webm_av1': VideoPreset('WebM AV1', 'webm', 'libaom-av1', 30, extra_args=['-b:v', '0', '-strict', 'experimental']),
'godot_theora': VideoPreset('Godot (Theora)', 'ogv', 'libtheora', 8, extra_args=['-g', '512']),
}
@dataclass
@@ -84,6 +130,9 @@ class TransitionSpec:
# Indices into the overall file list
main_start_idx: int
trans_start_idx: int
# Position indices in the folders list (for duplicate folder support)
main_folder_idx: int = 0
trans_folder_idx: int = 0
@dataclass
@@ -115,6 +164,8 @@ class SessionRecord:
created_at: datetime
destination: str
link_count: int = 0
name: Optional[str] = None
locked: bool = False
# --- Exceptions ---

259
core/video.py Normal file

@@ -0,0 +1,259 @@
"""Video encoding utilities wrapping ffmpeg."""
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Callable, Optional
from .models import VideoPreset
def find_ffmpeg() -> Optional[Path]:
"""Find the ffmpeg binary on the system PATH."""
result = shutil.which('ffmpeg')
return Path(result) if result else None
def encode_image_sequence(
input_dir: Path,
output_path: Path,
fps: int,
preset: VideoPreset,
input_pattern: Optional[str] = None,
progress_callback: Optional[Callable[[int, int], bool]] = None,
total_frames: Optional[int] = None,
) -> tuple[bool, str]:
"""Encode an image sequence directory to a video file using ffmpeg.
Args:
input_dir: Directory containing sequentially named image files.
output_path: Output video file path.
fps: Frames per second.
preset: VideoPreset with codec settings.
input_pattern: ffmpeg input pattern (e.g. 'seq_%06d.png').
Auto-detected from first seq_* file if not provided.
progress_callback: Called with (current_frame, total_frames).
Return False to cancel encoding.
total_frames: Total number of frames for progress reporting.
Auto-counted from input_dir if not provided.
Returns:
(success, message) — message is output_path on success or error text on failure.
"""
ffmpeg = find_ffmpeg()
if not ffmpeg:
return False, "ffmpeg not found. Install ffmpeg to encode video."
# Auto-detect input pattern from first seq_* file
if input_pattern is None:
input_pattern = _detect_input_pattern(input_dir)
if input_pattern is None:
return False, f"No seq_* image files found in {input_dir}"
# Auto-count frames
if total_frames is None:
ext = Path(input_pattern).suffix
total_frames = len(list(input_dir.glob(f"seq_*{ext}")))
if total_frames == 0:
return False, f"No matching frames found in {input_dir}"
# Build ffmpeg command
cmd = [
str(ffmpeg), '-y',
'-framerate', str(fps),
'-i', str(input_dir / input_pattern),
'-c:v', preset.codec,
'-q:v' if preset.codec == 'libtheora' else '-crf', str(preset.crf),
'-pix_fmt', preset.pixel_format,
]
# Add speed preset for x264/x265
if preset.codec in ('libx264', 'libx265'):
cmd += ['-preset', preset.preset]
# Add downscale filter if max_height is set
if preset.max_height is not None:
cmd += ['-vf', f'scale=-2:{preset.max_height}']
# Add any extra codec-specific args
if preset.extra_args:
cmd += preset.extra_args
# Progress parsing via -progress pipe:1
cmd += ['-progress', 'pipe:1']
cmd.append(str(output_path))
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
cancelled = False
if proc.stdout:
for line in proc.stdout:
line = line.strip()
m = re.match(r'^frame=(\d+)', line)
if m and progress_callback is not None:
current = int(m.group(1))
if not progress_callback(current, total_frames):
cancelled = True
proc.terminate()
proc.wait()
break
proc.wait()
if cancelled:
# Clean up partial file
if output_path.exists():
output_path.unlink()
return False, "Encoding cancelled by user."
if proc.returncode != 0:
stderr = proc.stderr.read() if proc.stderr else ""
return False, f"ffmpeg exited with code {proc.returncode}:\n{stderr}"
return True, str(output_path)
except FileNotFoundError:
return False, "ffmpeg binary not found."
except Exception as e:
return False, f"Encoding error: {e}"
def _detect_input_pattern(input_dir: Path) -> Optional[str]:
"""Detect the ffmpeg input pattern from seq_* files in a directory.
Looks for files like seq_000000.png and returns a pattern like seq_%06d.png.
"""
for f in sorted(input_dir.iterdir()):
m = re.match(r'^(seq_)(\d+)(\.\w+)$', f.name)
if m:
prefix = m.group(1)
digits = m.group(2)
ext = m.group(3)
width = len(digits)
return f"{prefix}%0{width}d{ext}"
return None
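The name-to-printf conversion in `_detect_input_pattern`, restated over plain strings for clarity:

```python
import re
from typing import Optional

def detect_pattern(names: list[str]) -> Optional[str]:
    # First seq_<digits>.<ext> name wins; the digit count sets the %0Nd width
    for name in sorted(names):
        m = re.match(r'^(seq_)(\d+)(\.\w+)$', name)
        if m:
            return f"{m.group(1)}%0{len(m.group(2))}d{m.group(3)}"
    return None

print(detect_pattern(["seq_000000.png", "seq_000001.png"]))  # seq_%06d.png
print(detect_pattern(["seq01_0000.png"]))  # None: folder-indexed names need the concat path
```

Note the pattern only matches the continuous `seq_` format; folder-indexed sequences go through `encode_from_file_list` below instead.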
def encode_from_file_list(
file_paths: list[Path],
output_path: Path,
fps: int,
preset: VideoPreset,
progress_callback: Optional[Callable[[int, int], bool]] = None,
) -> tuple[bool, str]:
"""Encode a video from an explicit list of image file paths.
Uses ffmpeg's concat demuxer so files can be scattered across directories.
Args:
file_paths: Ordered list of image file paths.
output_path: Output video file path.
fps: Frames per second.
preset: VideoPreset with codec settings.
progress_callback: Called with (current_frame, total_frames).
Return False to cancel encoding.
Returns:
(success, message) — message is output_path on success or error text on failure.
"""
ffmpeg = find_ffmpeg()
if not ffmpeg:
return False, "ffmpeg not found. Install ffmpeg to encode video."
if not file_paths:
return False, "No files provided."
total_frames = len(file_paths)
frame_duration = f"{1.0 / fps:.10f}"
# Write a concat-demuxer file listing each image with its duration
try:
concat_file = tempfile.NamedTemporaryFile(
mode='w', suffix='.txt', delete=False, prefix='vml_concat_'
)
concat_path = Path(concat_file.name)
for p in file_paths:
# Escape single quotes for ffmpeg concat format
escaped = str(p.resolve()).replace("'", "'\\''")
concat_file.write(f"file '{escaped}'\n")
concat_file.write(f"duration {frame_duration}\n")
# Repeat last file so the last frame displays for its full duration
escaped = str(file_paths[-1].resolve()).replace("'", "'\\''")
concat_file.write(f"file '{escaped}'\n")
concat_file.close()
except OSError as e:
return False, f"Failed to create concat file: {e}"
cmd = [
str(ffmpeg), '-y',
'-f', 'concat', '-safe', '0',
'-i', str(concat_path),
'-c:v', preset.codec,
'-q:v' if preset.codec == 'libtheora' else '-crf', str(preset.crf),
'-pix_fmt', preset.pixel_format,
]
if preset.codec in ('libx264', 'libx265'):
cmd += ['-preset', preset.preset]
if preset.max_height is not None:
cmd += ['-vf', f'scale=-2:{preset.max_height}']
if preset.extra_args:
cmd += preset.extra_args
cmd += ['-progress', 'pipe:1']
cmd.append(str(output_path))
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
cancelled = False
if proc.stdout:
for line in proc.stdout:
line = line.strip()
m = re.match(r'^frame=(\d+)', line)
if m and progress_callback is not None:
current = int(m.group(1))
if not progress_callback(current, total_frames):
cancelled = True
proc.terminate()
proc.wait()
break
proc.wait()
if cancelled:
if output_path.exists():
output_path.unlink()
return False, "Encoding cancelled by user."
if proc.returncode != 0:
stderr = proc.stderr.read() if proc.stderr else ""
return False, f"ffmpeg exited with code {proc.returncode}:\n{stderr}"
return True, str(output_path)
except FileNotFoundError:
return False, "ffmpeg binary not found."
except Exception as e:
return False, f"Encoding error: {e}"
finally:
try:
concat_path.unlink(missing_ok=True)
except OSError:
pass
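The concat-demuxer listing the function writes can be sketched without invoking ffmpeg (paths are illustrative; the real code also resolves each path to absolute form first):

```python
from pathlib import Path

def build_concat_listing(paths: list[Path], fps: int) -> str:
    """Sketch of the listing encode_from_file_list feeds to -f concat."""
    dur = f"{1.0 / fps:.10f}"
    lines = []
    for p in paths:
        esc = str(p).replace("'", "'\\''")  # escape quotes for the concat format
        lines.append(f"file '{esc}'")
        lines.append(f"duration {dur}")
    # Repeat the last entry so the final frame holds for its full duration
    esc = str(paths[-1]).replace("'", "'\\''")
    lines.append(f"file '{esc}'")
    return "\n".join(lines) + "\n"

print(build_concat_listing([Path("/tmp/a.png"), Path("/tmp/b.png")], 25))
```

Without the repeated trailing entry, ffmpeg's concat demuxer drops the last `duration` directive and the final frame flashes by, which is why the function appends the last file twice.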

File diff suppressed because it is too large


@@ -15,6 +15,7 @@ class TrimSlider(QWidget):
"""
trimChanged = pyqtSignal(int, int, str) # Emits (trim_start, trim_end, 'left' or 'right')
trimDragFinished = pyqtSignal(int, int, str) # Emits final values on mouse release
def __init__(self, parent: Optional[QWidget] = None) -> None:
"""Initialize the trim slider.
@@ -287,5 +288,11 @@ class TrimSlider(QWidget):
def mouseReleaseEvent(self, event: QMouseEvent) -> None:
"""Handle mouse release to stop dragging."""
self._dragging = None
self.setCursor(Qt.CursorShape.ArrowCursor)
if self._dragging:
handle = self._dragging
self._dragging = None
self.setCursor(Qt.CursorShape.ArrowCursor)
self.trimDragFinished.emit(self._trim_start, self._trim_end, handle)
else:
self._dragging = None
self.setCursor(Qt.CursorShape.ArrowCursor)