Compare commits

..

8 Commits

Author SHA1 Message Date
cb9e981d4d Delete gitea-push-mirror-setup.md 2026-02-03 23:26:37 +01:00
a7444ef494 rife 2026-02-03 23:26:07 +01:00
1d9b5597f6 rife 2026-02-03 23:21:31 +01:00
e1fddb5e3c Add Practical-RIFE frame interpolation support
Implement standalone PyTorch-based RIFE interpolation that runs in a
dedicated virtual environment to avoid Qt/OpenCV conflicts:

- Add PracticalRifeEnv class for managing venv and subprocess execution
- Add rife_worker.py standalone interpolation script using Practical-RIFE
- Add RIFE_PRACTICAL blending model with ensemble/fast mode settings
- Add UI controls for Practical-RIFE configuration
- Update .gitignore to exclude venv-rife/ directory

The implementation downloads Practical-RIFE models on first use and runs
interpolation in a separate process with proper progress reporting.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 20:46:06 +01:00
3c4da8f23b Update README with comprehensive feature documentation
- Document cross-dissolve transitions and blend methods
- Add RIFE auto-download instructions
- Document per-folder trim and per-transition overlap
- Add file structure diagram
- Update installation requirements
- Expand supported formats list

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 18:51:50 +01:00
9c0d64f908 Restructure into multi-file architecture
Split monolithic symlink.py into modular components:
- config.py: Constants and configuration
- core/: Models, database, blender, manager
- ui/: Main window and widgets

New features included:
- Cross-dissolve transitions with multiple blend methods
- Alpha blend, Optical Flow, and RIFE (AI) interpolation
- Per-folder trim settings with start/end frame control
- Per-transition asymmetric overlap settings
- Folder type overrides (Main/Transition)
- Dual destination folders (sequence + transitions)
- WebP lossless output with compression method setting
- Video and image sequence preview with zoom/pan
- Session resume from destination folder

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 18:49:51 +01:00
41e3acb2cb Add alternating row colors to source folder list
Improves readability of the source folder panel.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 18:49:39 +01:00
3dd46a0aa1 cool 2026-02-03 16:21:56 +01:00
10 changed files with 456 additions and 5719 deletions

View File

@@ -13,20 +13,18 @@ A PyQt6 application for creating sequenced symlinks from image folders with adva
- Per-folder trim settings (exclude frames from start/end)
### Cross-Dissolve Transitions
Smooth blending between folder boundaries with four blend methods:
Smooth blending between folder boundaries with three blend methods:
| Method | Description | Quality | Speed |
|--------|-------------|---------|-------|
| **Cross-Dissolve** | Simple alpha blend | Good | Fastest |
| **Optical Flow** | Motion-compensated blend using OpenCV Farneback | Better | Medium |
| **RIFE (ncnn)** | Neural network interpolation via rife-ncnn-vulkan | Best | Fast (GPU) |
| **RIFE (Practical)** | PyTorch-based Practical-RIFE (v4.25/v4.26) | Best | Medium (GPU) |
| **RIFE (AI)** | Neural network frame interpolation | Best | Fast (GPU) |
- **Asymmetric overlap**: Set different frame counts for each side of a transition
- **Blend curves**: Linear, Ease In, Ease Out, Ease In/Out
- **Output formats**: PNG, JPEG (with quality), WebP (lossless with method setting)
- **RIFE auto-download**: Automatically downloads rife-ncnn-vulkan binary
- **Practical-RIFE models**: Auto-downloads from Google Drive on first use
### Preview
- **Video Preview**: Play video files from source folders
@@ -56,24 +54,11 @@ Smooth blending between folder boundaries with four blend methods:
pip install PyQt6 Pillow numpy opencv-python
```
**Note:** Practical-RIFE creates its own isolated venv with PyTorch. The `gdown` package is installed automatically for downloading models from Google Drive.
### RIFE ncnn (Optional)
For AI-powered frame interpolation using Vulkan GPU acceleration:
- Select **RIFE (ncnn)** as the blend method
- Click **Download** to auto-fetch [rife-ncnn-vulkan](https://github.com/nihui/rife-ncnn-vulkan)
### RIFE (Optional)
For AI-powered frame interpolation, the app can auto-download [rife-ncnn-vulkan](https://github.com/nihui/rife-ncnn-vulkan) or you can install it manually:
- Select **RIFE (AI)** as the blend method
- Click **Download** to fetch the latest release
- Or specify a custom binary path
- Models: rife-v4.6, rife-v4.15-lite, etc.
### Practical-RIFE (Optional)
For PyTorch-based frame interpolation with latest models:
- Select **RIFE (Practical)** as the blend method
- Click **Setup PyTorch** to create an isolated venv with PyTorch (~2GB)
- Models auto-download from Google Drive on first use
- Available models: v4.26, v4.25, v4.22, v4.20, v4.18, v4.15
- Optional ensemble mode for higher quality (slower)
The venv is stored at `~/.cache/video-montage-linker/venv-rife/`
## Usage
@@ -113,8 +98,7 @@ video-montage-linker/
├── core/
│ ├── models.py # Enums, dataclasses
│ ├── database.py # SQLite session management
│ ├── blender.py # Image blending, RIFE downloader, Practical-RIFE env
│ ├── rife_worker.py # Practical-RIFE inference (runs in isolated venv)
│ ├── blender.py # Image blending, RIFE downloader
│ └── manager.py # Symlink operations
└── ui/
├── widgets.py # TrimSlider, custom widgets

View File

@@ -4,12 +4,8 @@ from .models import (
BlendCurve,
BlendMethod,
FolderType,
DirectInterpolationMethod,
TransitionSettings,
PerTransitionSettings,
DirectTransitionSettings,
VideoPreset,
VIDEO_PRESETS,
BlendResult,
TransitionSpec,
LinkResult,
@@ -23,20 +19,15 @@ from .models import (
DatabaseError,
)
from .database import DatabaseManager
from .blender import ImageBlender, TransitionGenerator, RifeDownloader, PracticalRifeEnv, FilmEnv, OPTICAL_FLOW_PRESETS
from .blender import ImageBlender, TransitionGenerator, RifeDownloader, PracticalRifeEnv
from .manager import SymlinkManager
from .video import encode_image_sequence, encode_from_file_list, find_ffmpeg
__all__ = [
'BlendCurve',
'BlendMethod',
'FolderType',
'DirectInterpolationMethod',
'TransitionSettings',
'PerTransitionSettings',
'DirectTransitionSettings',
'VideoPreset',
'VIDEO_PRESETS',
'BlendResult',
'TransitionSpec',
'LinkResult',
@@ -53,10 +44,5 @@ __all__ = [
'TransitionGenerator',
'RifeDownloader',
'PracticalRifeEnv',
'FilmEnv',
'SymlinkManager',
'OPTICAL_FLOW_PRESETS',
'encode_image_sequence',
'encode_from_file_list',
'find_ffmpeg',
]

View File

@@ -23,8 +23,6 @@ from .models import (
PerTransitionSettings,
BlendResult,
TransitionSpec,
DirectInterpolationMethod,
DirectTransitionSettings,
)
@@ -33,14 +31,6 @@ CACHE_DIR = Path.home() / '.cache' / 'video-montage-linker'
RIFE_GITHUB_API = 'https://api.github.com/repos/nihui/rife-ncnn-vulkan/releases/latest'
PRACTICAL_RIFE_VENV_DIR = CACHE_DIR / 'venv-rife'
# Optical flow presets
OPTICAL_FLOW_PRESETS = {
'fast': {'levels': 2, 'winsize': 11, 'iterations': 2, 'poly_n': 5, 'poly_sigma': 1.1},
'balanced': {'levels': 3, 'winsize': 15, 'iterations': 3, 'poly_n': 5, 'poly_sigma': 1.2},
'quality': {'levels': 5, 'winsize': 21, 'iterations': 5, 'poly_n': 7, 'poly_sigma': 1.5},
'max': {'levels': 7, 'winsize': 31, 'iterations': 10, 'poly_n': 7, 'poly_sigma': 1.5},
}
class PracticalRifeEnv:
"""Manages isolated Python environment for Practical-RIFE."""
@@ -253,230 +243,6 @@ class PracticalRifeEnv:
return False, str(e)
class FilmEnv:
"""Manages FILM frame interpolation using shared venv with RIFE."""
VENV_DIR = PRACTICAL_RIFE_VENV_DIR # Share venv with RIFE
MODEL_CACHE_DIR = CACHE_DIR / 'film'
MODEL_FILENAME = 'film_net_fp32.pt'
MODEL_URL = 'https://github.com/dajes/frame-interpolation-pytorch/releases/download/v1.0.2/film_net_fp32.pt'
# Keep REPO_DIR for backward compat (but unused now - model is downloaded directly)
REPO_DIR = CACHE_DIR / 'frame-interpolation-pytorch'
@classmethod
def get_venv_python(cls) -> Optional[Path]:
"""Get path to venv Python executable."""
if cls.VENV_DIR.exists():
if sys.platform == 'win32':
return cls.VENV_DIR / 'Scripts' / 'python.exe'
return cls.VENV_DIR / 'bin' / 'python'
return None
@classmethod
def get_model_path(cls) -> Path:
"""Get path to the FILM TorchScript model."""
return cls.MODEL_CACHE_DIR / cls.MODEL_FILENAME
@classmethod
def is_setup(cls) -> bool:
"""Check if venv exists and FILM model is downloaded."""
python = cls.get_venv_python()
if not python or not python.exists():
return False
# Check if model is downloaded
return cls.get_model_path().exists()
@classmethod
def setup_film(cls, progress_callback=None, cancelled_check=None) -> bool:
"""Download FILM model and ensure venv is ready.
Args:
progress_callback: Optional callback(message, percent) for progress.
cancelled_check: Optional callable that returns True if cancelled.
Returns:
True if setup was successful.
"""
python = cls.get_venv_python()
if not python or not python.exists():
# Need to set up base venv first via PracticalRifeEnv
return False
try:
model_path = cls.get_model_path()
if not model_path.exists():
if progress_callback:
progress_callback("Downloading FILM model (~380MB)...", 30)
if cancelled_check and cancelled_check():
return False
# Download the pre-trained TorchScript model
cls.MODEL_CACHE_DIR.mkdir(parents=True, exist_ok=True)
urllib.request.urlretrieve(cls.MODEL_URL, model_path)
if progress_callback:
progress_callback("FILM setup complete!", 100)
return cls.is_setup()
except Exception as e:
print(f"[FILM] Setup error: {e}", file=sys.stderr)
return False
@classmethod
def get_worker_script(cls) -> Path:
"""Get path to the FILM worker script."""
return Path(__file__).parent / 'film_worker.py'
@classmethod
def run_interpolation(
cls,
img_a_path: Path,
img_b_path: Path,
output_path: Path,
t: float
) -> tuple[bool, str]:
"""Run FILM interpolation via subprocess in venv.
Args:
img_a_path: Path to first input image.
img_b_path: Path to second input image.
output_path: Path to output image.
t: Timestep for interpolation (0.0 to 1.0).
Returns:
Tuple of (success, error_message).
"""
python = cls.get_venv_python()
if not python or not python.exists():
return False, "venv python not found"
script = cls.get_worker_script()
if not script.exists():
return False, f"worker script not found: {script}"
cmd = [
str(python), str(script),
'--input0', str(img_a_path),
'--input1', str(img_b_path),
'--output', str(output_path),
'--timestep', str(t),
'--repo-dir', str(cls.REPO_DIR),
'--model-dir', str(cls.MODEL_CACHE_DIR)
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=180 # 3 minute timeout per frame (FILM is slower)
)
if result.returncode == 0 and output_path.exists():
return True, ""
else:
error = result.stderr.strip() if result.stderr else f"returncode={result.returncode}"
return False, error
except subprocess.TimeoutExpired:
return False, "timeout (180s)"
except Exception as e:
return False, str(e)
@classmethod
def run_batch_interpolation(
cls,
img_a_path: Path,
img_b_path: Path,
output_dir: Path,
frame_count: int,
output_pattern: str = 'frame_{:04d}.png'
) -> tuple[bool, str, list[Path]]:
"""Run FILM batch interpolation via subprocess in venv.
Generates all frames at once using FILM's recursive approach,
which produces better results than generating frames independently.
Args:
img_a_path: Path to first input image.
img_b_path: Path to second input image.
output_dir: Directory to save output frames.
frame_count: Number of frames to generate.
output_pattern: Filename pattern for output frames.
Returns:
Tuple of (success, error_message, list_of_output_paths).
"""
python = cls.get_venv_python()
if not python or not python.exists():
return False, "venv python not found", []
script = cls.get_worker_script()
if not script.exists():
return False, f"worker script not found: {script}", []
output_dir.mkdir(parents=True, exist_ok=True)
cmd = [
str(python), str(script),
'--input0', str(img_a_path),
'--input1', str(img_b_path),
'--output-dir', str(output_dir),
'--frame-count', str(frame_count),
'--output-pattern', output_pattern,
'--repo-dir', str(cls.REPO_DIR),
'--model-dir', str(cls.MODEL_CACHE_DIR)
]
try:
# Longer timeout for batch - scale with frame count
timeout = max(300, frame_count * 30) # At least 5 min, +30s per frame
print(f"[FILM] Running batch interpolation: {frame_count} frames", file=sys.stderr)
print(f"[FILM] Command: {' '.join(cmd)}", file=sys.stderr)
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout
)
# Collect output paths
output_paths = [
output_dir / output_pattern.format(i)
for i in range(frame_count)
]
existing_paths = [p for p in output_paths if p.exists()]
if result.returncode == 0 and len(existing_paths) == frame_count:
print(f"[FILM] Success: generated {len(existing_paths)} frames", file=sys.stderr)
return True, "", output_paths
else:
# Combine stdout and stderr for better error reporting
error_parts = []
if result.returncode != 0:
error_parts.append(f"returncode={result.returncode}")
if result.stdout and result.stdout.strip():
error_parts.append(f"stdout: {result.stdout.strip()}")
if result.stderr and result.stderr.strip():
error_parts.append(f"stderr: {result.stderr.strip()}")
if len(existing_paths) != frame_count:
error_parts.append(f"expected {frame_count} frames, got {len(existing_paths)}")
error = "; ".join(error_parts) if error_parts else "unknown error"
print(f"[FILM] Failed: {error}", file=sys.stderr)
return False, error, existing_paths
except subprocess.TimeoutExpired:
print(f"[FILM] Timeout after {timeout}s", file=sys.stderr)
return False, f"timeout ({timeout}s)", []
except Exception as e:
print(f"[FILM] Exception: {e}", file=sys.stderr)
return False, str(e), []
class RifeDownloader:
"""Handles automatic download and caching of rife-ncnn-vulkan binary."""
@@ -775,16 +541,7 @@ class ImageBlender:
return Image.blend(frames[lower_idx], frames[upper_idx], frac)
@staticmethod
def optical_flow_blend(
img_a: Image.Image,
img_b: Image.Image,
t: float,
levels: int = 3,
winsize: int = 15,
iterations: int = 3,
poly_n: int = 5,
poly_sigma: float = 1.2
) -> Image.Image:
def optical_flow_blend(img_a: Image.Image, img_b: Image.Image, t: float) -> Image.Image:
"""Blend using OpenCV optical flow for motion compensation.
Uses Farneback dense optical flow to warp frames and reduce ghosting
@@ -794,11 +551,6 @@ class ImageBlender:
img_a: First PIL Image (source frame).
img_b: Second PIL Image (target frame).
t: Interpolation factor 0.0 (100% A) to 1.0 (100% B).
levels: Pyramid levels for optical flow (1-7).
winsize: Window size for optical flow (5-51, odd).
iterations: Number of iterations (1-10).
poly_n: Polynomial neighborhood size (5 or 7).
poly_sigma: Gaussian sigma for polynomial expansion (0.5-2.0).
Returns:
Motion-compensated blended PIL Image.
@@ -819,11 +571,11 @@ class ImageBlender:
flow = cv2.calcOpticalFlowFarneback(
gray_a, gray_b, None,
pyr_scale=0.5,
levels=levels,
winsize=winsize,
iterations=iterations,
poly_n=poly_n,
poly_sigma=poly_sigma,
levels=3,
winsize=15,
iterations=3,
poly_n=5,
poly_sigma=1.2,
flags=0
)
@@ -1050,56 +802,6 @@ class ImageBlender:
# Fall back to ncnn RIFE or optical flow
return ImageBlender.rife_blend(img_a, img_b, t)
@staticmethod
def film_blend(
img_a: Image.Image,
img_b: Image.Image,
t: float
) -> Image.Image:
"""Blend using FILM for large motion interpolation.
FILM (Frame Interpolation for Large Motion) is Google Research's
high-quality frame interpolation model, better for large motion.
Args:
img_a: First PIL Image (source frame).
img_b: Second PIL Image (target frame).
t: Interpolation factor 0.0 (100% A) to 1.0 (100% B).
Returns:
AI-interpolated blended PIL Image.
"""
if not FilmEnv.is_setup():
print("[FILM] Not set up, falling back to Practical-RIFE", file=sys.stderr)
return ImageBlender.practical_rife_blend(img_a, img_b, t)
try:
with tempfile.TemporaryDirectory() as tmpdir:
tmp = Path(tmpdir)
input_a = tmp / 'a.png'
input_b = tmp / 'b.png'
output_file = tmp / 'out.png'
# Save input images
img_a.convert('RGB').save(input_a)
img_b.convert('RGB').save(input_b)
# Run FILM via subprocess
success, error_msg = FilmEnv.run_interpolation(
input_a, input_b, output_file, t
)
if success and output_file.exists():
return Image.open(output_file).copy()
else:
print(f"[FILM] Interpolation failed: {error_msg}, falling back to Practical-RIFE", file=sys.stderr)
except Exception as e:
print(f"[FILM] Exception: {e}, falling back to Practical-RIFE", file=sys.stderr)
# Fall back to Practical-RIFE
return ImageBlender.practical_rife_blend(img_a, img_b, t)
@staticmethod
def blend_images(
img_a_path: Path,
@@ -1115,12 +817,7 @@ class ImageBlender:
rife_uhd: bool = False,
rife_tta: bool = False,
practical_rife_model: str = 'v4.25',
practical_rife_ensemble: bool = False,
of_levels: int = 3,
of_winsize: int = 15,
of_iterations: int = 3,
of_poly_n: int = 5,
of_poly_sigma: float = 1.2
practical_rife_ensemble: bool = False
) -> BlendResult:
"""Blend two images together.
@@ -1139,11 +836,6 @@ class ImageBlender:
rife_tta: Enable RIFE ncnn TTA mode.
practical_rife_model: Practical-RIFE model version (e.g., 'v4.25').
practical_rife_ensemble: Enable Practical-RIFE ensemble mode.
of_levels: Optical flow pyramid levels (1-7).
of_winsize: Optical flow window size (5-51, odd).
of_iterations: Optical flow iterations (1-10).
of_poly_n: Optical flow polynomial neighborhood (5 or 7).
of_poly_sigma: Optical flow gaussian sigma (0.5-2.0).
Returns:
BlendResult with operation status.
@@ -1164,14 +856,7 @@ class ImageBlender:
# Blend images using selected method
if blend_method == BlendMethod.OPTICAL_FLOW:
blended = ImageBlender.optical_flow_blend(
img_a, img_b, factor,
levels=of_levels,
winsize=of_winsize,
iterations=of_iterations,
poly_n=of_poly_n,
poly_sigma=of_poly_sigma
)
blended = ImageBlender.optical_flow_blend(img_a, img_b, factor)
elif blend_method == BlendMethod.RIFE:
blended = ImageBlender.rife_blend(
img_a, img_b, factor, rife_binary_path, True, rife_model, rife_uhd, rife_tta
@@ -1237,12 +922,7 @@ class ImageBlender:
rife_uhd: bool = False,
rife_tta: bool = False,
practical_rife_model: str = 'v4.25',
practical_rife_ensemble: bool = False,
of_levels: int = 3,
of_winsize: int = 15,
of_iterations: int = 3,
of_poly_n: int = 5,
of_poly_sigma: float = 1.2
practical_rife_ensemble: bool = False
) -> BlendResult:
"""Blend two PIL Image objects together.
@@ -1261,11 +941,6 @@ class ImageBlender:
rife_tta: Enable RIFE ncnn TTA mode.
practical_rife_model: Practical-RIFE model version (e.g., 'v4.25').
practical_rife_ensemble: Enable Practical-RIFE ensemble mode.
of_levels: Optical flow pyramid levels (1-7).
of_winsize: Optical flow window size (5-51, odd).
of_iterations: Optical flow iterations (1-10).
of_poly_n: Optical flow polynomial neighborhood (5 or 7).
of_poly_sigma: Optical flow gaussian sigma (0.5-2.0).
Returns:
BlendResult with operation status.
@@ -1283,14 +958,7 @@ class ImageBlender:
# Blend images using selected method
if blend_method == BlendMethod.OPTICAL_FLOW:
blended = ImageBlender.optical_flow_blend(
img_a, img_b, factor,
levels=of_levels,
winsize=of_winsize,
iterations=of_iterations,
poly_n=of_poly_n,
poly_sigma=of_poly_sigma
)
blended = ImageBlender.optical_flow_blend(img_a, img_b, factor)
elif blend_method == BlendMethod.RIFE:
blended = ImageBlender.rife_blend(
img_a, img_b, factor, rife_binary_path, True, rife_model, rife_uhd, rife_tta
@@ -1356,19 +1024,21 @@ class TransitionGenerator:
def get_folder_type(
self,
index: int,
overrides: Optional[dict[int, FolderType]] = None,
overrides: Optional[dict[Path, FolderType]] = None,
folder: Optional[Path] = None
) -> FolderType:
"""Determine folder type based on position or override.
Args:
index: 0-based position of folder in list.
overrides: Optional dict of position index to FolderType overrides.
overrides: Optional dict of folder path to FolderType overrides.
folder: The folder path for checking overrides.
Returns:
FolderType.MAIN for even positions (0, 2, 4...), TRANSITION for odd.
FolderType.MAIN for odd positions (1, 3, 5...), TRANSITION for even.
"""
if overrides and index in overrides:
override = overrides[index]
if overrides and folder and folder in overrides:
override = overrides[folder]
if override != FolderType.AUTO:
return override
@@ -1378,9 +1048,9 @@ class TransitionGenerator:
def identify_transition_boundaries(
self,
folders: list[Path],
files_by_idx: dict[int, list[str]],
folder_overrides: Optional[dict[int, FolderType]] = None,
per_transition_settings: Optional[dict[int, PerTransitionSettings]] = None
files_by_folder: dict[Path, list[str]],
folder_overrides: Optional[dict[Path, FolderType]] = None,
per_transition_settings: Optional[dict[Path, PerTransitionSettings]] = None
) -> list[TransitionSpec]:
"""Identify boundaries where transitions should occur.
@@ -1389,9 +1059,9 @@ class TransitionGenerator:
Args:
folders: List of folders in order.
files_by_idx: Dict mapping position index to file lists.
folder_overrides: Optional position-index-keyed folder type overrides.
per_transition_settings: Optional position-index-keyed per-transition overlap settings.
files_by_folder: Dict mapping folders to their file lists.
folder_overrides: Optional folder type overrides.
per_transition_settings: Optional per-transition overlap settings.
Returns:
List of TransitionSpec objects describing each transition.
@@ -1401,67 +1071,47 @@ class TransitionGenerator:
transitions = []
cumulative_idx = 0
folder_start_indices: dict[int, int] = {}
folder_start_indices = {}
# Calculate start indices for each folder position
for i in range(len(folders)):
folder_start_indices[i] = cumulative_idx
cumulative_idx += len(files_by_idx.get(i, []))
# Track how many files are committed from each folder's start and end
# so overlaps never exceed available frames.
committed_from_start: dict[int, int] = {} # folder idx → frames used from start
committed_from_end: dict[int, int] = {} # folder idx → frames used from end
# Calculate start indices for each folder
for folder in folders:
folder_start_indices[folder] = cumulative_idx
cumulative_idx += len(files_by_folder.get(folder, []))
# Look for transition boundaries (MAIN->TRANSITION and TRANSITION->MAIN)
for i in range(len(folders) - 1):
folder_a = folders[i]
folder_b = folders[i + 1]
type_a = self.get_folder_type(i, folder_overrides)
type_b = self.get_folder_type(i + 1, folder_overrides)
type_a = self.get_folder_type(i, folder_overrides, folder_a)
type_b = self.get_folder_type(i + 1, folder_overrides, folder_b)
# Create transition when types differ (MAIN->TRANS or TRANS->MAIN)
if type_a != type_b:
files_a = files_by_idx.get(i, [])
files_b = files_by_idx.get(i + 1, [])
files_a = files_by_folder.get(folder_a, [])
files_b = files_by_folder.get(folder_b, [])
if not files_a or not files_b:
continue
# Get per-transition overlap settings from the TRANSITION folder
# (could be at position i or i+1 depending on boundary direction)
pts_key = i if type_a == FolderType.TRANSITION else i + 1
if per_transition_settings and pts_key in per_transition_settings:
pts = per_transition_settings[pts_key]
if type_a == FolderType.TRANSITION:
# TRANS→MAIN boundary: use right_overlap (right boundary count)
left_overlap = pts.right_overlap
right_overlap = pts.right_overlap
else:
# MAIN→TRANS boundary: use left_overlap (left boundary count)
left_overlap = pts.left_overlap
right_overlap = pts.left_overlap
# Get per-transition overlap settings if available
# Use folder_b as the key (the "incoming" folder)
if per_transition_settings and folder_b in per_transition_settings:
pts = per_transition_settings[folder_b]
left_overlap = pts.left_overlap
right_overlap = pts.right_overlap
else:
# Use default of 16 for both
left_overlap = 16
right_overlap = 16
# Cap overlaps by available files, accounting for frames
# already committed to a prior boundary on the same folder.
# Keep both sides equal (symmetric) after capping.
avail_a = len(files_a) - committed_from_start.get(i, 0)
avail_b = len(files_b) - committed_from_end.get(i + 1, 0)
capped = min(left_overlap, right_overlap, avail_a, avail_b)
left_overlap = capped
right_overlap = capped
# Cap overlaps by available files
left_overlap = min(left_overlap, len(files_a))
right_overlap = min(right_overlap, len(files_b))
if left_overlap < 1 or right_overlap < 1:
continue
committed_from_end[i] = committed_from_end.get(i, 0) + left_overlap
committed_from_start[i + 1] = committed_from_start.get(i + 1, 0) + right_overlap
transitions.append(TransitionSpec(
main_folder=folder_a,
trans_folder=folder_b,
@@ -1469,10 +1119,8 @@ class TransitionGenerator:
trans_files=files_b,
left_overlap=left_overlap,
right_overlap=right_overlap,
main_start_idx=folder_start_indices[i],
trans_start_idx=folder_start_indices[i + 1],
main_folder_idx=i,
trans_folder_idx=i + 1,
main_start_idx=folder_start_indices[folder_a],
trans_start_idx=folder_start_indices[folder_b]
))
return transitions
@@ -1482,7 +1130,7 @@ class TransitionGenerator:
spec: TransitionSpec,
dest: Path,
folder_idx_main: int,
base_seq_num: int
base_file_idx: int
) -> list[BlendResult]:
"""Generate blended frames for an asymmetric transition.
@@ -1493,8 +1141,8 @@ class TransitionGenerator:
Args:
spec: TransitionSpec describing the transition.
dest: Destination directory for blended frames.
folder_idx_main: Folder index (unused, kept for compatibility).
base_seq_num: Starting sequence number for continuous naming.
folder_idx_main: Folder index for sequence naming.
base_file_idx: Starting file index for sequence naming.
Returns:
List of BlendResult objects.
@@ -1549,8 +1197,8 @@ class TransitionGenerator:
# Generate output filename
ext = f".{self.settings.output_format.lower()}"
seq_num = base_seq_num + i
output_name = f"seq_{seq_num:05d}{ext}"
file_idx = base_file_idx + i
output_name = f"seq{folder_idx_main + 1:02d}_{file_idx:04d}{ext}"
output_path = dest / output_name
result = self.blender.blend_images_pil(
@@ -1567,12 +1215,7 @@ class TransitionGenerator:
self.settings.rife_uhd,
self.settings.rife_tta,
self.settings.practical_rife_model,
self.settings.practical_rife_ensemble,
self.settings.of_levels,
self.settings.of_winsize,
self.settings.of_iterations,
self.settings.of_poly_n,
self.settings.of_poly_sigma
self.settings.practical_rife_ensemble
)
results.append(result)
@@ -1589,7 +1232,7 @@ class TransitionGenerator:
spec: TransitionSpec,
dest: Path,
folder_idx_main: int,
base_seq_num: int
base_file_idx: int
) -> list[BlendResult]:
"""Generate blended frames for a transition.
@@ -1598,249 +1241,13 @@ class TransitionGenerator:
Args:
spec: TransitionSpec describing the transition.
dest: Destination directory for blended frames.
folder_idx_main: Folder index (unused, kept for compatibility).
base_seq_num: Starting sequence number for continuous naming.
folder_idx_main: Folder index for sequence naming.
base_file_idx: Starting file index for sequence naming.
Returns:
List of BlendResult objects.
"""
# Use asymmetric blend for all cases (handles symmetric too)
return self.generate_asymmetric_blend_frames(
spec, dest, folder_idx_main, base_seq_num
spec, dest, folder_idx_main, base_file_idx
)
def generate_direct_interpolation_frames(
self,
img_a_path: Path,
img_b_path: Path,
frame_count: int,
method: DirectInterpolationMethod,
dest: Path,
folder_idx: int,
base_seq_num: int,
practical_rife_model: str = 'v4.25',
practical_rife_ensemble: bool = False
) -> list[BlendResult]:
"""Generate AI-interpolated frames between two images.
Used for direct transitions between MAIN sequences without
a transition folder.
For FILM: Uses batch mode to generate all frames at once (better quality).
For RIFE: Generates frames one at a time (RIFE handles arbitrary timesteps well).
Args:
img_a_path: Path to last frame of first sequence.
img_b_path: Path to first frame of second sequence.
frame_count: Number of interpolated frames to generate.
method: Interpolation method (RIFE or FILM).
dest: Destination directory for generated frames.
folder_idx: Folder index (unused, kept for compatibility).
base_seq_num: Starting sequence number for continuous naming.
practical_rife_model: Practical-RIFE model version.
practical_rife_ensemble: Enable Practical-RIFE ensemble mode.
Returns:
List of BlendResult objects.
"""
results = []
dest.mkdir(parents=True, exist_ok=True)
# For FILM, use batch mode to generate all frames at once
if method == DirectInterpolationMethod.FILM and FilmEnv.is_setup():
return self._generate_film_frames_batch(
img_a_path, img_b_path, frame_count, dest, base_seq_num
)
# For RIFE (or FILM fallback), generate frames one at a time
# Load source images
img_a = Image.open(img_a_path)
img_b = Image.open(img_b_path)
# Handle different sizes - resize B to match A
if img_a.size != img_b.size:
img_b = img_b.resize(img_a.size, Image.Resampling.LANCZOS)
# Normalize to RGBA
if img_a.mode != 'RGBA':
img_a = img_a.convert('RGBA')
if img_b.mode != 'RGBA':
img_b = img_b.convert('RGBA')
for i in range(frame_count):
# Evenly space t values between 0 and 1 (exclusive)
t = (i + 1) / (frame_count + 1)
# Generate interpolated frame
if method == DirectInterpolationMethod.FILM:
blended = ImageBlender.film_blend(img_a, img_b, t)
else: # RIFE
blended = ImageBlender.practical_rife_blend(
img_a, img_b, t,
practical_rife_model, practical_rife_ensemble
)
# Generate output filename
ext = f".{self.settings.output_format.lower()}"
seq_num = base_seq_num + i
output_name = f"seq_{seq_num:05d}{ext}"
output_path = dest / output_name
# Save the blended frame
try:
# Convert back to RGB if saving to JPEG
if self.settings.output_format.lower() in ('jpg', 'jpeg'):
blended = blended.convert('RGB')
# Save with appropriate options
save_kwargs = {}
if self.settings.output_format.lower() in ('jpg', 'jpeg'):
save_kwargs['quality'] = self.settings.output_quality
elif self.settings.output_format.lower() == 'webp':
save_kwargs['lossless'] = True
save_kwargs['method'] = self.settings.webp_method
elif self.settings.output_format.lower() == 'png':
save_kwargs['compress_level'] = 6
blended.save(output_path, **save_kwargs)
results.append(BlendResult(
output_path=output_path,
source_a=img_a_path,
source_b=img_b_path,
blend_factor=t,
success=True
))
except Exception as e:
results.append(BlendResult(
output_path=output_path,
source_a=img_a_path,
source_b=img_b_path,
blend_factor=t,
success=False,
error=str(e)
))
# Close loaded images
img_a.close()
img_b.close()
return results
def _generate_film_frames_batch(
self,
img_a_path: Path,
img_b_path: Path,
frame_count: int,
dest: Path,
base_seq_num: int
) -> list[BlendResult]:
"""Generate FILM frames using batch mode for better quality.
FILM works best when generating all frames at once using its
recursive approach, rather than generating arbitrary timesteps.
Args:
img_a_path: Path to last frame of first sequence.
img_b_path: Path to first frame of second sequence.
frame_count: Number of interpolated frames to generate.
dest: Destination directory for generated frames.
base_seq_num: Starting sequence number for continuous naming.
Returns:
List of BlendResult objects.
"""
results = []
# Generate frames using FILM batch mode
# Use a temp pattern, then rename to final names
temp_pattern = 'film_temp_{:04d}.png'
success, error, temp_paths = FilmEnv.run_batch_interpolation(
img_a_path,
img_b_path,
dest,
frame_count,
temp_pattern
)
if not success:
# Return error results for all frames
for i in range(frame_count):
t = (i + 1) / (frame_count + 1)
ext = f".{self.settings.output_format.lower()}"
seq_num = base_seq_num + i
output_name = f"seq_{seq_num:05d}{ext}"
output_path = dest / output_name
results.append(BlendResult(
output_path=output_path,
source_a=img_a_path,
source_b=img_b_path,
blend_factor=t,
success=False,
error=error
))
return results
# Rename temp files to final names and convert format if needed
for i, temp_path in enumerate(temp_paths):
t = (i + 1) / (frame_count + 1)
ext = f".{self.settings.output_format.lower()}"
seq_num = base_seq_num + i
output_name = f"seq_{seq_num:05d}{ext}"
output_path = dest / output_name
try:
if temp_path.exists():
# Load the temp frame
frame = Image.open(temp_path)
# Convert format if needed
if self.settings.output_format.lower() in ('jpg', 'jpeg'):
frame = frame.convert('RGB')
# Save with appropriate options
save_kwargs = {}
if self.settings.output_format.lower() in ('jpg', 'jpeg'):
save_kwargs['quality'] = self.settings.output_quality
elif self.settings.output_format.lower() == 'webp':
save_kwargs['lossless'] = True
save_kwargs['method'] = self.settings.webp_method
elif self.settings.output_format.lower() == 'png':
save_kwargs['compress_level'] = 6
frame.save(output_path, **save_kwargs)
frame.close()
# Remove temp file if different from output
if temp_path != output_path:
temp_path.unlink(missing_ok=True)
results.append(BlendResult(
output_path=output_path,
source_a=img_a_path,
source_b=img_b_path,
blend_factor=t,
success=True
))
else:
results.append(BlendResult(
output_path=output_path,
source_a=img_a_path,
source_b=img_b_path,
blend_factor=t,
success=False,
error=f"Temp file not found: {temp_path}"
))
except Exception as e:
results.append(BlendResult(
output_path=output_path,
source_a=img_a_path,
source_b=img_b_path,
blend_factor=t,
success=False,
error=str(e)
))
return results

View File

@@ -39,8 +39,7 @@ class DatabaseManager:
CREATE TABLE IF NOT EXISTS symlink_sessions (
id INTEGER PRIMARY KEY,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
destination TEXT NOT NULL,
name TEXT DEFAULT NULL
destination TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS symlinks (
@@ -83,24 +82,6 @@ class DatabaseManager:
right_overlap INTEGER DEFAULT 16,
UNIQUE(session_id, trans_folder)
);
CREATE TABLE IF NOT EXISTS removed_files (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
source_folder TEXT NOT NULL,
filename TEXT NOT NULL,
UNIQUE(session_id, source_folder, filename)
);
CREATE TABLE IF NOT EXISTS direct_transition_settings (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
after_folder TEXT NOT NULL,
frame_count INTEGER DEFAULT 16,
method TEXT DEFAULT 'film',
enabled INTEGER DEFAULT 1,
UNIQUE(session_id, after_folder)
);
""")
# Migration: add folder_type column if it doesn't exist
@@ -133,168 +114,20 @@ class DatabaseManager:
except sqlite3.OperationalError:
conn.execute("ALTER TABLE transition_settings ADD COLUMN rife_binary_path TEXT")
# Migration: add folder_order column if it doesn't exist
try:
conn.execute("SELECT folder_order FROM sequence_trim_settings LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE sequence_trim_settings ADD COLUMN folder_order INTEGER DEFAULT 0")
# Migration: add name column to symlink_sessions if it doesn't exist
try:
conn.execute("SELECT name FROM symlink_sessions LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE symlink_sessions ADD COLUMN name TEXT DEFAULT NULL")
# Migration: widen UNIQUE constraints to allow duplicate folder paths per session.
# sequence_trim_settings: UNIQUE(session_id, source_folder) → UNIQUE(session_id, folder_order)
self._migrate_unique_constraint(
conn, 'sequence_trim_settings',
"""CREATE TABLE sequence_trim_settings_new (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
source_folder TEXT NOT NULL,
trim_start INTEGER DEFAULT 0,
trim_end INTEGER DEFAULT 0,
folder_type TEXT DEFAULT 'auto',
folder_order INTEGER DEFAULT 0,
UNIQUE(session_id, folder_order)
)""",
'session_id, source_folder, trim_start, trim_end, folder_type, folder_order',
)
# per_transition_settings: add folder_order, widen UNIQUE
try:
conn.execute("SELECT folder_order FROM per_transition_settings LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE per_transition_settings ADD COLUMN folder_order INTEGER DEFAULT 0")
self._migrate_unique_constraint(
conn, 'per_transition_settings',
"""CREATE TABLE per_transition_settings_new (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
trans_folder TEXT NOT NULL,
left_overlap INTEGER DEFAULT 16,
right_overlap INTEGER DEFAULT 16,
folder_order INTEGER DEFAULT 0,
UNIQUE(session_id, trans_folder, folder_order)
)""",
'session_id, trans_folder, left_overlap, right_overlap, folder_order',
)
# removed_files: add folder_order, widen UNIQUE
try:
conn.execute("SELECT folder_order FROM removed_files LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE removed_files ADD COLUMN folder_order INTEGER DEFAULT 0")
self._migrate_unique_constraint(
conn, 'removed_files',
"""CREATE TABLE removed_files_new (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
source_folder TEXT NOT NULL,
filename TEXT NOT NULL,
folder_order INTEGER DEFAULT 0,
UNIQUE(session_id, source_folder, filename, folder_order)
)""",
'session_id, source_folder, filename, folder_order',
)
# direct_transition_settings: add folder_order, widen UNIQUE
try:
conn.execute("SELECT folder_order FROM direct_transition_settings LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE direct_transition_settings ADD COLUMN folder_order INTEGER DEFAULT 0")
self._migrate_unique_constraint(
conn, 'direct_transition_settings',
"""CREATE TABLE direct_transition_settings_new (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
after_folder TEXT NOT NULL,
frame_count INTEGER DEFAULT 16,
method TEXT DEFAULT 'film',
enabled INTEGER DEFAULT 1,
folder_order INTEGER DEFAULT 0,
UNIQUE(session_id, after_folder, folder_order)
)""",
'session_id, after_folder, frame_count, method, enabled, folder_order',
)
# Migration: add locked column to symlink_sessions
try:
conn.execute("SELECT locked FROM symlink_sessions LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE symlink_sessions ADD COLUMN locked INTEGER DEFAULT 0")
# Migration: remove overlap_frames from transition_settings (now per-transition)
# We'll keep it for backward compatibility but won't use it
@staticmethod
def _migrate_unique_constraint(
conn: sqlite3.Connection,
table: str,
create_new_sql: str,
columns: str,
) -> None:
"""Recreate a table with a new UNIQUE constraint if needed.
Tests whether duplicate folder_order=0 entries can be inserted.
If an IntegrityError fires, the old constraint is too narrow and
the table must be recreated.
"""
new_table = f"{table}_new"
try:
# Test: can we insert two rows with same session+folder but different folder_order?
# If the old UNIQUE is still (session_id, source_folder) this will fail.
conn.execute(f"INSERT INTO {table} (session_id, {columns.split(',')[1].strip()}, folder_order) VALUES (-999, '__test__', 1)")
conn.execute(f"INSERT INTO {table} (session_id, {columns.split(',')[1].strip()}, folder_order) VALUES (-999, '__test__', 2)")
# Clean up test rows
conn.execute(f"DELETE FROM {table} WHERE session_id = -999")
# If we got here, the constraint already allows duplicates — no migration needed
return
except sqlite3.IntegrityError:
# Old constraint is too narrow — need to recreate
conn.execute(f"DELETE FROM {table} WHERE session_id = -999")
except sqlite3.OperationalError:
# Column might not exist yet or other issue — try migration anyway
conn.execute(f"DELETE FROM {table} WHERE session_id = -999")
try:
conn.execute(f"DROP TABLE IF EXISTS {new_table}")
conn.execute(create_new_sql)
conn.execute(f"INSERT INTO {new_table} ({columns}) SELECT {columns} FROM {table}")
conn.execute(f"DROP TABLE {table}")
conn.execute(f"ALTER TABLE {new_table} RENAME TO {table}")
except (sqlite3.OperationalError, sqlite3.IntegrityError):
# Clean up failed migration attempt
try:
conn.execute(f"DROP TABLE IF EXISTS {new_table}")
except sqlite3.OperationalError:
pass
def clear_session_data(self, session_id: int) -> None:
"""Delete all data for a session (symlinks, settings, etc.) but keep the session row."""
try:
with self._connect() as conn:
for table in (
'symlinks', 'sequence_trim_settings', 'transition_settings',
'per_transition_settings', 'removed_files', 'direct_transition_settings',
):
conn.execute(f"DELETE FROM {table} WHERE session_id = ?", (session_id,))
except sqlite3.Error as e:
raise DatabaseError(f"Failed to clear session data: {e}") from e
def _connect(self) -> sqlite3.Connection:
"""Create a database connection with foreign keys enabled."""
conn = sqlite3.connect(self.db_path)
conn.execute("PRAGMA foreign_keys = ON")
return conn
def create_session(self, destination: str, name: Optional[str] = None) -> int:
def create_session(self, destination: str) -> int:
"""Create a new linking session.
Args:
destination: The destination directory path.
name: Optional display name (e.g. "autosave").
Returns:
The ID of the created session.
@@ -305,8 +138,8 @@ class DatabaseManager:
try:
with self._connect() as conn:
cursor = conn.execute(
"INSERT INTO symlink_sessions (destination, name) VALUES (?, ?)",
(destination, name)
"INSERT INTO symlink_sessions (destination) VALUES (?)",
(destination,)
)
return cursor.lastrowid
except sqlite3.Error as e:
@@ -347,31 +180,6 @@ class DatabaseManager:
except sqlite3.Error as e:
raise DatabaseError(f"Failed to record symlink: {e}") from e
def record_symlinks_batch(
self,
session_id: int,
records: list[tuple[str, str, str, int]],
) -> None:
"""Record multiple symlinks in a single transaction.
Args:
session_id: The session these symlinks belong to.
records: List of (source, link, filename, seq) tuples.
Raises:
DatabaseError: If recording fails.
"""
try:
with self._connect() as conn:
conn.executemany(
"""INSERT INTO symlinks
(session_id, source_path, link_path, original_filename, sequence_number)
VALUES (?, ?, ?, ?, ?)""",
[(session_id, src, lnk, fname, seq) for src, lnk, fname, seq in records]
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to record symlinks: {e}") from e
def get_sessions(self) -> list[SessionRecord]:
"""List all sessions with link counts.
@@ -380,8 +188,7 @@ class DatabaseManager:
"""
with self._connect() as conn:
rows = conn.execute("""
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count,
s.name, COALESCE(s.locked, 0)
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count
FROM symlink_sessions s
LEFT JOIN symlinks l ON s.id = l.session_id
GROUP BY s.id
@@ -393,9 +200,7 @@ class DatabaseManager:
id=row[0],
created_at=datetime.fromisoformat(row[1]),
destination=row[2],
link_count=row[3],
name=row[4],
locked=bool(row[5])
link_count=row[3]
)
for row in rows
]
@@ -465,7 +270,7 @@ class DatabaseManager:
]
def delete_session(self, session_id: int) -> None:
"""Delete a session and all its related data (CASCADE handles child tables).
"""Delete a session and all its symlink records.
Args:
session_id: The session ID to delete.
@@ -475,56 +280,11 @@ class DatabaseManager:
"""
try:
with self._connect() as conn:
conn.execute("DELETE FROM symlinks WHERE session_id = ?", (session_id,))
conn.execute("DELETE FROM symlink_sessions WHERE id = ?", (session_id,))
except sqlite3.Error as e:
raise DatabaseError(f"Failed to delete session: {e}") from e
def delete_sessions(self, session_ids: list[int]) -> None:
"""Delete multiple sessions in a single transaction.
Locked sessions are silently skipped.
Args:
session_ids: List of session IDs to delete.
Raises:
DatabaseError: If deletion fails.
"""
if not session_ids:
return
try:
with self._connect() as conn:
placeholders = ','.join('?' for _ in session_ids)
conn.execute(
f"DELETE FROM symlink_sessions WHERE id IN ({placeholders}) AND COALESCE(locked, 0) = 0",
session_ids
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to delete sessions: {e}") from e
def toggle_session_locked(self, session_id: int) -> bool:
"""Toggle the locked state of a session.
Returns:
The new locked state.
"""
try:
with self._connect() as conn:
row = conn.execute(
"SELECT COALESCE(locked, 0) FROM symlink_sessions WHERE id = ?",
(session_id,)
).fetchone()
if row is None:
raise DatabaseError(f"Session {session_id} not found")
new_val = 0 if row[0] else 1
conn.execute(
"UPDATE symlink_sessions SET locked = ? WHERE id = ?",
(new_val, session_id)
)
return bool(new_val)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to toggle session lock: {e}") from e
def get_sessions_by_destination(self, dest: str) -> list[SessionRecord]:
"""Get all sessions for a destination directory.
@@ -536,8 +296,7 @@ class DatabaseManager:
"""
with self._connect() as conn:
rows = conn.execute("""
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count,
s.name, COALESCE(s.locked, 0)
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count
FROM symlink_sessions s
LEFT JOIN symlinks l ON s.id = l.session_id
WHERE s.destination = ?
@@ -550,9 +309,7 @@ class DatabaseManager:
id=row[0],
created_at=datetime.fromisoformat(row[1]),
destination=row[2],
link_count=row[3],
name=row[4],
locked=bool(row[5])
link_count=row[3]
)
for row in rows
]
@@ -563,8 +320,7 @@ class DatabaseManager:
source_folder: str,
trim_start: int,
trim_end: int,
folder_type: FolderType = FolderType.AUTO,
folder_order: int = 0,
folder_type: FolderType = FolderType.AUTO
) -> None:
"""Save trim settings for a folder in a session.
@@ -574,7 +330,6 @@ class DatabaseManager:
trim_start: Number of images to trim from start.
trim_end: Number of images to trim from end.
folder_type: The folder type (auto, main, or transition).
folder_order: Position of this folder in source_folders list.
Raises:
DatabaseError: If saving fails.
@@ -583,14 +338,13 @@ class DatabaseManager:
with self._connect() as conn:
conn.execute(
"""INSERT INTO sequence_trim_settings
(session_id, source_folder, trim_start, trim_end, folder_type, folder_order)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(session_id, folder_order)
DO UPDATE SET source_folder=excluded.source_folder,
trim_start=excluded.trim_start,
(session_id, source_folder, trim_start, trim_end, folder_type)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(session_id, source_folder)
DO UPDATE SET trim_start=excluded.trim_start,
trim_end=excluded.trim_end,
folder_type=excluded.folder_type""",
(session_id, source_folder, trim_start, trim_end, folder_type.value, folder_order)
(session_id, source_folder, trim_start, trim_end, folder_type.value)
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to save trim settings: {e}") from e
@@ -642,62 +396,6 @@ class DatabaseManager:
return {row[0]: (row[1], row[2]) for row in rows}
def get_all_folder_settings(self, session_id: int) -> dict[str, tuple[int, int, FolderType]]:
"""Get all folder settings (trim + type) for a session, unordered.
Returns:
Dict mapping source_folder to (trim_start, trim_end, folder_type).
"""
with self._connect() as conn:
rows = conn.execute(
"""SELECT source_folder, trim_start, trim_end, folder_type
FROM sequence_trim_settings WHERE session_id = ?""",
(session_id,)
).fetchall()
result = {}
for row in rows:
try:
ft = FolderType(row[3]) if row[3] else FolderType.AUTO
except ValueError:
ft = FolderType.AUTO
result[row[0]] = (row[1], row[2], ft)
return result
def get_ordered_folders(self, session_id: int) -> list[tuple[str, FolderType, int, int]]:
"""Get all folders for a session in saved order.
Returns:
List of (source_folder, folder_type, trim_start, trim_end) sorted by folder_order.
Returns empty list if folder_order is not meaningful (all zeros from
pre-migration sessions), so the caller falls back to symlink-derived order.
"""
with self._connect() as conn:
rows = conn.execute(
"""SELECT source_folder, folder_type, trim_start, trim_end, folder_order
FROM sequence_trim_settings WHERE session_id = ?
ORDER BY folder_order""",
(session_id,)
).fetchall()
if not rows:
return []
# If all folder_order values are 0, this is a pre-migration session
# where the ordering is not meaningful — return empty to trigger
# the legacy symlink-derived ordering path.
if len(rows) > 1 and all(row[4] == 0 for row in rows):
return []
result = []
for row in rows:
try:
ft = FolderType(row[1]) if row[1] else FolderType.AUTO
except ValueError:
ft = FolderType.AUTO
result.append((row[0], ft, row[2], row[3]))
return result
def save_transition_settings(
self,
session_id: int,
@@ -834,15 +532,13 @@ class DatabaseManager:
def save_per_transition_settings(
self,
session_id: int,
settings: PerTransitionSettings,
folder_order: int = 0,
settings: PerTransitionSettings
) -> None:
"""Save per-transition overlap settings.
Args:
session_id: The session ID.
settings: PerTransitionSettings to save.
folder_order: Position of this folder in the source list.
Raises:
DatabaseError: If saving fails.
@@ -851,13 +547,13 @@ class DatabaseManager:
with self._connect() as conn:
conn.execute(
"""INSERT INTO per_transition_settings
(session_id, trans_folder, left_overlap, right_overlap, folder_order)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(session_id, trans_folder, folder_order)
(session_id, trans_folder, left_overlap, right_overlap)
VALUES (?, ?, ?, ?)
ON CONFLICT(session_id, trans_folder)
DO UPDATE SET left_overlap=excluded.left_overlap,
right_overlap=excluded.right_overlap""",
(session_id, str(settings.trans_folder),
settings.left_overlap, settings.right_overlap, folder_order)
settings.left_overlap, settings.right_overlap)
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to save per-transition settings: {e}") from e
@@ -894,110 +590,27 @@ class DatabaseManager:
def get_all_per_transition_settings(
self,
session_id: int
) -> list[tuple[str, int, int, int]]:
) -> dict[str, PerTransitionSettings]:
"""Get all per-transition settings for a session.
Args:
session_id: The session ID.
Returns:
List of (trans_folder, left_overlap, right_overlap, folder_order) tuples.
Dict mapping transition folder paths to PerTransitionSettings.
"""
with self._connect() as conn:
rows = conn.execute(
"""SELECT trans_folder, left_overlap, right_overlap, folder_order
FROM per_transition_settings WHERE session_id = ?
ORDER BY folder_order""",
"""SELECT trans_folder, left_overlap, right_overlap
FROM per_transition_settings WHERE session_id = ?""",
(session_id,)
).fetchall()
return [(row[0], row[1], row[2], row[3]) for row in rows]
def save_removed_files(
self,
session_id: int,
source_folder: str,
filenames: list[str],
folder_order: int = 0,
) -> None:
"""Save removed files for a folder in a session.
Args:
session_id: The session ID.
source_folder: Path to the source folder.
filenames: List of removed filenames.
folder_order: Position of this folder in the source list.
"""
try:
with self._connect() as conn:
for filename in filenames:
conn.execute(
"""INSERT OR IGNORE INTO removed_files
(session_id, source_folder, filename, folder_order)
VALUES (?, ?, ?, ?)""",
(session_id, source_folder, filename, folder_order)
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to save removed files: {e}") from e
def get_removed_files(self, session_id: int) -> dict[int, set[str]]:
"""Get all removed files for a session, keyed by folder_order.
Args:
session_id: The session ID.
Returns:
Dict mapping folder_order to sets of removed filenames.
"""
with self._connect() as conn:
rows = conn.execute(
"SELECT source_folder, filename, folder_order FROM removed_files WHERE session_id = ?",
(session_id,)
).fetchall()
result: dict[int, set[str]] = {}
for folder, filename, folder_order in rows:
if folder_order not in result:
result[folder_order] = set()
result[folder_order].add(filename)
return result
def save_direct_transition(
self,
session_id: int,
after_folder: str,
frame_count: int,
method: str,
enabled: bool,
folder_order: int = 0,
) -> None:
"""Save direct interpolation settings for a folder transition."""
try:
with self._connect() as conn:
conn.execute(
"""INSERT INTO direct_transition_settings
(session_id, after_folder, frame_count, method, enabled, folder_order)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(session_id, folder_order)
DO UPDATE SET after_folder=excluded.after_folder,
frame_count=excluded.frame_count,
method=excluded.method,
enabled=excluded.enabled""",
(session_id, after_folder, frame_count, method, 1 if enabled else 0, folder_order)
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to save direct transition: {e}") from e
def get_direct_transitions(self, session_id: int) -> list[tuple[str, int, str, bool, int]]:
"""Get direct interpolation settings for a session.
Returns:
List of (after_folder, frame_count, method, enabled, folder_order) tuples.
"""
with self._connect() as conn:
rows = conn.execute(
"SELECT after_folder, frame_count, method, enabled, folder_order "
"FROM direct_transition_settings WHERE session_id = ?",
(session_id,)
).fetchall()
return [(r[0], r[1], r[2], bool(r[3]), r[4]) for r in rows]
return {
row[0]: PerTransitionSettings(
trans_folder=Path(row[0]),
left_overlap=row[1],
right_overlap=row[2]
)
for row in rows
}

View File

@@ -1,285 +0,0 @@
#!/usr/bin/env python
"""FILM interpolation worker - runs in isolated venv with PyTorch.
This script is executed via subprocess from the main application.
It handles frame interpolation using Google Research's FILM model
(Frame Interpolation for Large Motion) via the frame-interpolation-pytorch repo.
FILM is better than RIFE for large motion and scene gaps, but slower.
Supports two modes:
1. Single frame: --output with --timestep
2. Batch mode: --output-dir with --frame-count (generates all frames at once)
"""
import argparse
import sys
import urllib.request
from pathlib import Path
import numpy as np
import torch
from PIL import Image
# Model download URL
FILM_MODEL_URL = "https://github.com/dajes/frame-interpolation-pytorch/releases/download/v1.0.2/film_net_fp32.pt"
FILM_MODEL_FILENAME = "film_net_fp32.pt"
def load_image(path: Path, device: torch.device) -> torch.Tensor:
"""Load image as tensor.
Args:
path: Path to image file.
device: Device to load tensor to.
Returns:
Image tensor (1, 3, H, W) normalized to [0, 1].
"""
img = Image.open(path).convert('RGB')
arr = np.array(img).astype(np.float32) / 255.0
tensor = torch.from_numpy(arr).permute(2, 0, 1).unsqueeze(0)
return tensor.to(device)
def save_image(tensor: torch.Tensor, path: Path) -> None:
"""Save tensor as image.
Args:
tensor: Image tensor (1, 3, H, W) or (3, H, W) normalized to [0, 1].
path: Output path.
"""
if tensor.dim() == 4:
tensor = tensor.squeeze(0)
arr = tensor.permute(1, 2, 0).cpu().numpy()
arr = (arr * 255).clip(0, 255).astype(np.uint8)
Image.fromarray(arr).save(path)
# Global model cache
_model_cache: dict = {}
def download_model(model_dir: Path) -> Path:
"""Download FILM model if not present.
Args:
model_dir: Directory to store the model.
Returns:
Path to the downloaded model file.
"""
model_dir.mkdir(parents=True, exist_ok=True)
model_path = model_dir / FILM_MODEL_FILENAME
if not model_path.exists():
print(f"Downloading FILM model to {model_path}...", file=sys.stderr)
urllib.request.urlretrieve(FILM_MODEL_URL, model_path)
print("Download complete.", file=sys.stderr)
return model_path
def get_model(model_dir: Path, device: torch.device):
"""Get or load FILM model (cached).
Args:
model_dir: Model cache directory (for model downloads).
device: Device to run on.
Returns:
FILM TorchScript model instance.
"""
cache_key = f"film_{device}"
if cache_key not in _model_cache:
# Download model if needed
model_path = download_model(model_dir)
# Load pre-trained TorchScript model
print(f"Loading FILM model from {model_path}...", file=sys.stderr)
model = torch.jit.load(str(model_path), map_location='cpu')
model.eval()
model.to(device)
_model_cache[cache_key] = model
print("Model loaded.", file=sys.stderr)
return _model_cache[cache_key]
@torch.no_grad()
def interpolate_single(model, img0: torch.Tensor, img1: torch.Tensor, t: float) -> torch.Tensor:
"""Perform single frame interpolation using FILM.
Args:
model: FILM TorchScript model instance.
img0: First frame tensor (1, 3, H, W) normalized to [0, 1].
img1: Second frame tensor (1, 3, H, W) normalized to [0, 1].
t: Interpolation timestep (0.0 to 1.0).
Returns:
Interpolated frame tensor.
"""
# FILM TorchScript model expects dt as tensor of shape (1, 1)
dt = img0.new_full((1, 1), t)
result = model(img0, img1, dt)
if isinstance(result, tuple):
result = result[0]
return result.clamp(0, 1)
@torch.no_grad()
def interpolate_batch(model, img0: torch.Tensor, img1: torch.Tensor, frame_count: int) -> list[torch.Tensor]:
"""Generate multiple interpolated frames using FILM's recursive approach.
FILM works best when generating frames recursively - it first generates
the middle frame, then fills in the gaps. This produces more consistent
results than generating arbitrary timesteps independently.
Args:
model: FILM model instance.
img0: First frame tensor (1, 3, H, W) normalized to [0, 1].
img1: Second frame tensor (1, 3, H, W) normalized to [0, 1].
frame_count: Number of frames to generate between img0 and img1.
Returns:
List of interpolated frame tensors in order.
"""
# Calculate timesteps for evenly spaced frames
timesteps = [(i + 1) / (frame_count + 1) for i in range(frame_count)]
# Try to use the model's batch/recursive interpolation if available
try:
# Some implementations have an interpolate_recursively method
if hasattr(model, 'interpolate_recursively'):
# This generates 2^n - 1 frames, so we need to handle arbitrary counts
results = model.interpolate_recursively(img0, img1, frame_count)
if len(results) >= frame_count:
return results[:frame_count]
except (AttributeError, TypeError):
pass
# Fall back to recursive binary interpolation for better quality
# This mimics FILM's natural recursive approach
frames = {} # timestep -> tensor
def recursive_interpolate(t_left: float, t_right: float, img_left: torch.Tensor, img_right: torch.Tensor, depth: int = 0):
"""Recursively interpolate to fill the gap."""
if depth > 10: # Prevent infinite recursion
return
t_mid = (t_left + t_right) / 2
# Check if we need a frame near t_mid
need_frame = False
for t in timesteps:
if t not in frames and abs(t - t_mid) < 0.5 / (frame_count + 1):
need_frame = True
break
if not need_frame:
# Check if any remaining timesteps are in this range
remaining = [t for t in timesteps if t not in frames and t_left < t < t_right]
if not remaining:
return
# Generate middle frame
mid_frame = interpolate_single(model, img_left, img_right, 0.5)
# Assign to nearest needed timestep
for t in timesteps:
if t not in frames and abs(t - t_mid) < 0.5 / (frame_count + 1):
frames[t] = mid_frame
break
# Recurse into left and right halves
recursive_interpolate(t_left, t_mid, img_left, mid_frame, depth + 1)
recursive_interpolate(t_mid, t_right, mid_frame, img_right, depth + 1)
# Start recursive interpolation
recursive_interpolate(0.0, 1.0, img0, img1)
# Fill any remaining timesteps with direct interpolation
for t in timesteps:
if t not in frames:
frames[t] = interpolate_single(model, img0, img1, t)
# Return frames in order
return [frames[t] for t in timesteps]
def main():
parser = argparse.ArgumentParser(description='FILM frame interpolation worker')
parser.add_argument('--input0', required=True, help='Path to first input image')
parser.add_argument('--input1', required=True, help='Path to second input image')
parser.add_argument('--output', help='Path to output image (single frame mode)')
parser.add_argument('--output-dir', help='Output directory (batch mode)')
parser.add_argument('--output-pattern', default='frame_{:04d}.png',
help='Output filename pattern for batch mode')
parser.add_argument('--timestep', type=float, default=0.5,
help='Interpolation timestep 0-1 (single frame mode)')
parser.add_argument('--frame-count', type=int,
help='Number of frames to generate (batch mode)')
parser.add_argument('--repo-dir', help='Unused (kept for backward compat)')
parser.add_argument('--model-dir', required=True, help='Model cache directory')
parser.add_argument('--device', default='cuda', choices=['cuda', 'cpu'], help='Device to use')
args = parser.parse_args()
# Validate arguments
batch_mode = args.output_dir is not None and args.frame_count is not None
single_mode = args.output is not None
if not batch_mode and not single_mode:
print("Error: Must specify either --output (single) or --output-dir + --frame-count (batch)",
file=sys.stderr)
return 1
try:
# Select device
if args.device == 'cuda' and torch.cuda.is_available():
device = torch.device('cuda')
else:
device = torch.device('cpu')
# Load model
model_dir = Path(args.model_dir)
model = get_model(model_dir, device)
# Load images
img0 = load_image(Path(args.input0), device)
img1 = load_image(Path(args.input1), device)
if batch_mode:
# Batch mode - generate all frames at once
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
print(f"Generating {args.frame_count} frames...", file=sys.stderr)
frames = interpolate_batch(model, img0, img1, args.frame_count)
for i, frame in enumerate(frames):
output_path = output_dir / args.output_pattern.format(i)
save_image(frame, output_path)
print(f"Saved {output_path.name}", file=sys.stderr)
print(f"Success: Generated {len(frames)} frames", file=sys.stderr)
else:
# Single frame mode
result = interpolate_single(model, img0, img1, args.timestep)
save_image(result, Path(args.output))
print("Success", file=sys.stderr)
return 0
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
return 1
if __name__ == '__main__':
sys.exit(main())

View File

@@ -80,12 +80,11 @@ class SymlinkManager:
@staticmethod
def cleanup_old_links(directory: Path) -> int:
"""Remove existing seq* symlinks and temporary files from a directory.
"""Remove existing seq* symlinks from a directory.
Handles all naming formats:
- Old folder-indexed: seq01_0000.png
- Continuous: seq_00000.png
Also removes blended image files and film_temp_*.png temporaries.
Handles both old format (seq_0000) and new format (seq01_0000).
Also removes blended image files (not just symlinks) created by
cross-dissolve transitions.
Args:
directory: Directory to clean up.
@@ -97,134 +96,31 @@ class SymlinkManager:
CleanupError: If cleanup fails.
"""
removed = 0
seq_pattern = re.compile(
r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE
)
temp_pattern = re.compile(
r'^film_temp_\d+\.png$', re.IGNORECASE
)
seq_pattern = re.compile(r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE)
try:
for item in directory.iterdir():
should_remove = False
# Match both old (seq_NNNN) and new (seqNN_NNNN) formats
if item.name.startswith("seq"):
if item.is_symlink():
should_remove = True
item.unlink()
removed += 1
elif item.is_file() and seq_pattern.match(item.name):
should_remove = True
elif item.is_file() and temp_pattern.match(item.name):
should_remove = True
if should_remove:
item.unlink()
removed += 1
# Also remove blended image files
item.unlink()
removed += 1
except OSError as e:
raise CleanupError(f"Failed to clean up old links: {e}") from e
return removed
@staticmethod
def remove_orphan_files(directory: Path, keep_names: set[str]) -> int:
"""Remove seq* files and film_temp_* not in the keep set.
Same pattern matching as cleanup_old_links but skips filenames
present in keep_names.
Args:
directory: Directory to clean orphans from.
keep_names: Set of filenames to keep.
Returns:
Number of files removed.
Raises:
CleanupError: If removal fails.
"""
removed = 0
seq_pattern = re.compile(
r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE
)
temp_pattern = re.compile(
r'^film_temp_\d+\.png$', re.IGNORECASE
)
try:
for item in directory.iterdir():
if item.name in keep_names:
continue
should_remove = False
if item.name.startswith("seq"):
if item.is_symlink():
should_remove = True
elif item.is_file() and seq_pattern.match(item.name):
should_remove = True
elif item.is_file() and temp_pattern.match(item.name):
should_remove = True
if should_remove:
item.unlink()
removed += 1
except OSError as e:
raise CleanupError(f"Failed to remove orphan files: {e}") from e
return removed
@staticmethod
def symlink_matches(link_path: Path, expected_source: Path) -> bool:
"""Check if existing symlink resolves to expected source."""
if not link_path.is_symlink():
return False
try:
return link_path.resolve() == expected_source.resolve()
except OSError:
return False
@staticmethod
def copy_matches(dest_path: Path, source_path: Path) -> bool:
"""Check if existing copy matches source.
Fast path: size + mtime comparison. If sizes match but mtimes
differ, falls back to comparing file contents so that a
re-export after touching (but not changing) the source is still
skipped, while a genuine content change is caught.
"""
if not dest_path.is_file() or dest_path.is_symlink():
return False
try:
src_stat = source_path.stat()
dst_stat = dest_path.stat()
if src_stat.st_size != dst_stat.st_size:
return False
# Fast path: identical mtime means the copy2 wrote this file
if abs(src_stat.st_mtime - dst_stat.st_mtime) < 2.0:
return True
# Size matches but mtime differs — compare contents
return SymlinkManager._files_equal(source_path, dest_path)
except OSError:
return False
@staticmethod
def _files_equal(a: Path, b: Path, chunk_size: int = 65536) -> bool:
"""Compare two files by reading in chunks."""
try:
with open(a, 'rb') as fa, open(b, 'rb') as fb:
while True:
ca = fa.read(chunk_size)
cb = fb.read(chunk_size)
if ca != cb:
return False
if not ca:
return True
except OSError:
return False
def create_sequence_links(
self,
sources: list[Path],
dest: Path,
files: list[tuple],
trim_settings: Optional[dict[Path, tuple[int, int]]] = None,
copy_files: bool = False,
) -> tuple[list[LinkResult], Optional[int]]:
"""Create sequenced symlinks or copies from source files to destination.
"""Create sequenced symlinks from source files to destination.
Args:
sources: List of source directories (for validation).
@@ -233,12 +129,12 @@ class SymlinkManager:
- (source_dir, filename) for CLI mode (uses global sequence)
- (source_dir, filename, folder_idx, file_idx) for GUI mode
trim_settings: Optional dict mapping folder paths to (trim_start, trim_end).
copy_files: If True, copy files instead of creating symlinks.
Returns:
Tuple of (list of LinkResult objects, session_id or None).
"""
self.validate_paths(sources, dest)
self.cleanup_old_links(dest)
session_id = None
if self.db:
@@ -269,13 +165,6 @@ class SymlinkManager:
expanded_files.append((source_dir, filename, folder_idx, file_idx))
files = expanded_files
# Build planned names for orphan removal
planned_names: set[str] = set()
for file_data in files:
_, fn, fi, fli = file_data
ext = Path(fn).suffix
planned_names.add(f"seq{fi + 1:02d}_{fli:04d}{ext}")
for i, file_data in enumerate(files):
source_dir, filename, folder_idx, file_idx = file_data
source_path = source_dir / filename
@@ -283,25 +172,11 @@ class SymlinkManager:
link_name = f"seq{folder_idx + 1:02d}_{file_idx:04d}{ext}"
link_path = dest / link_name
# Calculate relative path from destination to source
rel_source = Path(os.path.relpath(source_path.resolve(), dest.resolve()))
try:
# Check if existing file already matches
already_correct = False
if link_path.exists() or link_path.is_symlink():
if copy_files:
already_correct = self.copy_matches(link_path, source_path)
else:
already_correct = self.symlink_matches(link_path, source_path)
if not already_correct:
if link_path.exists() or link_path.is_symlink():
link_path.unlink()
if copy_files:
import shutil
shutil.copy2(source_path, link_path)
else:
rel_source = Path(os.path.relpath(source_path.resolve(), dest.resolve()))
link_path.symlink_to(rel_source)
link_path.symlink_to(rel_source)
if self.db and session_id:
self.db.record_symlink(
@@ -327,10 +202,4 @@ class SymlinkManager:
error=str(e)
))
# Remove orphan seq*/film_temp_* files not in the planned set
try:
self.remove_orphan_files(dest, planned_names)
except CleanupError:
pass
return results, session_id

View File

@@ -32,12 +32,6 @@ class FolderType(Enum):
TRANSITION = 'transition'
class DirectInterpolationMethod(Enum):
"""Method for direct frame interpolation between sequences."""
RIFE = 'rife'
FILM = 'film'
# --- Data Classes ---
@dataclass
@@ -57,54 +51,14 @@ class TransitionSettings:
# Practical-RIFE settings
practical_rife_model: str = 'v4.25' # v4.25, v4.26, v4.22, etc.
practical_rife_ensemble: bool = False # Ensemble mode for better quality (slower)
# Optical flow settings
of_preset: str = 'balanced' # fast, balanced, quality, max
of_levels: int = 3 # pyramid levels (1-7)
of_winsize: int = 15 # window size (5-51, odd)
of_iterations: int = 3 # iterations (1-10)
of_poly_n: int = 5 # polynomial neighborhood (5 or 7)
of_poly_sigma: float = 1.2 # gaussian sigma (0.5-2.0)
@dataclass
class PerTransitionSettings:
"""Per-transition overlap settings for cross-dissolves."""
"""Per-transition overlap settings for asymmetric cross-dissolves."""
trans_folder: Path
left_overlap: int = 16 # overlap count at left boundary (MAIN→TRANS)
right_overlap: int = 16 # overlap count at right boundary (TRANS→MAIN)
@dataclass
class DirectTransitionSettings:
"""Settings for direct AI interpolation between sequences (no transition folder)."""
after_folder: Path # The folder after which this transition occurs
frame_count: int = 16 # Number of interpolated frames to generate
method: DirectInterpolationMethod = DirectInterpolationMethod.FILM
enabled: bool = True
@dataclass
class VideoPreset:
"""Preset for video encoding via ffmpeg."""
label: str # Display name
container: str # 'mp4' or 'webm'
codec: str # ffmpeg codec: libx264, libx265, libvpx-vp9, libaom-av1
crf: int
pixel_format: str = 'yuv420p'
preset: str = 'medium' # x264/x265 speed preset
max_height: Optional[int] = None # Downscale filter
extra_args: list[str] = field(default_factory=list)
VIDEO_PRESETS: dict[str, VideoPreset] = {
'web_streaming': VideoPreset('Web Streaming', 'mp4', 'libx264', 23, preset='medium'),
'high_quality': VideoPreset('High Quality', 'mp4', 'libx264', 18, preset='slow'),
'archive': VideoPreset('Archive (H.265)', 'mp4', 'libx265', 18, preset='slow', extra_args=['-tag:v', 'hvc1']),
'social_media': VideoPreset('Social Media', 'mp4', 'libx264', 23, preset='fast', max_height=1080),
'fast_preview': VideoPreset('Fast Preview', 'mp4', 'libx264', 28, preset='ultrafast'),
'webm_vp9': VideoPreset('WebM VP9', 'webm', 'libvpx-vp9', 30, extra_args=['-b:v', '0']),
'webm_av1': VideoPreset('WebM AV1', 'webm', 'libaom-av1', 30, extra_args=['-b:v', '0', '-strict', 'experimental']),
'godot_theora': VideoPreset('Godot (Theora)', 'ogv', 'libtheora', 8, extra_args=['-g', '512']),
}
left_overlap: int = 16 # frames from main folder end
right_overlap: int = 16 # frames from trans folder start
@dataclass
@@ -130,9 +84,6 @@ class TransitionSpec:
# Indices into the overall file list
main_start_idx: int
trans_start_idx: int
# Position indices in the folders list (for duplicate folder support)
main_folder_idx: int = 0
trans_folder_idx: int = 0
@dataclass
@@ -164,8 +115,6 @@ class SessionRecord:
created_at: datetime
destination: str
link_count: int = 0
name: Optional[str] = None
locked: bool = False
# --- Exceptions ---

View File

@@ -1,259 +0,0 @@
"""Video encoding utilities wrapping ffmpeg."""
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Callable, Optional
from .models import VideoPreset
def find_ffmpeg() -> Optional[Path]:
"""Find the ffmpeg binary on the system PATH."""
result = shutil.which('ffmpeg')
return Path(result) if result else None
def encode_image_sequence(
input_dir: Path,
output_path: Path,
fps: int,
preset: VideoPreset,
input_pattern: Optional[str] = None,
progress_callback: Optional[Callable[[int, int], bool]] = None,
total_frames: Optional[int] = None,
) -> tuple[bool, str]:
"""Encode an image sequence directory to a video file using ffmpeg.
Args:
input_dir: Directory containing sequentially named image files.
output_path: Output video file path.
fps: Frames per second.
preset: VideoPreset with codec settings.
input_pattern: ffmpeg input pattern (e.g. 'seq_%06d.png').
Auto-detected from first seq_* file if not provided.
progress_callback: Called with (current_frame, total_frames).
Return False to cancel encoding.
total_frames: Total number of frames for progress reporting.
Auto-counted from input_dir if not provided.
Returns:
(success, message) — message is output_path on success or error text on failure.
"""
ffmpeg = find_ffmpeg()
if not ffmpeg:
return False, "ffmpeg not found. Install ffmpeg to encode video."
# Auto-detect input pattern from first seq_* file
if input_pattern is None:
input_pattern = _detect_input_pattern(input_dir)
if input_pattern is None:
return False, f"No seq_* image files found in {input_dir}"
# Auto-count frames
if total_frames is None:
ext = Path(input_pattern).suffix
total_frames = len(list(input_dir.glob(f"seq_*{ext}")))
if total_frames == 0:
return False, f"No matching frames found in {input_dir}"
# Build ffmpeg command
cmd = [
str(ffmpeg), '-y',
'-framerate', str(fps),
'-i', str(input_dir / input_pattern),
'-c:v', preset.codec,
'-q:v' if preset.codec == 'libtheora' else '-crf', str(preset.crf),
'-pix_fmt', preset.pixel_format,
]
# Add speed preset for x264/x265
if preset.codec in ('libx264', 'libx265'):
cmd += ['-preset', preset.preset]
# Add downscale filter if max_height is set
if preset.max_height is not None:
cmd += ['-vf', f'scale=-2:{preset.max_height}']
# Add any extra codec-specific args
if preset.extra_args:
cmd += preset.extra_args
# Progress parsing via -progress pipe:1
cmd += ['-progress', 'pipe:1']
cmd.append(str(output_path))
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
cancelled = False
if proc.stdout:
for line in proc.stdout:
line = line.strip()
m = re.match(r'^frame=(\d+)', line)
if m and progress_callback is not None:
current = int(m.group(1))
if not progress_callback(current, total_frames):
cancelled = True
proc.terminate()
proc.wait()
break
proc.wait()
if cancelled:
# Clean up partial file
if output_path.exists():
output_path.unlink()
return False, "Encoding cancelled by user."
if proc.returncode != 0:
stderr = proc.stderr.read() if proc.stderr else ""
return False, f"ffmpeg exited with code {proc.returncode}:\n{stderr}"
return True, str(output_path)
except FileNotFoundError:
return False, "ffmpeg binary not found."
except Exception as e:
return False, f"Encoding error: {e}"
def _detect_input_pattern(input_dir: Path) -> Optional[str]:
"""Detect the ffmpeg input pattern from seq_* files in a directory.
Looks for files like seq_000000.png and returns a pattern like seq_%06d.png.
"""
for f in sorted(input_dir.iterdir()):
m = re.match(r'^(seq_)(\d+)(\.\w+)$', f.name)
if m:
prefix = m.group(1)
digits = m.group(2)
ext = m.group(3)
width = len(digits)
return f"{prefix}%0{width}d{ext}"
return None
def encode_from_file_list(
file_paths: list[Path],
output_path: Path,
fps: int,
preset: VideoPreset,
progress_callback: Optional[Callable[[int, int], bool]] = None,
) -> tuple[bool, str]:
"""Encode a video from an explicit list of image file paths.
Uses ffmpeg's concat demuxer so files can be scattered across directories.
Args:
file_paths: Ordered list of image file paths.
output_path: Output video file path.
fps: Frames per second.
preset: VideoPreset with codec settings.
progress_callback: Called with (current_frame, total_frames).
Return False to cancel encoding.
Returns:
(success, message) — message is output_path on success or error text on failure.
"""
ffmpeg = find_ffmpeg()
if not ffmpeg:
return False, "ffmpeg not found. Install ffmpeg to encode video."
if not file_paths:
return False, "No files provided."
total_frames = len(file_paths)
frame_duration = f"{1.0 / fps:.10f}"
# Write a concat-demuxer file listing each image with its duration
try:
concat_file = tempfile.NamedTemporaryFile(
mode='w', suffix='.txt', delete=False, prefix='vml_concat_'
)
concat_path = Path(concat_file.name)
for p in file_paths:
# Escape single quotes for ffmpeg concat format
escaped = str(p.resolve()).replace("'", "'\\''")
concat_file.write(f"file '{escaped}'\n")
concat_file.write(f"duration {frame_duration}\n")
# Repeat last file so the last frame displays for its full duration
escaped = str(file_paths[-1].resolve()).replace("'", "'\\''")
concat_file.write(f"file '{escaped}'\n")
concat_file.close()
except OSError as e:
return False, f"Failed to create concat file: {e}"
cmd = [
str(ffmpeg), '-y',
'-f', 'concat', '-safe', '0',
'-i', str(concat_path),
'-c:v', preset.codec,
'-q:v' if preset.codec == 'libtheora' else '-crf', str(preset.crf),
'-pix_fmt', preset.pixel_format,
]
if preset.codec in ('libx264', 'libx265'):
cmd += ['-preset', preset.preset]
if preset.max_height is not None:
cmd += ['-vf', f'scale=-2:{preset.max_height}']
if preset.extra_args:
cmd += preset.extra_args
cmd += ['-progress', 'pipe:1']
cmd.append(str(output_path))
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
cancelled = False
if proc.stdout:
for line in proc.stdout:
line = line.strip()
m = re.match(r'^frame=(\d+)', line)
if m and progress_callback is not None:
current = int(m.group(1))
if not progress_callback(current, total_frames):
cancelled = True
proc.terminate()
proc.wait()
break
proc.wait()
if cancelled:
if output_path.exists():
output_path.unlink()
return False, "Encoding cancelled by user."
if proc.returncode != 0:
stderr = proc.stderr.read() if proc.stderr else ""
return False, f"ffmpeg exited with code {proc.returncode}:\n{stderr}"
return True, str(output_path)
except FileNotFoundError:
return False, "ffmpeg binary not found."
except Exception as e:
return False, f"Encoding error: {e}"
finally:
try:
concat_path.unlink(missing_ok=True)
except OSError:
pass

File diff suppressed because it is too large Load Diff

View File

@@ -15,7 +15,6 @@ class TrimSlider(QWidget):
"""
trimChanged = pyqtSignal(int, int, str) # Emits (trim_start, trim_end, 'left' or 'right')
trimDragFinished = pyqtSignal(int, int, str) # Emits final values on mouse release
def __init__(self, parent: Optional[QWidget] = None) -> None:
"""Initialize the trim slider.
@@ -288,11 +287,5 @@ class TrimSlider(QWidget):
def mouseReleaseEvent(self, event: QMouseEvent) -> None:
"""Handle mouse release to stop dragging."""
if self._dragging:
handle = self._dragging
self._dragging = None
self.setCursor(Qt.CursorShape.ArrowCursor)
self.trimDragFinished.emit(self._trim_start, self._trim_end, handle)
else:
self._dragging = None
self.setCursor(Qt.CursorShape.ArrowCursor)
self._dragging = None
self.setCursor(Qt.CursorShape.ArrowCursor)