Compare commits
25 Commits
d26265a115
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| a6461a2ce8 | |||
| 7c2acd326f | |||
| 30911e894a | |||
| dd426b958b | |||
| 8dda4f56a0 | |||
| 793ba4243d | |||
| 15c00e5fd2 | |||
| 2694a8cba3 | |||
| 6e2d6148af | |||
| 2599265410 | |||
| 1de641d756 | |||
| 9f4b1e17f6 | |||
| cd8878c91b | |||
| f706328e4d | |||
| 893132f110 | |||
| 00e393f7b9 | |||
| 2c4778e598 | |||
| 0efc6e153f | |||
| c03ee0665b | |||
| 82a1c2ff9f | |||
| 78a1c8b795 | |||
| 5defd664ed | |||
| e58dc27dce | |||
| fb67ad5683 | |||
| 8c1de543d3 |
30
README.md
30
README.md
@@ -13,18 +13,20 @@ A PyQt6 application for creating sequenced symlinks from image folders with adva
|
||||
- Per-folder trim settings (exclude frames from start/end)
|
||||
|
||||
### Cross-Dissolve Transitions
|
||||
Smooth blending between folder boundaries with three blend methods:
|
||||
Smooth blending between folder boundaries with four blend methods:
|
||||
|
||||
| Method | Description | Quality | Speed |
|
||||
|--------|-------------|---------|-------|
|
||||
| **Cross-Dissolve** | Simple alpha blend | Good | Fastest |
|
||||
| **Optical Flow** | Motion-compensated blend using OpenCV Farneback | Better | Medium |
|
||||
| **RIFE (AI)** | Neural network frame interpolation | Best | Fast (GPU) |
|
||||
| **RIFE (ncnn)** | Neural network interpolation via rife-ncnn-vulkan | Best | Fast (GPU) |
|
||||
| **RIFE (Practical)** | PyTorch-based Practical-RIFE (v4.25/v4.26) | Best | Medium (GPU) |
|
||||
|
||||
- **Asymmetric overlap**: Set different frame counts for each side of a transition
|
||||
- **Blend curves**: Linear, Ease In, Ease Out, Ease In/Out
|
||||
- **Output formats**: PNG, JPEG (with quality), WebP (lossless with method setting)
|
||||
- **RIFE auto-download**: Automatically downloads rife-ncnn-vulkan binary
|
||||
- **Practical-RIFE models**: Auto-downloads from Google Drive on first use
|
||||
|
||||
### Preview
|
||||
- **Video Preview**: Play video files from source folders
|
||||
@@ -54,11 +56,24 @@ Smooth blending between folder boundaries with three blend methods:
|
||||
pip install PyQt6 Pillow numpy opencv-python
|
||||
```
|
||||
|
||||
### RIFE (Optional)
|
||||
For AI-powered frame interpolation, the app can auto-download [rife-ncnn-vulkan](https://github.com/nihui/rife-ncnn-vulkan) or you can install it manually:
|
||||
- Select **RIFE (AI)** as the blend method
|
||||
- Click **Download** to fetch the latest release
|
||||
**Note:** Practical-RIFE creates its own isolated venv with PyTorch. The `gdown` package is installed automatically for downloading models from Google Drive.
|
||||
|
||||
### RIFE ncnn (Optional)
|
||||
For AI-powered frame interpolation using Vulkan GPU acceleration:
|
||||
- Select **RIFE (ncnn)** as the blend method
|
||||
- Click **Download** to auto-fetch [rife-ncnn-vulkan](https://github.com/nihui/rife-ncnn-vulkan)
|
||||
- Or specify a custom binary path
|
||||
- Models: rife-v4.6, rife-v4.15-lite, etc.
|
||||
|
||||
### Practical-RIFE (Optional)
|
||||
For PyTorch-based frame interpolation with latest models:
|
||||
- Select **RIFE (Practical)** as the blend method
|
||||
- Click **Setup PyTorch** to create an isolated venv with PyTorch (~2GB)
|
||||
- Models auto-download from Google Drive on first use
|
||||
- Available models: v4.26, v4.25, v4.22, v4.20, v4.18, v4.15
|
||||
- Optional ensemble mode for higher quality (slower)
|
||||
|
||||
The venv is stored at `~/.cache/video-montage-linker/venv-rife/`
|
||||
|
||||
## Usage
|
||||
|
||||
@@ -98,7 +113,8 @@ video-montage-linker/
|
||||
├── core/
|
||||
│ ├── models.py # Enums, dataclasses
|
||||
│ ├── database.py # SQLite session management
|
||||
│ ├── blender.py # Image blending, RIFE downloader
|
||||
│ ├── blender.py # Image blending, RIFE downloader, Practical-RIFE env
|
||||
│ ├── rife_worker.py # Practical-RIFE inference (runs in isolated venv)
|
||||
│ └── manager.py # Symlink operations
|
||||
└── ui/
|
||||
├── widgets.py # TrimSlider, custom widgets
|
||||
|
||||
@@ -4,8 +4,12 @@ from .models import (
|
||||
BlendCurve,
|
||||
BlendMethod,
|
||||
FolderType,
|
||||
DirectInterpolationMethod,
|
||||
TransitionSettings,
|
||||
PerTransitionSettings,
|
||||
DirectTransitionSettings,
|
||||
VideoPreset,
|
||||
VIDEO_PRESETS,
|
||||
BlendResult,
|
||||
TransitionSpec,
|
||||
LinkResult,
|
||||
@@ -19,15 +23,20 @@ from .models import (
|
||||
DatabaseError,
|
||||
)
|
||||
from .database import DatabaseManager
|
||||
from .blender import ImageBlender, TransitionGenerator, RifeDownloader, PracticalRifeEnv
|
||||
from .blender import ImageBlender, TransitionGenerator, RifeDownloader, PracticalRifeEnv, FilmEnv, OPTICAL_FLOW_PRESETS
|
||||
from .manager import SymlinkManager
|
||||
from .video import encode_image_sequence, encode_from_file_list, find_ffmpeg
|
||||
|
||||
__all__ = [
|
||||
'BlendCurve',
|
||||
'BlendMethod',
|
||||
'FolderType',
|
||||
'DirectInterpolationMethod',
|
||||
'TransitionSettings',
|
||||
'PerTransitionSettings',
|
||||
'DirectTransitionSettings',
|
||||
'VideoPreset',
|
||||
'VIDEO_PRESETS',
|
||||
'BlendResult',
|
||||
'TransitionSpec',
|
||||
'LinkResult',
|
||||
@@ -44,5 +53,10 @@ __all__ = [
|
||||
'TransitionGenerator',
|
||||
'RifeDownloader',
|
||||
'PracticalRifeEnv',
|
||||
'FilmEnv',
|
||||
'SymlinkManager',
|
||||
'OPTICAL_FLOW_PRESETS',
|
||||
'encode_image_sequence',
|
||||
'encode_from_file_list',
|
||||
'find_ffmpeg',
|
||||
]
|
||||
|
||||
699
core/blender.py
699
core/blender.py
@@ -23,6 +23,8 @@ from .models import (
|
||||
PerTransitionSettings,
|
||||
BlendResult,
|
||||
TransitionSpec,
|
||||
DirectInterpolationMethod,
|
||||
DirectTransitionSettings,
|
||||
)
|
||||
|
||||
|
||||
@@ -31,6 +33,14 @@ CACHE_DIR = Path.home() / '.cache' / 'video-montage-linker'
|
||||
RIFE_GITHUB_API = 'https://api.github.com/repos/nihui/rife-ncnn-vulkan/releases/latest'
|
||||
PRACTICAL_RIFE_VENV_DIR = CACHE_DIR / 'venv-rife'
|
||||
|
||||
# Optical flow presets
|
||||
OPTICAL_FLOW_PRESETS = {
|
||||
'fast': {'levels': 2, 'winsize': 11, 'iterations': 2, 'poly_n': 5, 'poly_sigma': 1.1},
|
||||
'balanced': {'levels': 3, 'winsize': 15, 'iterations': 3, 'poly_n': 5, 'poly_sigma': 1.2},
|
||||
'quality': {'levels': 5, 'winsize': 21, 'iterations': 5, 'poly_n': 7, 'poly_sigma': 1.5},
|
||||
'max': {'levels': 7, 'winsize': 31, 'iterations': 10, 'poly_n': 7, 'poly_sigma': 1.5},
|
||||
}
|
||||
|
||||
|
||||
class PracticalRifeEnv:
|
||||
"""Manages isolated Python environment for Practical-RIFE."""
|
||||
@@ -243,6 +253,230 @@ class PracticalRifeEnv:
|
||||
return False, str(e)
|
||||
|
||||
|
||||
class FilmEnv:
|
||||
"""Manages FILM frame interpolation using shared venv with RIFE."""
|
||||
|
||||
VENV_DIR = PRACTICAL_RIFE_VENV_DIR # Share venv with RIFE
|
||||
MODEL_CACHE_DIR = CACHE_DIR / 'film'
|
||||
MODEL_FILENAME = 'film_net_fp32.pt'
|
||||
MODEL_URL = 'https://github.com/dajes/frame-interpolation-pytorch/releases/download/v1.0.2/film_net_fp32.pt'
|
||||
|
||||
# Keep REPO_DIR for backward compat (but unused now - model is downloaded directly)
|
||||
REPO_DIR = CACHE_DIR / 'frame-interpolation-pytorch'
|
||||
|
||||
@classmethod
|
||||
def get_venv_python(cls) -> Optional[Path]:
|
||||
"""Get path to venv Python executable."""
|
||||
if cls.VENV_DIR.exists():
|
||||
if sys.platform == 'win32':
|
||||
return cls.VENV_DIR / 'Scripts' / 'python.exe'
|
||||
return cls.VENV_DIR / 'bin' / 'python'
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_model_path(cls) -> Path:
|
||||
"""Get path to the FILM TorchScript model."""
|
||||
return cls.MODEL_CACHE_DIR / cls.MODEL_FILENAME
|
||||
|
||||
@classmethod
|
||||
def is_setup(cls) -> bool:
|
||||
"""Check if venv exists and FILM model is downloaded."""
|
||||
python = cls.get_venv_python()
|
||||
if not python or not python.exists():
|
||||
return False
|
||||
# Check if model is downloaded
|
||||
return cls.get_model_path().exists()
|
||||
|
||||
@classmethod
|
||||
def setup_film(cls, progress_callback=None, cancelled_check=None) -> bool:
|
||||
"""Download FILM model and ensure venv is ready.
|
||||
|
||||
Args:
|
||||
progress_callback: Optional callback(message, percent) for progress.
|
||||
cancelled_check: Optional callable that returns True if cancelled.
|
||||
|
||||
Returns:
|
||||
True if setup was successful.
|
||||
"""
|
||||
python = cls.get_venv_python()
|
||||
if not python or not python.exists():
|
||||
# Need to set up base venv first via PracticalRifeEnv
|
||||
return False
|
||||
|
||||
try:
|
||||
model_path = cls.get_model_path()
|
||||
|
||||
if not model_path.exists():
|
||||
if progress_callback:
|
||||
progress_callback("Downloading FILM model (~380MB)...", 30)
|
||||
if cancelled_check and cancelled_check():
|
||||
return False
|
||||
|
||||
# Download the pre-trained TorchScript model
|
||||
cls.MODEL_CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
urllib.request.urlretrieve(cls.MODEL_URL, model_path)
|
||||
|
||||
if progress_callback:
|
||||
progress_callback("FILM setup complete!", 100)
|
||||
|
||||
return cls.is_setup()
|
||||
|
||||
except Exception as e:
|
||||
print(f"[FILM] Setup error: {e}", file=sys.stderr)
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def get_worker_script(cls) -> Path:
|
||||
"""Get path to the FILM worker script."""
|
||||
return Path(__file__).parent / 'film_worker.py'
|
||||
|
||||
@classmethod
|
||||
def run_interpolation(
|
||||
cls,
|
||||
img_a_path: Path,
|
||||
img_b_path: Path,
|
||||
output_path: Path,
|
||||
t: float
|
||||
) -> tuple[bool, str]:
|
||||
"""Run FILM interpolation via subprocess in venv.
|
||||
|
||||
Args:
|
||||
img_a_path: Path to first input image.
|
||||
img_b_path: Path to second input image.
|
||||
output_path: Path to output image.
|
||||
t: Timestep for interpolation (0.0 to 1.0).
|
||||
|
||||
Returns:
|
||||
Tuple of (success, error_message).
|
||||
"""
|
||||
python = cls.get_venv_python()
|
||||
if not python or not python.exists():
|
||||
return False, "venv python not found"
|
||||
|
||||
script = cls.get_worker_script()
|
||||
if not script.exists():
|
||||
return False, f"worker script not found: {script}"
|
||||
|
||||
cmd = [
|
||||
str(python), str(script),
|
||||
'--input0', str(img_a_path),
|
||||
'--input1', str(img_b_path),
|
||||
'--output', str(output_path),
|
||||
'--timestep', str(t),
|
||||
'--repo-dir', str(cls.REPO_DIR),
|
||||
'--model-dir', str(cls.MODEL_CACHE_DIR)
|
||||
]
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=180 # 3 minute timeout per frame (FILM is slower)
|
||||
)
|
||||
if result.returncode == 0 and output_path.exists():
|
||||
return True, ""
|
||||
else:
|
||||
error = result.stderr.strip() if result.stderr else f"returncode={result.returncode}"
|
||||
return False, error
|
||||
except subprocess.TimeoutExpired:
|
||||
return False, "timeout (180s)"
|
||||
except Exception as e:
|
||||
return False, str(e)
|
||||
|
||||
@classmethod
|
||||
def run_batch_interpolation(
|
||||
cls,
|
||||
img_a_path: Path,
|
||||
img_b_path: Path,
|
||||
output_dir: Path,
|
||||
frame_count: int,
|
||||
output_pattern: str = 'frame_{:04d}.png'
|
||||
) -> tuple[bool, str, list[Path]]:
|
||||
"""Run FILM batch interpolation via subprocess in venv.
|
||||
|
||||
Generates all frames at once using FILM's recursive approach,
|
||||
which produces better results than generating frames independently.
|
||||
|
||||
Args:
|
||||
img_a_path: Path to first input image.
|
||||
img_b_path: Path to second input image.
|
||||
output_dir: Directory to save output frames.
|
||||
frame_count: Number of frames to generate.
|
||||
output_pattern: Filename pattern for output frames.
|
||||
|
||||
Returns:
|
||||
Tuple of (success, error_message, list_of_output_paths).
|
||||
"""
|
||||
python = cls.get_venv_python()
|
||||
if not python or not python.exists():
|
||||
return False, "venv python not found", []
|
||||
|
||||
script = cls.get_worker_script()
|
||||
if not script.exists():
|
||||
return False, f"worker script not found: {script}", []
|
||||
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
cmd = [
|
||||
str(python), str(script),
|
||||
'--input0', str(img_a_path),
|
||||
'--input1', str(img_b_path),
|
||||
'--output-dir', str(output_dir),
|
||||
'--frame-count', str(frame_count),
|
||||
'--output-pattern', output_pattern,
|
||||
'--repo-dir', str(cls.REPO_DIR),
|
||||
'--model-dir', str(cls.MODEL_CACHE_DIR)
|
||||
]
|
||||
|
||||
try:
|
||||
# Longer timeout for batch - scale with frame count
|
||||
timeout = max(300, frame_count * 30) # At least 5 min, +30s per frame
|
||||
|
||||
print(f"[FILM] Running batch interpolation: {frame_count} frames", file=sys.stderr)
|
||||
print(f"[FILM] Command: {' '.join(cmd)}", file=sys.stderr)
|
||||
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
# Collect output paths
|
||||
output_paths = [
|
||||
output_dir / output_pattern.format(i)
|
||||
for i in range(frame_count)
|
||||
]
|
||||
existing_paths = [p for p in output_paths if p.exists()]
|
||||
|
||||
if result.returncode == 0 and len(existing_paths) == frame_count:
|
||||
print(f"[FILM] Success: generated {len(existing_paths)} frames", file=sys.stderr)
|
||||
return True, "", output_paths
|
||||
else:
|
||||
# Combine stdout and stderr for better error reporting
|
||||
error_parts = []
|
||||
if result.returncode != 0:
|
||||
error_parts.append(f"returncode={result.returncode}")
|
||||
if result.stdout and result.stdout.strip():
|
||||
error_parts.append(f"stdout: {result.stdout.strip()}")
|
||||
if result.stderr and result.stderr.strip():
|
||||
error_parts.append(f"stderr: {result.stderr.strip()}")
|
||||
if len(existing_paths) != frame_count:
|
||||
error_parts.append(f"expected {frame_count} frames, got {len(existing_paths)}")
|
||||
|
||||
error = "; ".join(error_parts) if error_parts else "unknown error"
|
||||
print(f"[FILM] Failed: {error}", file=sys.stderr)
|
||||
return False, error, existing_paths
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
print(f"[FILM] Timeout after {timeout}s", file=sys.stderr)
|
||||
return False, f"timeout ({timeout}s)", []
|
||||
except Exception as e:
|
||||
print(f"[FILM] Exception: {e}", file=sys.stderr)
|
||||
return False, str(e), []
|
||||
|
||||
|
||||
class RifeDownloader:
|
||||
"""Handles automatic download and caching of rife-ncnn-vulkan binary."""
|
||||
|
||||
@@ -541,7 +775,16 @@ class ImageBlender:
|
||||
return Image.blend(frames[lower_idx], frames[upper_idx], frac)
|
||||
|
||||
@staticmethod
|
||||
def optical_flow_blend(img_a: Image.Image, img_b: Image.Image, t: float) -> Image.Image:
|
||||
def optical_flow_blend(
|
||||
img_a: Image.Image,
|
||||
img_b: Image.Image,
|
||||
t: float,
|
||||
levels: int = 3,
|
||||
winsize: int = 15,
|
||||
iterations: int = 3,
|
||||
poly_n: int = 5,
|
||||
poly_sigma: float = 1.2
|
||||
) -> Image.Image:
|
||||
"""Blend using OpenCV optical flow for motion compensation.
|
||||
|
||||
Uses Farneback dense optical flow to warp frames and reduce ghosting
|
||||
@@ -551,6 +794,11 @@ class ImageBlender:
|
||||
img_a: First PIL Image (source frame).
|
||||
img_b: Second PIL Image (target frame).
|
||||
t: Interpolation factor 0.0 (100% A) to 1.0 (100% B).
|
||||
levels: Pyramid levels for optical flow (1-7).
|
||||
winsize: Window size for optical flow (5-51, odd).
|
||||
iterations: Number of iterations (1-10).
|
||||
poly_n: Polynomial neighborhood size (5 or 7).
|
||||
poly_sigma: Gaussian sigma for polynomial expansion (0.5-2.0).
|
||||
|
||||
Returns:
|
||||
Motion-compensated blended PIL Image.
|
||||
@@ -571,11 +819,11 @@ class ImageBlender:
|
||||
flow = cv2.calcOpticalFlowFarneback(
|
||||
gray_a, gray_b, None,
|
||||
pyr_scale=0.5,
|
||||
levels=3,
|
||||
winsize=15,
|
||||
iterations=3,
|
||||
poly_n=5,
|
||||
poly_sigma=1.2,
|
||||
levels=levels,
|
||||
winsize=winsize,
|
||||
iterations=iterations,
|
||||
poly_n=poly_n,
|
||||
poly_sigma=poly_sigma,
|
||||
flags=0
|
||||
)
|
||||
|
||||
@@ -802,6 +1050,56 @@ class ImageBlender:
|
||||
# Fall back to ncnn RIFE or optical flow
|
||||
return ImageBlender.rife_blend(img_a, img_b, t)
|
||||
|
||||
@staticmethod
|
||||
def film_blend(
|
||||
img_a: Image.Image,
|
||||
img_b: Image.Image,
|
||||
t: float
|
||||
) -> Image.Image:
|
||||
"""Blend using FILM for large motion interpolation.
|
||||
|
||||
FILM (Frame Interpolation for Large Motion) is Google Research's
|
||||
high-quality frame interpolation model, better for large motion.
|
||||
|
||||
Args:
|
||||
img_a: First PIL Image (source frame).
|
||||
img_b: Second PIL Image (target frame).
|
||||
t: Interpolation factor 0.0 (100% A) to 1.0 (100% B).
|
||||
|
||||
Returns:
|
||||
AI-interpolated blended PIL Image.
|
||||
"""
|
||||
if not FilmEnv.is_setup():
|
||||
print("[FILM] Not set up, falling back to Practical-RIFE", file=sys.stderr)
|
||||
return ImageBlender.practical_rife_blend(img_a, img_b, t)
|
||||
|
||||
try:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
tmp = Path(tmpdir)
|
||||
input_a = tmp / 'a.png'
|
||||
input_b = tmp / 'b.png'
|
||||
output_file = tmp / 'out.png'
|
||||
|
||||
# Save input images
|
||||
img_a.convert('RGB').save(input_a)
|
||||
img_b.convert('RGB').save(input_b)
|
||||
|
||||
# Run FILM via subprocess
|
||||
success, error_msg = FilmEnv.run_interpolation(
|
||||
input_a, input_b, output_file, t
|
||||
)
|
||||
|
||||
if success and output_file.exists():
|
||||
return Image.open(output_file).copy()
|
||||
else:
|
||||
print(f"[FILM] Interpolation failed: {error_msg}, falling back to Practical-RIFE", file=sys.stderr)
|
||||
|
||||
except Exception as e:
|
||||
print(f"[FILM] Exception: {e}, falling back to Practical-RIFE", file=sys.stderr)
|
||||
|
||||
# Fall back to Practical-RIFE
|
||||
return ImageBlender.practical_rife_blend(img_a, img_b, t)
|
||||
|
||||
@staticmethod
|
||||
def blend_images(
|
||||
img_a_path: Path,
|
||||
@@ -817,7 +1115,12 @@ class ImageBlender:
|
||||
rife_uhd: bool = False,
|
||||
rife_tta: bool = False,
|
||||
practical_rife_model: str = 'v4.25',
|
||||
practical_rife_ensemble: bool = False
|
||||
practical_rife_ensemble: bool = False,
|
||||
of_levels: int = 3,
|
||||
of_winsize: int = 15,
|
||||
of_iterations: int = 3,
|
||||
of_poly_n: int = 5,
|
||||
of_poly_sigma: float = 1.2
|
||||
) -> BlendResult:
|
||||
"""Blend two images together.
|
||||
|
||||
@@ -836,6 +1139,11 @@ class ImageBlender:
|
||||
rife_tta: Enable RIFE ncnn TTA mode.
|
||||
practical_rife_model: Practical-RIFE model version (e.g., 'v4.25').
|
||||
practical_rife_ensemble: Enable Practical-RIFE ensemble mode.
|
||||
of_levels: Optical flow pyramid levels (1-7).
|
||||
of_winsize: Optical flow window size (5-51, odd).
|
||||
of_iterations: Optical flow iterations (1-10).
|
||||
of_poly_n: Optical flow polynomial neighborhood (5 or 7).
|
||||
of_poly_sigma: Optical flow gaussian sigma (0.5-2.0).
|
||||
|
||||
Returns:
|
||||
BlendResult with operation status.
|
||||
@@ -856,7 +1164,14 @@ class ImageBlender:
|
||||
|
||||
# Blend images using selected method
|
||||
if blend_method == BlendMethod.OPTICAL_FLOW:
|
||||
blended = ImageBlender.optical_flow_blend(img_a, img_b, factor)
|
||||
blended = ImageBlender.optical_flow_blend(
|
||||
img_a, img_b, factor,
|
||||
levels=of_levels,
|
||||
winsize=of_winsize,
|
||||
iterations=of_iterations,
|
||||
poly_n=of_poly_n,
|
||||
poly_sigma=of_poly_sigma
|
||||
)
|
||||
elif blend_method == BlendMethod.RIFE:
|
||||
blended = ImageBlender.rife_blend(
|
||||
img_a, img_b, factor, rife_binary_path, True, rife_model, rife_uhd, rife_tta
|
||||
@@ -922,7 +1237,12 @@ class ImageBlender:
|
||||
rife_uhd: bool = False,
|
||||
rife_tta: bool = False,
|
||||
practical_rife_model: str = 'v4.25',
|
||||
practical_rife_ensemble: bool = False
|
||||
practical_rife_ensemble: bool = False,
|
||||
of_levels: int = 3,
|
||||
of_winsize: int = 15,
|
||||
of_iterations: int = 3,
|
||||
of_poly_n: int = 5,
|
||||
of_poly_sigma: float = 1.2
|
||||
) -> BlendResult:
|
||||
"""Blend two PIL Image objects together.
|
||||
|
||||
@@ -941,6 +1261,11 @@ class ImageBlender:
|
||||
rife_tta: Enable RIFE ncnn TTA mode.
|
||||
practical_rife_model: Practical-RIFE model version (e.g., 'v4.25').
|
||||
practical_rife_ensemble: Enable Practical-RIFE ensemble mode.
|
||||
of_levels: Optical flow pyramid levels (1-7).
|
||||
of_winsize: Optical flow window size (5-51, odd).
|
||||
of_iterations: Optical flow iterations (1-10).
|
||||
of_poly_n: Optical flow polynomial neighborhood (5 or 7).
|
||||
of_poly_sigma: Optical flow gaussian sigma (0.5-2.0).
|
||||
|
||||
Returns:
|
||||
BlendResult with operation status.
|
||||
@@ -958,7 +1283,14 @@ class ImageBlender:
|
||||
|
||||
# Blend images using selected method
|
||||
if blend_method == BlendMethod.OPTICAL_FLOW:
|
||||
blended = ImageBlender.optical_flow_blend(img_a, img_b, factor)
|
||||
blended = ImageBlender.optical_flow_blend(
|
||||
img_a, img_b, factor,
|
||||
levels=of_levels,
|
||||
winsize=of_winsize,
|
||||
iterations=of_iterations,
|
||||
poly_n=of_poly_n,
|
||||
poly_sigma=of_poly_sigma
|
||||
)
|
||||
elif blend_method == BlendMethod.RIFE:
|
||||
blended = ImageBlender.rife_blend(
|
||||
img_a, img_b, factor, rife_binary_path, True, rife_model, rife_uhd, rife_tta
|
||||
@@ -1024,21 +1356,19 @@ class TransitionGenerator:
|
||||
def get_folder_type(
|
||||
self,
|
||||
index: int,
|
||||
overrides: Optional[dict[Path, FolderType]] = None,
|
||||
folder: Optional[Path] = None
|
||||
overrides: Optional[dict[int, FolderType]] = None,
|
||||
) -> FolderType:
|
||||
"""Determine folder type based on position or override.
|
||||
|
||||
Args:
|
||||
index: 0-based position of folder in list.
|
||||
overrides: Optional dict of folder path to FolderType overrides.
|
||||
folder: The folder path for checking overrides.
|
||||
overrides: Optional dict of position index to FolderType overrides.
|
||||
|
||||
Returns:
|
||||
FolderType.MAIN for odd positions (1, 3, 5...), TRANSITION for even.
|
||||
FolderType.MAIN for even positions (0, 2, 4...), TRANSITION for odd.
|
||||
"""
|
||||
if overrides and folder and folder in overrides:
|
||||
override = overrides[folder]
|
||||
if overrides and index in overrides:
|
||||
override = overrides[index]
|
||||
if override != FolderType.AUTO:
|
||||
return override
|
||||
|
||||
@@ -1048,9 +1378,9 @@ class TransitionGenerator:
|
||||
def identify_transition_boundaries(
|
||||
self,
|
||||
folders: list[Path],
|
||||
files_by_folder: dict[Path, list[str]],
|
||||
folder_overrides: Optional[dict[Path, FolderType]] = None,
|
||||
per_transition_settings: Optional[dict[Path, PerTransitionSettings]] = None
|
||||
files_by_idx: dict[int, list[str]],
|
||||
folder_overrides: Optional[dict[int, FolderType]] = None,
|
||||
per_transition_settings: Optional[dict[int, PerTransitionSettings]] = None
|
||||
) -> list[TransitionSpec]:
|
||||
"""Identify boundaries where transitions should occur.
|
||||
|
||||
@@ -1059,9 +1389,9 @@ class TransitionGenerator:
|
||||
|
||||
Args:
|
||||
folders: List of folders in order.
|
||||
files_by_folder: Dict mapping folders to their file lists.
|
||||
folder_overrides: Optional folder type overrides.
|
||||
per_transition_settings: Optional per-transition overlap settings.
|
||||
files_by_idx: Dict mapping position index to file lists.
|
||||
folder_overrides: Optional position-index-keyed folder type overrides.
|
||||
per_transition_settings: Optional position-index-keyed per-transition overlap settings.
|
||||
|
||||
Returns:
|
||||
List of TransitionSpec objects describing each transition.
|
||||
@@ -1071,47 +1401,67 @@ class TransitionGenerator:
|
||||
|
||||
transitions = []
|
||||
cumulative_idx = 0
|
||||
folder_start_indices = {}
|
||||
folder_start_indices: dict[int, int] = {}
|
||||
|
||||
# Calculate start indices for each folder
|
||||
for folder in folders:
|
||||
folder_start_indices[folder] = cumulative_idx
|
||||
cumulative_idx += len(files_by_folder.get(folder, []))
|
||||
# Calculate start indices for each folder position
|
||||
for i in range(len(folders)):
|
||||
folder_start_indices[i] = cumulative_idx
|
||||
cumulative_idx += len(files_by_idx.get(i, []))
|
||||
|
||||
# Track how many files are committed from each folder's start and end
|
||||
# so overlaps never exceed available frames.
|
||||
committed_from_start: dict[int, int] = {} # folder idx → frames used from start
|
||||
committed_from_end: dict[int, int] = {} # folder idx → frames used from end
|
||||
|
||||
# Look for transition boundaries (MAIN->TRANSITION and TRANSITION->MAIN)
|
||||
for i in range(len(folders) - 1):
|
||||
folder_a = folders[i]
|
||||
folder_b = folders[i + 1]
|
||||
|
||||
type_a = self.get_folder_type(i, folder_overrides, folder_a)
|
||||
type_b = self.get_folder_type(i + 1, folder_overrides, folder_b)
|
||||
type_a = self.get_folder_type(i, folder_overrides)
|
||||
type_b = self.get_folder_type(i + 1, folder_overrides)
|
||||
|
||||
# Create transition when types differ (MAIN->TRANS or TRANS->MAIN)
|
||||
if type_a != type_b:
|
||||
files_a = files_by_folder.get(folder_a, [])
|
||||
files_b = files_by_folder.get(folder_b, [])
|
||||
files_a = files_by_idx.get(i, [])
|
||||
files_b = files_by_idx.get(i + 1, [])
|
||||
|
||||
if not files_a or not files_b:
|
||||
continue
|
||||
|
||||
# Get per-transition overlap settings if available
|
||||
# Use folder_b as the key (the "incoming" folder)
|
||||
if per_transition_settings and folder_b in per_transition_settings:
|
||||
pts = per_transition_settings[folder_b]
|
||||
left_overlap = pts.left_overlap
|
||||
right_overlap = pts.right_overlap
|
||||
# Get per-transition overlap settings from the TRANSITION folder
|
||||
# (could be at position i or i+1 depending on boundary direction)
|
||||
pts_key = i if type_a == FolderType.TRANSITION else i + 1
|
||||
if per_transition_settings and pts_key in per_transition_settings:
|
||||
pts = per_transition_settings[pts_key]
|
||||
if type_a == FolderType.TRANSITION:
|
||||
# TRANS→MAIN boundary: use right_overlap (right boundary count)
|
||||
left_overlap = pts.right_overlap
|
||||
right_overlap = pts.right_overlap
|
||||
else:
|
||||
# MAIN→TRANS boundary: use left_overlap (left boundary count)
|
||||
left_overlap = pts.left_overlap
|
||||
right_overlap = pts.left_overlap
|
||||
else:
|
||||
# Use default of 16 for both
|
||||
left_overlap = 16
|
||||
right_overlap = 16
|
||||
|
||||
# Cap overlaps by available files
|
||||
left_overlap = min(left_overlap, len(files_a))
|
||||
right_overlap = min(right_overlap, len(files_b))
|
||||
# Cap overlaps by available files, accounting for frames
|
||||
# already committed to a prior boundary on the same folder.
|
||||
# Keep both sides equal (symmetric) after capping.
|
||||
avail_a = len(files_a) - committed_from_start.get(i, 0)
|
||||
avail_b = len(files_b) - committed_from_end.get(i + 1, 0)
|
||||
capped = min(left_overlap, right_overlap, avail_a, avail_b)
|
||||
left_overlap = capped
|
||||
right_overlap = capped
|
||||
|
||||
if left_overlap < 1 or right_overlap < 1:
|
||||
continue
|
||||
|
||||
committed_from_end[i] = committed_from_end.get(i, 0) + left_overlap
|
||||
committed_from_start[i + 1] = committed_from_start.get(i + 1, 0) + right_overlap
|
||||
|
||||
transitions.append(TransitionSpec(
|
||||
main_folder=folder_a,
|
||||
trans_folder=folder_b,
|
||||
@@ -1119,8 +1469,10 @@ class TransitionGenerator:
|
||||
trans_files=files_b,
|
||||
left_overlap=left_overlap,
|
||||
right_overlap=right_overlap,
|
||||
main_start_idx=folder_start_indices[folder_a],
|
||||
trans_start_idx=folder_start_indices[folder_b]
|
||||
main_start_idx=folder_start_indices[i],
|
||||
trans_start_idx=folder_start_indices[i + 1],
|
||||
main_folder_idx=i,
|
||||
trans_folder_idx=i + 1,
|
||||
))
|
||||
|
||||
return transitions
|
||||
@@ -1130,7 +1482,7 @@ class TransitionGenerator:
|
||||
spec: TransitionSpec,
|
||||
dest: Path,
|
||||
folder_idx_main: int,
|
||||
base_file_idx: int
|
||||
base_seq_num: int
|
||||
) -> list[BlendResult]:
|
||||
"""Generate blended frames for an asymmetric transition.
|
||||
|
||||
@@ -1141,8 +1493,8 @@ class TransitionGenerator:
|
||||
Args:
|
||||
spec: TransitionSpec describing the transition.
|
||||
dest: Destination directory for blended frames.
|
||||
folder_idx_main: Folder index for sequence naming.
|
||||
base_file_idx: Starting file index for sequence naming.
|
||||
folder_idx_main: Folder index (unused, kept for compatibility).
|
||||
base_seq_num: Starting sequence number for continuous naming.
|
||||
|
||||
Returns:
|
||||
List of BlendResult objects.
|
||||
@@ -1197,8 +1549,8 @@ class TransitionGenerator:
|
||||
|
||||
# Generate output filename
|
||||
ext = f".{self.settings.output_format.lower()}"
|
||||
file_idx = base_file_idx + i
|
||||
output_name = f"seq{folder_idx_main + 1:02d}_{file_idx:04d}{ext}"
|
||||
seq_num = base_seq_num + i
|
||||
output_name = f"seq_{seq_num:05d}{ext}"
|
||||
output_path = dest / output_name
|
||||
|
||||
result = self.blender.blend_images_pil(
|
||||
@@ -1215,7 +1567,12 @@ class TransitionGenerator:
|
||||
self.settings.rife_uhd,
|
||||
self.settings.rife_tta,
|
||||
self.settings.practical_rife_model,
|
||||
self.settings.practical_rife_ensemble
|
||||
self.settings.practical_rife_ensemble,
|
||||
self.settings.of_levels,
|
||||
self.settings.of_winsize,
|
||||
self.settings.of_iterations,
|
||||
self.settings.of_poly_n,
|
||||
self.settings.of_poly_sigma
|
||||
)
|
||||
results.append(result)
|
||||
|
||||
@@ -1232,7 +1589,7 @@ class TransitionGenerator:
|
||||
spec: TransitionSpec,
|
||||
dest: Path,
|
||||
folder_idx_main: int,
|
||||
base_file_idx: int
|
||||
base_seq_num: int
|
||||
) -> list[BlendResult]:
|
||||
"""Generate blended frames for a transition.
|
||||
|
||||
@@ -1241,13 +1598,249 @@ class TransitionGenerator:
|
||||
Args:
|
||||
spec: TransitionSpec describing the transition.
|
||||
dest: Destination directory for blended frames.
|
||||
folder_idx_main: Folder index for sequence naming.
|
||||
base_file_idx: Starting file index for sequence naming.
|
||||
folder_idx_main: Folder index (unused, kept for compatibility).
|
||||
base_seq_num: Starting sequence number for continuous naming.
|
||||
|
||||
Returns:
|
||||
List of BlendResult objects.
|
||||
"""
|
||||
# Use asymmetric blend for all cases (handles symmetric too)
|
||||
return self.generate_asymmetric_blend_frames(
|
||||
spec, dest, folder_idx_main, base_file_idx
|
||||
spec, dest, folder_idx_main, base_seq_num
|
||||
)
|
||||
|
||||
def generate_direct_interpolation_frames(
|
||||
self,
|
||||
img_a_path: Path,
|
||||
img_b_path: Path,
|
||||
frame_count: int,
|
||||
method: DirectInterpolationMethod,
|
||||
dest: Path,
|
||||
folder_idx: int,
|
||||
base_seq_num: int,
|
||||
practical_rife_model: str = 'v4.25',
|
||||
practical_rife_ensemble: bool = False
|
||||
) -> list[BlendResult]:
|
||||
"""Generate AI-interpolated frames between two images.
|
||||
|
||||
Used for direct transitions between MAIN sequences without
|
||||
a transition folder.
|
||||
|
||||
For FILM: Uses batch mode to generate all frames at once (better quality).
|
||||
For RIFE: Generates frames one at a time (RIFE handles arbitrary timesteps well).
|
||||
|
||||
Args:
|
||||
img_a_path: Path to last frame of first sequence.
|
||||
img_b_path: Path to first frame of second sequence.
|
||||
frame_count: Number of interpolated frames to generate.
|
||||
method: Interpolation method (RIFE or FILM).
|
||||
dest: Destination directory for generated frames.
|
||||
folder_idx: Folder index (unused, kept for compatibility).
|
||||
base_seq_num: Starting sequence number for continuous naming.
|
||||
practical_rife_model: Practical-RIFE model version.
|
||||
practical_rife_ensemble: Enable Practical-RIFE ensemble mode.
|
||||
|
||||
Returns:
|
||||
List of BlendResult objects.
|
||||
"""
|
||||
results = []
|
||||
dest.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# For FILM, use batch mode to generate all frames at once
|
||||
if method == DirectInterpolationMethod.FILM and FilmEnv.is_setup():
|
||||
return self._generate_film_frames_batch(
|
||||
img_a_path, img_b_path, frame_count, dest, base_seq_num
|
||||
)
|
||||
|
||||
# For RIFE (or FILM fallback), generate frames one at a time
|
||||
# Load source images
|
||||
img_a = Image.open(img_a_path)
|
||||
img_b = Image.open(img_b_path)
|
||||
|
||||
# Handle different sizes - resize B to match A
|
||||
if img_a.size != img_b.size:
|
||||
img_b = img_b.resize(img_a.size, Image.Resampling.LANCZOS)
|
||||
|
||||
# Normalize to RGBA
|
||||
if img_a.mode != 'RGBA':
|
||||
img_a = img_a.convert('RGBA')
|
||||
if img_b.mode != 'RGBA':
|
||||
img_b = img_b.convert('RGBA')
|
||||
|
||||
for i in range(frame_count):
|
||||
# Evenly space t values between 0 and 1 (exclusive)
|
||||
t = (i + 1) / (frame_count + 1)
|
||||
|
||||
# Generate interpolated frame
|
||||
if method == DirectInterpolationMethod.FILM:
|
||||
blended = ImageBlender.film_blend(img_a, img_b, t)
|
||||
else: # RIFE
|
||||
blended = ImageBlender.practical_rife_blend(
|
||||
img_a, img_b, t,
|
||||
practical_rife_model, practical_rife_ensemble
|
||||
)
|
||||
|
||||
# Generate output filename
|
||||
ext = f".{self.settings.output_format.lower()}"
|
||||
seq_num = base_seq_num + i
|
||||
output_name = f"seq_{seq_num:05d}{ext}"
|
||||
output_path = dest / output_name
|
||||
|
||||
# Save the blended frame
|
||||
try:
|
||||
# Convert back to RGB if saving to JPEG
|
||||
if self.settings.output_format.lower() in ('jpg', 'jpeg'):
|
||||
blended = blended.convert('RGB')
|
||||
|
||||
# Save with appropriate options
|
||||
save_kwargs = {}
|
||||
if self.settings.output_format.lower() in ('jpg', 'jpeg'):
|
||||
save_kwargs['quality'] = self.settings.output_quality
|
||||
elif self.settings.output_format.lower() == 'webp':
|
||||
save_kwargs['lossless'] = True
|
||||
save_kwargs['method'] = self.settings.webp_method
|
||||
elif self.settings.output_format.lower() == 'png':
|
||||
save_kwargs['compress_level'] = 6
|
||||
|
||||
blended.save(output_path, **save_kwargs)
|
||||
|
||||
results.append(BlendResult(
|
||||
output_path=output_path,
|
||||
source_a=img_a_path,
|
||||
source_b=img_b_path,
|
||||
blend_factor=t,
|
||||
success=True
|
||||
))
|
||||
except Exception as e:
|
||||
results.append(BlendResult(
|
||||
output_path=output_path,
|
||||
source_a=img_a_path,
|
||||
source_b=img_b_path,
|
||||
blend_factor=t,
|
||||
success=False,
|
||||
error=str(e)
|
||||
))
|
||||
|
||||
# Close loaded images
|
||||
img_a.close()
|
||||
img_b.close()
|
||||
|
||||
return results
|
||||
|
||||
def _generate_film_frames_batch(
|
||||
self,
|
||||
img_a_path: Path,
|
||||
img_b_path: Path,
|
||||
frame_count: int,
|
||||
dest: Path,
|
||||
base_seq_num: int
|
||||
) -> list[BlendResult]:
|
||||
"""Generate FILM frames using batch mode for better quality.
|
||||
|
||||
FILM works best when generating all frames at once using its
|
||||
recursive approach, rather than generating arbitrary timesteps.
|
||||
|
||||
Args:
|
||||
img_a_path: Path to last frame of first sequence.
|
||||
img_b_path: Path to first frame of second sequence.
|
||||
frame_count: Number of interpolated frames to generate.
|
||||
dest: Destination directory for generated frames.
|
||||
base_seq_num: Starting sequence number for continuous naming.
|
||||
|
||||
Returns:
|
||||
List of BlendResult objects.
|
||||
"""
|
||||
results = []
|
||||
|
||||
# Generate frames using FILM batch mode
|
||||
# Use a temp pattern, then rename to final names
|
||||
temp_pattern = 'film_temp_{:04d}.png'
|
||||
|
||||
success, error, temp_paths = FilmEnv.run_batch_interpolation(
|
||||
img_a_path,
|
||||
img_b_path,
|
||||
dest,
|
||||
frame_count,
|
||||
temp_pattern
|
||||
)
|
||||
|
||||
if not success:
|
||||
# Return error results for all frames
|
||||
for i in range(frame_count):
|
||||
t = (i + 1) / (frame_count + 1)
|
||||
ext = f".{self.settings.output_format.lower()}"
|
||||
seq_num = base_seq_num + i
|
||||
output_name = f"seq_{seq_num:05d}{ext}"
|
||||
output_path = dest / output_name
|
||||
|
||||
results.append(BlendResult(
|
||||
output_path=output_path,
|
||||
source_a=img_a_path,
|
||||
source_b=img_b_path,
|
||||
blend_factor=t,
|
||||
success=False,
|
||||
error=error
|
||||
))
|
||||
return results
|
||||
|
||||
# Rename temp files to final names and convert format if needed
|
||||
for i, temp_path in enumerate(temp_paths):
|
||||
t = (i + 1) / (frame_count + 1)
|
||||
ext = f".{self.settings.output_format.lower()}"
|
||||
seq_num = base_seq_num + i
|
||||
output_name = f"seq_{seq_num:05d}{ext}"
|
||||
output_path = dest / output_name
|
||||
|
||||
try:
|
||||
if temp_path.exists():
|
||||
# Load the temp frame
|
||||
frame = Image.open(temp_path)
|
||||
|
||||
# Convert format if needed
|
||||
if self.settings.output_format.lower() in ('jpg', 'jpeg'):
|
||||
frame = frame.convert('RGB')
|
||||
|
||||
# Save with appropriate options
|
||||
save_kwargs = {}
|
||||
if self.settings.output_format.lower() in ('jpg', 'jpeg'):
|
||||
save_kwargs['quality'] = self.settings.output_quality
|
||||
elif self.settings.output_format.lower() == 'webp':
|
||||
save_kwargs['lossless'] = True
|
||||
save_kwargs['method'] = self.settings.webp_method
|
||||
elif self.settings.output_format.lower() == 'png':
|
||||
save_kwargs['compress_level'] = 6
|
||||
|
||||
frame.save(output_path, **save_kwargs)
|
||||
frame.close()
|
||||
|
||||
# Remove temp file if different from output
|
||||
if temp_path != output_path:
|
||||
temp_path.unlink(missing_ok=True)
|
||||
|
||||
results.append(BlendResult(
|
||||
output_path=output_path,
|
||||
source_a=img_a_path,
|
||||
source_b=img_b_path,
|
||||
blend_factor=t,
|
||||
success=True
|
||||
))
|
||||
else:
|
||||
results.append(BlendResult(
|
||||
output_path=output_path,
|
||||
source_a=img_a_path,
|
||||
source_b=img_b_path,
|
||||
blend_factor=t,
|
||||
success=False,
|
||||
error=f"Temp file not found: {temp_path}"
|
||||
))
|
||||
except Exception as e:
|
||||
results.append(BlendResult(
|
||||
output_path=output_path,
|
||||
source_a=img_a_path,
|
||||
source_b=img_b_path,
|
||||
blend_factor=t,
|
||||
success=False,
|
||||
error=str(e)
|
||||
))
|
||||
|
||||
return results
|
||||
|
||||
453
core/database.py
453
core/database.py
@@ -39,7 +39,8 @@ class DatabaseManager:
|
||||
CREATE TABLE IF NOT EXISTS symlink_sessions (
|
||||
id INTEGER PRIMARY KEY,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
destination TEXT NOT NULL
|
||||
destination TEXT NOT NULL,
|
||||
name TEXT DEFAULT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS symlinks (
|
||||
@@ -82,6 +83,24 @@ class DatabaseManager:
|
||||
right_overlap INTEGER DEFAULT 16,
|
||||
UNIQUE(session_id, trans_folder)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS removed_files (
|
||||
id INTEGER PRIMARY KEY,
|
||||
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
|
||||
source_folder TEXT NOT NULL,
|
||||
filename TEXT NOT NULL,
|
||||
UNIQUE(session_id, source_folder, filename)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS direct_transition_settings (
|
||||
id INTEGER PRIMARY KEY,
|
||||
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
|
||||
after_folder TEXT NOT NULL,
|
||||
frame_count INTEGER DEFAULT 16,
|
||||
method TEXT DEFAULT 'film',
|
||||
enabled INTEGER DEFAULT 1,
|
||||
UNIQUE(session_id, after_folder)
|
||||
);
|
||||
""")
|
||||
|
||||
# Migration: add folder_type column if it doesn't exist
|
||||
@@ -114,20 +133,168 @@ class DatabaseManager:
|
||||
except sqlite3.OperationalError:
|
||||
conn.execute("ALTER TABLE transition_settings ADD COLUMN rife_binary_path TEXT")
|
||||
|
||||
# Migration: add folder_order column if it doesn't exist
|
||||
try:
|
||||
conn.execute("SELECT folder_order FROM sequence_trim_settings LIMIT 1")
|
||||
except sqlite3.OperationalError:
|
||||
conn.execute("ALTER TABLE sequence_trim_settings ADD COLUMN folder_order INTEGER DEFAULT 0")
|
||||
|
||||
# Migration: add name column to symlink_sessions if it doesn't exist
|
||||
try:
|
||||
conn.execute("SELECT name FROM symlink_sessions LIMIT 1")
|
||||
except sqlite3.OperationalError:
|
||||
conn.execute("ALTER TABLE symlink_sessions ADD COLUMN name TEXT DEFAULT NULL")
|
||||
|
||||
# Migration: widen UNIQUE constraints to allow duplicate folder paths per session.
|
||||
# sequence_trim_settings: UNIQUE(session_id, source_folder) → UNIQUE(session_id, folder_order)
|
||||
self._migrate_unique_constraint(
|
||||
conn, 'sequence_trim_settings',
|
||||
"""CREATE TABLE sequence_trim_settings_new (
|
||||
id INTEGER PRIMARY KEY,
|
||||
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
|
||||
source_folder TEXT NOT NULL,
|
||||
trim_start INTEGER DEFAULT 0,
|
||||
trim_end INTEGER DEFAULT 0,
|
||||
folder_type TEXT DEFAULT 'auto',
|
||||
folder_order INTEGER DEFAULT 0,
|
||||
UNIQUE(session_id, folder_order)
|
||||
)""",
|
||||
'session_id, source_folder, trim_start, trim_end, folder_type, folder_order',
|
||||
)
|
||||
|
||||
# per_transition_settings: add folder_order, widen UNIQUE
|
||||
try:
|
||||
conn.execute("SELECT folder_order FROM per_transition_settings LIMIT 1")
|
||||
except sqlite3.OperationalError:
|
||||
conn.execute("ALTER TABLE per_transition_settings ADD COLUMN folder_order INTEGER DEFAULT 0")
|
||||
self._migrate_unique_constraint(
|
||||
conn, 'per_transition_settings',
|
||||
"""CREATE TABLE per_transition_settings_new (
|
||||
id INTEGER PRIMARY KEY,
|
||||
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
|
||||
trans_folder TEXT NOT NULL,
|
||||
left_overlap INTEGER DEFAULT 16,
|
||||
right_overlap INTEGER DEFAULT 16,
|
||||
folder_order INTEGER DEFAULT 0,
|
||||
UNIQUE(session_id, trans_folder, folder_order)
|
||||
)""",
|
||||
'session_id, trans_folder, left_overlap, right_overlap, folder_order',
|
||||
)
|
||||
|
||||
# removed_files: add folder_order, widen UNIQUE
|
||||
try:
|
||||
conn.execute("SELECT folder_order FROM removed_files LIMIT 1")
|
||||
except sqlite3.OperationalError:
|
||||
conn.execute("ALTER TABLE removed_files ADD COLUMN folder_order INTEGER DEFAULT 0")
|
||||
self._migrate_unique_constraint(
|
||||
conn, 'removed_files',
|
||||
"""CREATE TABLE removed_files_new (
|
||||
id INTEGER PRIMARY KEY,
|
||||
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
|
||||
source_folder TEXT NOT NULL,
|
||||
filename TEXT NOT NULL,
|
||||
folder_order INTEGER DEFAULT 0,
|
||||
UNIQUE(session_id, source_folder, filename, folder_order)
|
||||
)""",
|
||||
'session_id, source_folder, filename, folder_order',
|
||||
)
|
||||
|
||||
# direct_transition_settings: add folder_order, widen UNIQUE
|
||||
try:
|
||||
conn.execute("SELECT folder_order FROM direct_transition_settings LIMIT 1")
|
||||
except sqlite3.OperationalError:
|
||||
conn.execute("ALTER TABLE direct_transition_settings ADD COLUMN folder_order INTEGER DEFAULT 0")
|
||||
self._migrate_unique_constraint(
|
||||
conn, 'direct_transition_settings',
|
||||
"""CREATE TABLE direct_transition_settings_new (
|
||||
id INTEGER PRIMARY KEY,
|
||||
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
|
||||
after_folder TEXT NOT NULL,
|
||||
frame_count INTEGER DEFAULT 16,
|
||||
method TEXT DEFAULT 'film',
|
||||
enabled INTEGER DEFAULT 1,
|
||||
folder_order INTEGER DEFAULT 0,
|
||||
UNIQUE(session_id, after_folder, folder_order)
|
||||
)""",
|
||||
'session_id, after_folder, frame_count, method, enabled, folder_order',
|
||||
)
|
||||
|
||||
# Migration: add locked column to symlink_sessions
|
||||
try:
|
||||
conn.execute("SELECT locked FROM symlink_sessions LIMIT 1")
|
||||
except sqlite3.OperationalError:
|
||||
conn.execute("ALTER TABLE symlink_sessions ADD COLUMN locked INTEGER DEFAULT 0")
|
||||
|
||||
# Migration: remove overlap_frames from transition_settings (now per-transition)
|
||||
# We'll keep it for backward compatibility but won't use it
|
||||
|
||||
@staticmethod
|
||||
def _migrate_unique_constraint(
|
||||
conn: sqlite3.Connection,
|
||||
table: str,
|
||||
create_new_sql: str,
|
||||
columns: str,
|
||||
) -> None:
|
||||
"""Recreate a table with a new UNIQUE constraint if needed.
|
||||
|
||||
Tests whether duplicate folder_order=0 entries can be inserted.
|
||||
If an IntegrityError fires, the old constraint is too narrow and
|
||||
the table must be recreated.
|
||||
"""
|
||||
new_table = f"{table}_new"
|
||||
try:
|
||||
# Test: can we insert two rows with same session+folder but different folder_order?
|
||||
# If the old UNIQUE is still (session_id, source_folder) this will fail.
|
||||
conn.execute(f"INSERT INTO {table} (session_id, {columns.split(',')[1].strip()}, folder_order) VALUES (-999, '__test__', 1)")
|
||||
conn.execute(f"INSERT INTO {table} (session_id, {columns.split(',')[1].strip()}, folder_order) VALUES (-999, '__test__', 2)")
|
||||
# Clean up test rows
|
||||
conn.execute(f"DELETE FROM {table} WHERE session_id = -999")
|
||||
# If we got here, the constraint already allows duplicates — no migration needed
|
||||
return
|
||||
except sqlite3.IntegrityError:
|
||||
# Old constraint is too narrow — need to recreate
|
||||
conn.execute(f"DELETE FROM {table} WHERE session_id = -999")
|
||||
except sqlite3.OperationalError:
|
||||
# Column might not exist yet or other issue — try migration anyway
|
||||
conn.execute(f"DELETE FROM {table} WHERE session_id = -999")
|
||||
|
||||
try:
|
||||
conn.execute(f"DROP TABLE IF EXISTS {new_table}")
|
||||
conn.execute(create_new_sql)
|
||||
conn.execute(f"INSERT INTO {new_table} ({columns}) SELECT {columns} FROM {table}")
|
||||
conn.execute(f"DROP TABLE {table}")
|
||||
conn.execute(f"ALTER TABLE {new_table} RENAME TO {table}")
|
||||
except (sqlite3.OperationalError, sqlite3.IntegrityError):
|
||||
# Clean up failed migration attempt
|
||||
try:
|
||||
conn.execute(f"DROP TABLE IF EXISTS {new_table}")
|
||||
except sqlite3.OperationalError:
|
||||
pass
|
||||
|
||||
def clear_session_data(self, session_id: int) -> None:
|
||||
"""Delete all data for a session (symlinks, settings, etc.) but keep the session row."""
|
||||
try:
|
||||
with self._connect() as conn:
|
||||
for table in (
|
||||
'symlinks', 'sequence_trim_settings', 'transition_settings',
|
||||
'per_transition_settings', 'removed_files', 'direct_transition_settings',
|
||||
):
|
||||
conn.execute(f"DELETE FROM {table} WHERE session_id = ?", (session_id,))
|
||||
except sqlite3.Error as e:
|
||||
raise DatabaseError(f"Failed to clear session data: {e}") from e
|
||||
|
||||
def _connect(self) -> sqlite3.Connection:
|
||||
"""Create a database connection with foreign keys enabled."""
|
||||
conn = sqlite3.connect(self.db_path)
|
||||
conn.execute("PRAGMA foreign_keys = ON")
|
||||
return conn
|
||||
|
||||
def create_session(self, destination: str) -> int:
|
||||
def create_session(self, destination: str, name: Optional[str] = None) -> int:
|
||||
"""Create a new linking session.
|
||||
|
||||
Args:
|
||||
destination: The destination directory path.
|
||||
name: Optional display name (e.g. "autosave").
|
||||
|
||||
Returns:
|
||||
The ID of the created session.
|
||||
@@ -138,8 +305,8 @@ class DatabaseManager:
|
||||
try:
|
||||
with self._connect() as conn:
|
||||
cursor = conn.execute(
|
||||
"INSERT INTO symlink_sessions (destination) VALUES (?)",
|
||||
(destination,)
|
||||
"INSERT INTO symlink_sessions (destination, name) VALUES (?, ?)",
|
||||
(destination, name)
|
||||
)
|
||||
return cursor.lastrowid
|
||||
except sqlite3.Error as e:
|
||||
@@ -180,6 +347,31 @@ class DatabaseManager:
|
||||
except sqlite3.Error as e:
|
||||
raise DatabaseError(f"Failed to record symlink: {e}") from e
|
||||
|
||||
def record_symlinks_batch(
|
||||
self,
|
||||
session_id: int,
|
||||
records: list[tuple[str, str, str, int]],
|
||||
) -> None:
|
||||
"""Record multiple symlinks in a single transaction.
|
||||
|
||||
Args:
|
||||
session_id: The session these symlinks belong to.
|
||||
records: List of (source, link, filename, seq) tuples.
|
||||
|
||||
Raises:
|
||||
DatabaseError: If recording fails.
|
||||
"""
|
||||
try:
|
||||
with self._connect() as conn:
|
||||
conn.executemany(
|
||||
"""INSERT INTO symlinks
|
||||
(session_id, source_path, link_path, original_filename, sequence_number)
|
||||
VALUES (?, ?, ?, ?, ?)""",
|
||||
[(session_id, src, lnk, fname, seq) for src, lnk, fname, seq in records]
|
||||
)
|
||||
except sqlite3.Error as e:
|
||||
raise DatabaseError(f"Failed to record symlinks: {e}") from e
|
||||
|
||||
def get_sessions(self) -> list[SessionRecord]:
|
||||
"""List all sessions with link counts.
|
||||
|
||||
@@ -188,7 +380,8 @@ class DatabaseManager:
|
||||
"""
|
||||
with self._connect() as conn:
|
||||
rows = conn.execute("""
|
||||
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count
|
||||
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count,
|
||||
s.name, COALESCE(s.locked, 0)
|
||||
FROM symlink_sessions s
|
||||
LEFT JOIN symlinks l ON s.id = l.session_id
|
||||
GROUP BY s.id
|
||||
@@ -200,7 +393,9 @@ class DatabaseManager:
|
||||
id=row[0],
|
||||
created_at=datetime.fromisoformat(row[1]),
|
||||
destination=row[2],
|
||||
link_count=row[3]
|
||||
link_count=row[3],
|
||||
name=row[4],
|
||||
locked=bool(row[5])
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
@@ -270,7 +465,7 @@ class DatabaseManager:
|
||||
]
|
||||
|
||||
def delete_session(self, session_id: int) -> None:
|
||||
"""Delete a session and all its symlink records.
|
||||
"""Delete a session and all its related data (CASCADE handles child tables).
|
||||
|
||||
Args:
|
||||
session_id: The session ID to delete.
|
||||
@@ -280,11 +475,56 @@ class DatabaseManager:
|
||||
"""
|
||||
try:
|
||||
with self._connect() as conn:
|
||||
conn.execute("DELETE FROM symlinks WHERE session_id = ?", (session_id,))
|
||||
conn.execute("DELETE FROM symlink_sessions WHERE id = ?", (session_id,))
|
||||
except sqlite3.Error as e:
|
||||
raise DatabaseError(f"Failed to delete session: {e}") from e
|
||||
|
||||
def delete_sessions(self, session_ids: list[int]) -> None:
|
||||
"""Delete multiple sessions in a single transaction.
|
||||
|
||||
Locked sessions are silently skipped.
|
||||
|
||||
Args:
|
||||
session_ids: List of session IDs to delete.
|
||||
|
||||
Raises:
|
||||
DatabaseError: If deletion fails.
|
||||
"""
|
||||
if not session_ids:
|
||||
return
|
||||
try:
|
||||
with self._connect() as conn:
|
||||
placeholders = ','.join('?' for _ in session_ids)
|
||||
conn.execute(
|
||||
f"DELETE FROM symlink_sessions WHERE id IN ({placeholders}) AND COALESCE(locked, 0) = 0",
|
||||
session_ids
|
||||
)
|
||||
except sqlite3.Error as e:
|
||||
raise DatabaseError(f"Failed to delete sessions: {e}") from e
|
||||
|
||||
def toggle_session_locked(self, session_id: int) -> bool:
|
||||
"""Toggle the locked state of a session.
|
||||
|
||||
Returns:
|
||||
The new locked state.
|
||||
"""
|
||||
try:
|
||||
with self._connect() as conn:
|
||||
row = conn.execute(
|
||||
"SELECT COALESCE(locked, 0) FROM symlink_sessions WHERE id = ?",
|
||||
(session_id,)
|
||||
).fetchone()
|
||||
if row is None:
|
||||
raise DatabaseError(f"Session {session_id} not found")
|
||||
new_val = 0 if row[0] else 1
|
||||
conn.execute(
|
||||
"UPDATE symlink_sessions SET locked = ? WHERE id = ?",
|
||||
(new_val, session_id)
|
||||
)
|
||||
return bool(new_val)
|
||||
except sqlite3.Error as e:
|
||||
raise DatabaseError(f"Failed to toggle session lock: {e}") from e
|
||||
|
||||
def get_sessions_by_destination(self, dest: str) -> list[SessionRecord]:
|
||||
"""Get all sessions for a destination directory.
|
||||
|
||||
@@ -296,7 +536,8 @@ class DatabaseManager:
|
||||
"""
|
||||
with self._connect() as conn:
|
||||
rows = conn.execute("""
|
||||
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count
|
||||
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count,
|
||||
s.name, COALESCE(s.locked, 0)
|
||||
FROM symlink_sessions s
|
||||
LEFT JOIN symlinks l ON s.id = l.session_id
|
||||
WHERE s.destination = ?
|
||||
@@ -309,7 +550,9 @@ class DatabaseManager:
|
||||
id=row[0],
|
||||
created_at=datetime.fromisoformat(row[1]),
|
||||
destination=row[2],
|
||||
link_count=row[3]
|
||||
link_count=row[3],
|
||||
name=row[4],
|
||||
locked=bool(row[5])
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
@@ -320,7 +563,8 @@ class DatabaseManager:
|
||||
source_folder: str,
|
||||
trim_start: int,
|
||||
trim_end: int,
|
||||
folder_type: FolderType = FolderType.AUTO
|
||||
folder_type: FolderType = FolderType.AUTO,
|
||||
folder_order: int = 0,
|
||||
) -> None:
|
||||
"""Save trim settings for a folder in a session.
|
||||
|
||||
@@ -330,6 +574,7 @@ class DatabaseManager:
|
||||
trim_start: Number of images to trim from start.
|
||||
trim_end: Number of images to trim from end.
|
||||
folder_type: The folder type (auto, main, or transition).
|
||||
folder_order: Position of this folder in source_folders list.
|
||||
|
||||
Raises:
|
||||
DatabaseError: If saving fails.
|
||||
@@ -338,13 +583,14 @@ class DatabaseManager:
|
||||
with self._connect() as conn:
|
||||
conn.execute(
|
||||
"""INSERT INTO sequence_trim_settings
|
||||
(session_id, source_folder, trim_start, trim_end, folder_type)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(session_id, source_folder)
|
||||
DO UPDATE SET trim_start=excluded.trim_start,
|
||||
(session_id, source_folder, trim_start, trim_end, folder_type, folder_order)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(session_id, folder_order)
|
||||
DO UPDATE SET source_folder=excluded.source_folder,
|
||||
trim_start=excluded.trim_start,
|
||||
trim_end=excluded.trim_end,
|
||||
folder_type=excluded.folder_type""",
|
||||
(session_id, source_folder, trim_start, trim_end, folder_type.value)
|
||||
(session_id, source_folder, trim_start, trim_end, folder_type.value, folder_order)
|
||||
)
|
||||
except sqlite3.Error as e:
|
||||
raise DatabaseError(f"Failed to save trim settings: {e}") from e
|
||||
@@ -396,6 +642,62 @@ class DatabaseManager:
|
||||
|
||||
return {row[0]: (row[1], row[2]) for row in rows}
|
||||
|
||||
def get_all_folder_settings(self, session_id: int) -> dict[str, tuple[int, int, FolderType]]:
|
||||
"""Get all folder settings (trim + type) for a session, unordered.
|
||||
|
||||
Returns:
|
||||
Dict mapping source_folder to (trim_start, trim_end, folder_type).
|
||||
"""
|
||||
with self._connect() as conn:
|
||||
rows = conn.execute(
|
||||
"""SELECT source_folder, trim_start, trim_end, folder_type
|
||||
FROM sequence_trim_settings WHERE session_id = ?""",
|
||||
(session_id,)
|
||||
).fetchall()
|
||||
|
||||
result = {}
|
||||
for row in rows:
|
||||
try:
|
||||
ft = FolderType(row[3]) if row[3] else FolderType.AUTO
|
||||
except ValueError:
|
||||
ft = FolderType.AUTO
|
||||
result[row[0]] = (row[1], row[2], ft)
|
||||
return result
|
||||
|
||||
def get_ordered_folders(self, session_id: int) -> list[tuple[str, FolderType, int, int]]:
|
||||
"""Get all folders for a session in saved order.
|
||||
|
||||
Returns:
|
||||
List of (source_folder, folder_type, trim_start, trim_end) sorted by folder_order.
|
||||
Returns empty list if folder_order is not meaningful (all zeros from
|
||||
pre-migration sessions), so the caller falls back to symlink-derived order.
|
||||
"""
|
||||
with self._connect() as conn:
|
||||
rows = conn.execute(
|
||||
"""SELECT source_folder, folder_type, trim_start, trim_end, folder_order
|
||||
FROM sequence_trim_settings WHERE session_id = ?
|
||||
ORDER BY folder_order""",
|
||||
(session_id,)
|
||||
).fetchall()
|
||||
|
||||
if not rows:
|
||||
return []
|
||||
|
||||
# If all folder_order values are 0, this is a pre-migration session
|
||||
# where the ordering is not meaningful — return empty to trigger
|
||||
# the legacy symlink-derived ordering path.
|
||||
if len(rows) > 1 and all(row[4] == 0 for row in rows):
|
||||
return []
|
||||
|
||||
result = []
|
||||
for row in rows:
|
||||
try:
|
||||
ft = FolderType(row[1]) if row[1] else FolderType.AUTO
|
||||
except ValueError:
|
||||
ft = FolderType.AUTO
|
||||
result.append((row[0], ft, row[2], row[3]))
|
||||
return result
|
||||
|
||||
def save_transition_settings(
|
||||
self,
|
||||
session_id: int,
|
||||
@@ -532,13 +834,15 @@ class DatabaseManager:
|
||||
def save_per_transition_settings(
|
||||
self,
|
||||
session_id: int,
|
||||
settings: PerTransitionSettings
|
||||
settings: PerTransitionSettings,
|
||||
folder_order: int = 0,
|
||||
) -> None:
|
||||
"""Save per-transition overlap settings.
|
||||
|
||||
Args:
|
||||
session_id: The session ID.
|
||||
settings: PerTransitionSettings to save.
|
||||
folder_order: Position of this folder in the source list.
|
||||
|
||||
Raises:
|
||||
DatabaseError: If saving fails.
|
||||
@@ -547,13 +851,13 @@ class DatabaseManager:
|
||||
with self._connect() as conn:
|
||||
conn.execute(
|
||||
"""INSERT INTO per_transition_settings
|
||||
(session_id, trans_folder, left_overlap, right_overlap)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(session_id, trans_folder)
|
||||
(session_id, trans_folder, left_overlap, right_overlap, folder_order)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(session_id, trans_folder, folder_order)
|
||||
DO UPDATE SET left_overlap=excluded.left_overlap,
|
||||
right_overlap=excluded.right_overlap""",
|
||||
(session_id, str(settings.trans_folder),
|
||||
settings.left_overlap, settings.right_overlap)
|
||||
settings.left_overlap, settings.right_overlap, folder_order)
|
||||
)
|
||||
except sqlite3.Error as e:
|
||||
raise DatabaseError(f"Failed to save per-transition settings: {e}") from e
|
||||
@@ -590,27 +894,110 @@ class DatabaseManager:
|
||||
def get_all_per_transition_settings(
|
||||
self,
|
||||
session_id: int
|
||||
) -> dict[str, PerTransitionSettings]:
|
||||
) -> list[tuple[str, int, int, int]]:
|
||||
"""Get all per-transition settings for a session.
|
||||
|
||||
Args:
|
||||
session_id: The session ID.
|
||||
|
||||
Returns:
|
||||
Dict mapping transition folder paths to PerTransitionSettings.
|
||||
List of (trans_folder, left_overlap, right_overlap, folder_order) tuples.
|
||||
"""
|
||||
with self._connect() as conn:
|
||||
rows = conn.execute(
|
||||
"""SELECT trans_folder, left_overlap, right_overlap
|
||||
FROM per_transition_settings WHERE session_id = ?""",
|
||||
"""SELECT trans_folder, left_overlap, right_overlap, folder_order
|
||||
FROM per_transition_settings WHERE session_id = ?
|
||||
ORDER BY folder_order""",
|
||||
(session_id,)
|
||||
).fetchall()
|
||||
|
||||
return {
|
||||
row[0]: PerTransitionSettings(
|
||||
trans_folder=Path(row[0]),
|
||||
left_overlap=row[1],
|
||||
right_overlap=row[2]
|
||||
)
|
||||
for row in rows
|
||||
}
|
||||
return [(row[0], row[1], row[2], row[3]) for row in rows]
|
||||
|
||||
def save_removed_files(
|
||||
self,
|
||||
session_id: int,
|
||||
source_folder: str,
|
||||
filenames: list[str],
|
||||
folder_order: int = 0,
|
||||
) -> None:
|
||||
"""Save removed files for a folder in a session.
|
||||
|
||||
Args:
|
||||
session_id: The session ID.
|
||||
source_folder: Path to the source folder.
|
||||
filenames: List of removed filenames.
|
||||
folder_order: Position of this folder in the source list.
|
||||
"""
|
||||
try:
|
||||
with self._connect() as conn:
|
||||
for filename in filenames:
|
||||
conn.execute(
|
||||
"""INSERT OR IGNORE INTO removed_files
|
||||
(session_id, source_folder, filename, folder_order)
|
||||
VALUES (?, ?, ?, ?)""",
|
||||
(session_id, source_folder, filename, folder_order)
|
||||
)
|
||||
except sqlite3.Error as e:
|
||||
raise DatabaseError(f"Failed to save removed files: {e}") from e
|
||||
|
||||
def get_removed_files(self, session_id: int) -> dict[int, set[str]]:
|
||||
"""Get all removed files for a session, keyed by folder_order.
|
||||
|
||||
Args:
|
||||
session_id: The session ID.
|
||||
|
||||
Returns:
|
||||
Dict mapping folder_order to sets of removed filenames.
|
||||
"""
|
||||
with self._connect() as conn:
|
||||
rows = conn.execute(
|
||||
"SELECT source_folder, filename, folder_order FROM removed_files WHERE session_id = ?",
|
||||
(session_id,)
|
||||
).fetchall()
|
||||
|
||||
result: dict[int, set[str]] = {}
|
||||
for folder, filename, folder_order in rows:
|
||||
if folder_order not in result:
|
||||
result[folder_order] = set()
|
||||
result[folder_order].add(filename)
|
||||
return result
|
||||
|
||||
def save_direct_transition(
|
||||
self,
|
||||
session_id: int,
|
||||
after_folder: str,
|
||||
frame_count: int,
|
||||
method: str,
|
||||
enabled: bool,
|
||||
folder_order: int = 0,
|
||||
) -> None:
|
||||
"""Save direct interpolation settings for a folder transition."""
|
||||
try:
|
||||
with self._connect() as conn:
|
||||
conn.execute(
|
||||
"""INSERT INTO direct_transition_settings
|
||||
(session_id, after_folder, frame_count, method, enabled, folder_order)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(session_id, folder_order)
|
||||
DO UPDATE SET after_folder=excluded.after_folder,
|
||||
frame_count=excluded.frame_count,
|
||||
method=excluded.method,
|
||||
enabled=excluded.enabled""",
|
||||
(session_id, after_folder, frame_count, method, 1 if enabled else 0, folder_order)
|
||||
)
|
||||
except sqlite3.Error as e:
|
||||
raise DatabaseError(f"Failed to save direct transition: {e}") from e
|
||||
|
||||
def get_direct_transitions(self, session_id: int) -> list[tuple[str, int, str, bool, int]]:
|
||||
"""Get direct interpolation settings for a session.
|
||||
|
||||
Returns:
|
||||
List of (after_folder, frame_count, method, enabled, folder_order) tuples.
|
||||
"""
|
||||
with self._connect() as conn:
|
||||
rows = conn.execute(
|
||||
"SELECT after_folder, frame_count, method, enabled, folder_order "
|
||||
"FROM direct_transition_settings WHERE session_id = ?",
|
||||
(session_id,)
|
||||
).fetchall()
|
||||
return [(r[0], r[1], r[2], bool(r[3]), r[4]) for r in rows]
|
||||
|
||||
285
core/film_worker.py
Normal file
285
core/film_worker.py
Normal file
@@ -0,0 +1,285 @@
|
||||
#!/usr/bin/env python
|
||||
"""FILM interpolation worker - runs in isolated venv with PyTorch.
|
||||
|
||||
This script is executed via subprocess from the main application.
|
||||
It handles frame interpolation using Google Research's FILM model
|
||||
(Frame Interpolation for Large Motion) via the frame-interpolation-pytorch repo.
|
||||
|
||||
FILM is better than RIFE for large motion and scene gaps, but slower.
|
||||
|
||||
Supports two modes:
|
||||
1. Single frame: --output with --timestep
|
||||
2. Batch mode: --output-dir with --frame-count (generates all frames at once)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
from PIL import Image
|
||||
|
||||
# Model download URL
|
||||
FILM_MODEL_URL = "https://github.com/dajes/frame-interpolation-pytorch/releases/download/v1.0.2/film_net_fp32.pt"
|
||||
FILM_MODEL_FILENAME = "film_net_fp32.pt"
|
||||
|
||||
|
||||
def load_image(path: Path, device: torch.device) -> torch.Tensor:
|
||||
"""Load image as tensor.
|
||||
|
||||
Args:
|
||||
path: Path to image file.
|
||||
device: Device to load tensor to.
|
||||
|
||||
Returns:
|
||||
Image tensor (1, 3, H, W) normalized to [0, 1].
|
||||
"""
|
||||
img = Image.open(path).convert('RGB')
|
||||
arr = np.array(img).astype(np.float32) / 255.0
|
||||
tensor = torch.from_numpy(arr).permute(2, 0, 1).unsqueeze(0)
|
||||
return tensor.to(device)
|
||||
|
||||
|
||||
def save_image(tensor: torch.Tensor, path: Path) -> None:
|
||||
"""Save tensor as image.
|
||||
|
||||
Args:
|
||||
tensor: Image tensor (1, 3, H, W) or (3, H, W) normalized to [0, 1].
|
||||
path: Output path.
|
||||
"""
|
||||
if tensor.dim() == 4:
|
||||
tensor = tensor.squeeze(0)
|
||||
arr = tensor.permute(1, 2, 0).cpu().numpy()
|
||||
arr = (arr * 255).clip(0, 255).astype(np.uint8)
|
||||
Image.fromarray(arr).save(path)
|
||||
|
||||
|
||||
# Global model cache
|
||||
_model_cache: dict = {}
|
||||
|
||||
|
||||
def download_model(model_dir: Path) -> Path:
|
||||
"""Download FILM model if not present.
|
||||
|
||||
Args:
|
||||
model_dir: Directory to store the model.
|
||||
|
||||
Returns:
|
||||
Path to the downloaded model file.
|
||||
"""
|
||||
model_dir.mkdir(parents=True, exist_ok=True)
|
||||
model_path = model_dir / FILM_MODEL_FILENAME
|
||||
|
||||
if not model_path.exists():
|
||||
print(f"Downloading FILM model to {model_path}...", file=sys.stderr)
|
||||
urllib.request.urlretrieve(FILM_MODEL_URL, model_path)
|
||||
print("Download complete.", file=sys.stderr)
|
||||
|
||||
return model_path
|
||||
|
||||
|
||||
def get_model(model_dir: Path, device: torch.device):
|
||||
"""Get or load FILM model (cached).
|
||||
|
||||
Args:
|
||||
model_dir: Model cache directory (for model downloads).
|
||||
device: Device to run on.
|
||||
|
||||
Returns:
|
||||
FILM TorchScript model instance.
|
||||
"""
|
||||
cache_key = f"film_{device}"
|
||||
if cache_key not in _model_cache:
|
||||
# Download model if needed
|
||||
model_path = download_model(model_dir)
|
||||
|
||||
# Load pre-trained TorchScript model
|
||||
print(f"Loading FILM model from {model_path}...", file=sys.stderr)
|
||||
model = torch.jit.load(str(model_path), map_location='cpu')
|
||||
model.eval()
|
||||
model.to(device)
|
||||
_model_cache[cache_key] = model
|
||||
print("Model loaded.", file=sys.stderr)
|
||||
|
||||
return _model_cache[cache_key]
|
||||
|
||||
|
||||
@torch.no_grad()
|
||||
def interpolate_single(model, img0: torch.Tensor, img1: torch.Tensor, t: float) -> torch.Tensor:
|
||||
"""Perform single frame interpolation using FILM.
|
||||
|
||||
Args:
|
||||
model: FILM TorchScript model instance.
|
||||
img0: First frame tensor (1, 3, H, W) normalized to [0, 1].
|
||||
img1: Second frame tensor (1, 3, H, W) normalized to [0, 1].
|
||||
t: Interpolation timestep (0.0 to 1.0).
|
||||
|
||||
Returns:
|
||||
Interpolated frame tensor.
|
||||
"""
|
||||
# FILM TorchScript model expects dt as tensor of shape (1, 1)
|
||||
dt = img0.new_full((1, 1), t)
|
||||
|
||||
result = model(img0, img1, dt)
|
||||
|
||||
if isinstance(result, tuple):
|
||||
result = result[0]
|
||||
|
||||
return result.clamp(0, 1)
|
||||
|
||||
|
||||
@torch.no_grad()
|
||||
def interpolate_batch(model, img0: torch.Tensor, img1: torch.Tensor, frame_count: int) -> list[torch.Tensor]:
|
||||
"""Generate multiple interpolated frames using FILM's recursive approach.
|
||||
|
||||
FILM works best when generating frames recursively - it first generates
|
||||
the middle frame, then fills in the gaps. This produces more consistent
|
||||
results than generating arbitrary timesteps independently.
|
||||
|
||||
Args:
|
||||
model: FILM model instance.
|
||||
img0: First frame tensor (1, 3, H, W) normalized to [0, 1].
|
||||
img1: Second frame tensor (1, 3, H, W) normalized to [0, 1].
|
||||
frame_count: Number of frames to generate between img0 and img1.
|
||||
|
||||
Returns:
|
||||
List of interpolated frame tensors in order.
|
||||
"""
|
||||
# Calculate timesteps for evenly spaced frames
|
||||
timesteps = [(i + 1) / (frame_count + 1) for i in range(frame_count)]
|
||||
|
||||
# Try to use the model's batch/recursive interpolation if available
|
||||
try:
|
||||
# Some implementations have an interpolate_recursively method
|
||||
if hasattr(model, 'interpolate_recursively'):
|
||||
# This generates 2^n - 1 frames, so we need to handle arbitrary counts
|
||||
results = model.interpolate_recursively(img0, img1, frame_count)
|
||||
if len(results) >= frame_count:
|
||||
return results[:frame_count]
|
||||
except (AttributeError, TypeError):
|
||||
pass
|
||||
|
||||
# Fall back to recursive binary interpolation for better quality
|
||||
# This mimics FILM's natural recursive approach
|
||||
frames = {} # timestep -> tensor
|
||||
|
||||
def recursive_interpolate(t_left: float, t_right: float, img_left: torch.Tensor, img_right: torch.Tensor, depth: int = 0):
|
||||
"""Recursively interpolate to fill the gap."""
|
||||
if depth > 10: # Prevent infinite recursion
|
||||
return
|
||||
|
||||
t_mid = (t_left + t_right) / 2
|
||||
|
||||
# Check if we need a frame near t_mid
|
||||
need_frame = False
|
||||
for t in timesteps:
|
||||
if t not in frames and abs(t - t_mid) < 0.5 / (frame_count + 1):
|
||||
need_frame = True
|
||||
break
|
||||
|
||||
if not need_frame:
|
||||
# Check if any remaining timesteps are in this range
|
||||
remaining = [t for t in timesteps if t not in frames and t_left < t < t_right]
|
||||
if not remaining:
|
||||
return
|
||||
|
||||
# Generate middle frame
|
||||
mid_frame = interpolate_single(model, img_left, img_right, 0.5)
|
||||
|
||||
# Assign to nearest needed timestep
|
||||
for t in timesteps:
|
||||
if t not in frames and abs(t - t_mid) < 0.5 / (frame_count + 1):
|
||||
frames[t] = mid_frame
|
||||
break
|
||||
|
||||
# Recurse into left and right halves
|
||||
recursive_interpolate(t_left, t_mid, img_left, mid_frame, depth + 1)
|
||||
recursive_interpolate(t_mid, t_right, mid_frame, img_right, depth + 1)
|
||||
|
||||
# Start recursive interpolation
|
||||
recursive_interpolate(0.0, 1.0, img0, img1)
|
||||
|
||||
# Fill any remaining timesteps with direct interpolation
|
||||
for t in timesteps:
|
||||
if t not in frames:
|
||||
frames[t] = interpolate_single(model, img0, img1, t)
|
||||
|
||||
# Return frames in order
|
||||
return [frames[t] for t in timesteps]
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='FILM frame interpolation worker')
|
||||
parser.add_argument('--input0', required=True, help='Path to first input image')
|
||||
parser.add_argument('--input1', required=True, help='Path to second input image')
|
||||
parser.add_argument('--output', help='Path to output image (single frame mode)')
|
||||
parser.add_argument('--output-dir', help='Output directory (batch mode)')
|
||||
parser.add_argument('--output-pattern', default='frame_{:04d}.png',
|
||||
help='Output filename pattern for batch mode')
|
||||
parser.add_argument('--timestep', type=float, default=0.5,
|
||||
help='Interpolation timestep 0-1 (single frame mode)')
|
||||
parser.add_argument('--frame-count', type=int,
|
||||
help='Number of frames to generate (batch mode)')
|
||||
parser.add_argument('--repo-dir', help='Unused (kept for backward compat)')
|
||||
parser.add_argument('--model-dir', required=True, help='Model cache directory')
|
||||
parser.add_argument('--device', default='cuda', choices=['cuda', 'cpu'], help='Device to use')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Validate arguments
|
||||
batch_mode = args.output_dir is not None and args.frame_count is not None
|
||||
single_mode = args.output is not None
|
||||
|
||||
if not batch_mode and not single_mode:
|
||||
print("Error: Must specify either --output (single) or --output-dir + --frame-count (batch)",
|
||||
file=sys.stderr)
|
||||
return 1
|
||||
|
||||
try:
|
||||
# Select device
|
||||
if args.device == 'cuda' and torch.cuda.is_available():
|
||||
device = torch.device('cuda')
|
||||
else:
|
||||
device = torch.device('cpu')
|
||||
|
||||
# Load model
|
||||
model_dir = Path(args.model_dir)
|
||||
model = get_model(model_dir, device)
|
||||
|
||||
# Load images
|
||||
img0 = load_image(Path(args.input0), device)
|
||||
img1 = load_image(Path(args.input1), device)
|
||||
|
||||
if batch_mode:
|
||||
# Batch mode - generate all frames at once
|
||||
output_dir = Path(args.output_dir)
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
print(f"Generating {args.frame_count} frames...", file=sys.stderr)
|
||||
frames = interpolate_batch(model, img0, img1, args.frame_count)
|
||||
|
||||
for i, frame in enumerate(frames):
|
||||
output_path = output_dir / args.output_pattern.format(i)
|
||||
save_image(frame, output_path)
|
||||
print(f"Saved {output_path.name}", file=sys.stderr)
|
||||
|
||||
print(f"Success: Generated {len(frames)} frames", file=sys.stderr)
|
||||
else:
|
||||
# Single frame mode
|
||||
result = interpolate_single(model, img0, img1, args.timestep)
|
||||
save_image(result, Path(args.output))
|
||||
print("Success", file=sys.stderr)
|
||||
|
||||
return 0
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
import traceback
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
||||
165
core/manager.py
165
core/manager.py
@@ -80,11 +80,12 @@ class SymlinkManager:
|
||||
|
||||
@staticmethod
|
||||
def cleanup_old_links(directory: Path) -> int:
|
||||
"""Remove existing seq* symlinks from a directory.
|
||||
"""Remove existing seq* symlinks and temporary files from a directory.
|
||||
|
||||
Handles both old format (seq_0000) and new format (seq01_0000).
|
||||
Also removes blended image files (not just symlinks) created by
|
||||
cross-dissolve transitions.
|
||||
Handles all naming formats:
|
||||
- Old folder-indexed: seq01_0000.png
|
||||
- Continuous: seq_00000.png
|
||||
Also removes blended image files and film_temp_*.png temporaries.
|
||||
|
||||
Args:
|
||||
directory: Directory to clean up.
|
||||
@@ -96,31 +97,134 @@ class SymlinkManager:
|
||||
CleanupError: If cleanup fails.
|
||||
"""
|
||||
removed = 0
|
||||
seq_pattern = re.compile(r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE)
|
||||
seq_pattern = re.compile(
|
||||
r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE
|
||||
)
|
||||
temp_pattern = re.compile(
|
||||
r'^film_temp_\d+\.png$', re.IGNORECASE
|
||||
)
|
||||
try:
|
||||
for item in directory.iterdir():
|
||||
# Match both old (seq_NNNN) and new (seqNN_NNNN) formats
|
||||
should_remove = False
|
||||
if item.name.startswith("seq"):
|
||||
if item.is_symlink():
|
||||
item.unlink()
|
||||
removed += 1
|
||||
should_remove = True
|
||||
elif item.is_file() and seq_pattern.match(item.name):
|
||||
# Also remove blended image files
|
||||
item.unlink()
|
||||
removed += 1
|
||||
should_remove = True
|
||||
elif item.is_file() and temp_pattern.match(item.name):
|
||||
should_remove = True
|
||||
|
||||
if should_remove:
|
||||
item.unlink()
|
||||
removed += 1
|
||||
except OSError as e:
|
||||
raise CleanupError(f"Failed to clean up old links: {e}") from e
|
||||
|
||||
return removed
|
||||
|
||||
@staticmethod
|
||||
def remove_orphan_files(directory: Path, keep_names: set[str]) -> int:
|
||||
"""Remove seq* files and film_temp_* not in the keep set.
|
||||
|
||||
Same pattern matching as cleanup_old_links but skips filenames
|
||||
present in keep_names.
|
||||
|
||||
Args:
|
||||
directory: Directory to clean orphans from.
|
||||
keep_names: Set of filenames to keep.
|
||||
|
||||
Returns:
|
||||
Number of files removed.
|
||||
|
||||
Raises:
|
||||
CleanupError: If removal fails.
|
||||
"""
|
||||
removed = 0
|
||||
seq_pattern = re.compile(
|
||||
r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE
|
||||
)
|
||||
temp_pattern = re.compile(
|
||||
r'^film_temp_\d+\.png$', re.IGNORECASE
|
||||
)
|
||||
try:
|
||||
for item in directory.iterdir():
|
||||
if item.name in keep_names:
|
||||
continue
|
||||
should_remove = False
|
||||
if item.name.startswith("seq"):
|
||||
if item.is_symlink():
|
||||
should_remove = True
|
||||
elif item.is_file() and seq_pattern.match(item.name):
|
||||
should_remove = True
|
||||
elif item.is_file() and temp_pattern.match(item.name):
|
||||
should_remove = True
|
||||
|
||||
if should_remove:
|
||||
item.unlink()
|
||||
removed += 1
|
||||
except OSError as e:
|
||||
raise CleanupError(f"Failed to remove orphan files: {e}") from e
|
||||
|
||||
return removed
|
||||
|
||||
@staticmethod
|
||||
def symlink_matches(link_path: Path, expected_source: Path) -> bool:
|
||||
"""Check if existing symlink resolves to expected source."""
|
||||
if not link_path.is_symlink():
|
||||
return False
|
||||
try:
|
||||
return link_path.resolve() == expected_source.resolve()
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def copy_matches(dest_path: Path, source_path: Path) -> bool:
|
||||
"""Check if existing copy matches source.
|
||||
|
||||
Fast path: size + mtime comparison. If sizes match but mtimes
|
||||
differ, falls back to comparing file contents so that a
|
||||
re-export after touching (but not changing) the source is still
|
||||
skipped, while a genuine content change is caught.
|
||||
"""
|
||||
if not dest_path.is_file() or dest_path.is_symlink():
|
||||
return False
|
||||
try:
|
||||
src_stat = source_path.stat()
|
||||
dst_stat = dest_path.stat()
|
||||
if src_stat.st_size != dst_stat.st_size:
|
||||
return False
|
||||
# Fast path: identical mtime means the copy2 wrote this file
|
||||
if abs(src_stat.st_mtime - dst_stat.st_mtime) < 2.0:
|
||||
return True
|
||||
# Size matches but mtime differs — compare contents
|
||||
return SymlinkManager._files_equal(source_path, dest_path)
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _files_equal(a: Path, b: Path, chunk_size: int = 65536) -> bool:
|
||||
"""Compare two files by reading in chunks."""
|
||||
try:
|
||||
with open(a, 'rb') as fa, open(b, 'rb') as fb:
|
||||
while True:
|
||||
ca = fa.read(chunk_size)
|
||||
cb = fb.read(chunk_size)
|
||||
if ca != cb:
|
||||
return False
|
||||
if not ca:
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
def create_sequence_links(
|
||||
self,
|
||||
sources: list[Path],
|
||||
dest: Path,
|
||||
files: list[tuple],
|
||||
trim_settings: Optional[dict[Path, tuple[int, int]]] = None,
|
||||
copy_files: bool = False,
|
||||
) -> tuple[list[LinkResult], Optional[int]]:
|
||||
"""Create sequenced symlinks from source files to destination.
|
||||
"""Create sequenced symlinks or copies from source files to destination.
|
||||
|
||||
Args:
|
||||
sources: List of source directories (for validation).
|
||||
@@ -129,12 +233,12 @@ class SymlinkManager:
|
||||
- (source_dir, filename) for CLI mode (uses global sequence)
|
||||
- (source_dir, filename, folder_idx, file_idx) for GUI mode
|
||||
trim_settings: Optional dict mapping folder paths to (trim_start, trim_end).
|
||||
copy_files: If True, copy files instead of creating symlinks.
|
||||
|
||||
Returns:
|
||||
Tuple of (list of LinkResult objects, session_id or None).
|
||||
"""
|
||||
self.validate_paths(sources, dest)
|
||||
self.cleanup_old_links(dest)
|
||||
|
||||
session_id = None
|
||||
if self.db:
|
||||
@@ -165,6 +269,13 @@ class SymlinkManager:
|
||||
expanded_files.append((source_dir, filename, folder_idx, file_idx))
|
||||
files = expanded_files
|
||||
|
||||
# Build planned names for orphan removal
|
||||
planned_names: set[str] = set()
|
||||
for file_data in files:
|
||||
_, fn, fi, fli = file_data
|
||||
ext = Path(fn).suffix
|
||||
planned_names.add(f"seq{fi + 1:02d}_{fli:04d}{ext}")
|
||||
|
||||
for i, file_data in enumerate(files):
|
||||
source_dir, filename, folder_idx, file_idx = file_data
|
||||
source_path = source_dir / filename
|
||||
@@ -172,11 +283,25 @@ class SymlinkManager:
|
||||
link_name = f"seq{folder_idx + 1:02d}_{file_idx:04d}{ext}"
|
||||
link_path = dest / link_name
|
||||
|
||||
# Calculate relative path from destination to source
|
||||
rel_source = Path(os.path.relpath(source_path.resolve(), dest.resolve()))
|
||||
|
||||
try:
|
||||
link_path.symlink_to(rel_source)
|
||||
# Check if existing file already matches
|
||||
already_correct = False
|
||||
if link_path.exists() or link_path.is_symlink():
|
||||
if copy_files:
|
||||
already_correct = self.copy_matches(link_path, source_path)
|
||||
else:
|
||||
already_correct = self.symlink_matches(link_path, source_path)
|
||||
|
||||
if not already_correct:
|
||||
if link_path.exists() or link_path.is_symlink():
|
||||
link_path.unlink()
|
||||
|
||||
if copy_files:
|
||||
import shutil
|
||||
shutil.copy2(source_path, link_path)
|
||||
else:
|
||||
rel_source = Path(os.path.relpath(source_path.resolve(), dest.resolve()))
|
||||
link_path.symlink_to(rel_source)
|
||||
|
||||
if self.db and session_id:
|
||||
self.db.record_symlink(
|
||||
@@ -202,4 +327,10 @@ class SymlinkManager:
|
||||
error=str(e)
|
||||
))
|
||||
|
||||
# Remove orphan seq*/film_temp_* files not in the planned set
|
||||
try:
|
||||
self.remove_orphan_files(dest, planned_names)
|
||||
except CleanupError:
|
||||
pass
|
||||
|
||||
return results, session_id
|
||||
|
||||
@@ -32,6 +32,12 @@ class FolderType(Enum):
|
||||
TRANSITION = 'transition'
|
||||
|
||||
|
||||
class DirectInterpolationMethod(Enum):
|
||||
"""Method for direct frame interpolation between sequences."""
|
||||
RIFE = 'rife'
|
||||
FILM = 'film'
|
||||
|
||||
|
||||
# --- Data Classes ---
|
||||
|
||||
@dataclass
|
||||
@@ -51,14 +57,54 @@ class TransitionSettings:
|
||||
# Practical-RIFE settings
|
||||
practical_rife_model: str = 'v4.25' # v4.25, v4.26, v4.22, etc.
|
||||
practical_rife_ensemble: bool = False # Ensemble mode for better quality (slower)
|
||||
# Optical flow settings
|
||||
of_preset: str = 'balanced' # fast, balanced, quality, max
|
||||
of_levels: int = 3 # pyramid levels (1-7)
|
||||
of_winsize: int = 15 # window size (5-51, odd)
|
||||
of_iterations: int = 3 # iterations (1-10)
|
||||
of_poly_n: int = 5 # polynomial neighborhood (5 or 7)
|
||||
of_poly_sigma: float = 1.2 # gaussian sigma (0.5-2.0)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PerTransitionSettings:
|
||||
"""Per-transition overlap settings for asymmetric cross-dissolves."""
|
||||
"""Per-transition overlap settings for cross-dissolves."""
|
||||
trans_folder: Path
|
||||
left_overlap: int = 16 # frames from main folder end
|
||||
right_overlap: int = 16 # frames from trans folder start
|
||||
left_overlap: int = 16 # overlap count at left boundary (MAIN→TRANS)
|
||||
right_overlap: int = 16 # overlap count at right boundary (TRANS→MAIN)
|
||||
|
||||
|
||||
@dataclass
|
||||
class DirectTransitionSettings:
|
||||
"""Settings for direct AI interpolation between sequences (no transition folder)."""
|
||||
after_folder: Path # The folder after which this transition occurs
|
||||
frame_count: int = 16 # Number of interpolated frames to generate
|
||||
method: DirectInterpolationMethod = DirectInterpolationMethod.FILM
|
||||
enabled: bool = True
|
||||
|
||||
|
||||
@dataclass
|
||||
class VideoPreset:
|
||||
"""Preset for video encoding via ffmpeg."""
|
||||
label: str # Display name
|
||||
container: str # 'mp4' or 'webm'
|
||||
codec: str # ffmpeg codec: libx264, libx265, libvpx-vp9, libaom-av1
|
||||
crf: int
|
||||
pixel_format: str = 'yuv420p'
|
||||
preset: str = 'medium' # x264/x265 speed preset
|
||||
max_height: Optional[int] = None # Downscale filter
|
||||
extra_args: list[str] = field(default_factory=list)
|
||||
|
||||
VIDEO_PRESETS: dict[str, VideoPreset] = {
|
||||
'web_streaming': VideoPreset('Web Streaming', 'mp4', 'libx264', 23, preset='medium'),
|
||||
'high_quality': VideoPreset('High Quality', 'mp4', 'libx264', 18, preset='slow'),
|
||||
'archive': VideoPreset('Archive (H.265)', 'mp4', 'libx265', 18, preset='slow', extra_args=['-tag:v', 'hvc1']),
|
||||
'social_media': VideoPreset('Social Media', 'mp4', 'libx264', 23, preset='fast', max_height=1080),
|
||||
'fast_preview': VideoPreset('Fast Preview', 'mp4', 'libx264', 28, preset='ultrafast'),
|
||||
'webm_vp9': VideoPreset('WebM VP9', 'webm', 'libvpx-vp9', 30, extra_args=['-b:v', '0']),
|
||||
'webm_av1': VideoPreset('WebM AV1', 'webm', 'libaom-av1', 30, extra_args=['-b:v', '0', '-strict', 'experimental']),
|
||||
'godot_theora': VideoPreset('Godot (Theora)', 'ogv', 'libtheora', 8, extra_args=['-g', '512']),
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -84,6 +130,9 @@ class TransitionSpec:
|
||||
# Indices into the overall file list
|
||||
main_start_idx: int
|
||||
trans_start_idx: int
|
||||
# Position indices in the folders list (for duplicate folder support)
|
||||
main_folder_idx: int = 0
|
||||
trans_folder_idx: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -115,6 +164,8 @@ class SessionRecord:
|
||||
created_at: datetime
|
||||
destination: str
|
||||
link_count: int = 0
|
||||
name: Optional[str] = None
|
||||
locked: bool = False
|
||||
|
||||
|
||||
# --- Exceptions ---
|
||||
|
||||
259
core/video.py
Normal file
259
core/video.py
Normal file
@@ -0,0 +1,259 @@
|
||||
"""Video encoding utilities wrapping ffmpeg."""
|
||||
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Callable, Optional
|
||||
|
||||
from .models import VideoPreset
|
||||
|
||||
|
||||
def find_ffmpeg() -> Optional[Path]:
|
||||
"""Find the ffmpeg binary on the system PATH."""
|
||||
result = shutil.which('ffmpeg')
|
||||
return Path(result) if result else None
|
||||
|
||||
|
||||
def encode_image_sequence(
|
||||
input_dir: Path,
|
||||
output_path: Path,
|
||||
fps: int,
|
||||
preset: VideoPreset,
|
||||
input_pattern: Optional[str] = None,
|
||||
progress_callback: Optional[Callable[[int, int], bool]] = None,
|
||||
total_frames: Optional[int] = None,
|
||||
) -> tuple[bool, str]:
|
||||
"""Encode an image sequence directory to a video file using ffmpeg.
|
||||
|
||||
Args:
|
||||
input_dir: Directory containing sequentially named image files.
|
||||
output_path: Output video file path.
|
||||
fps: Frames per second.
|
||||
preset: VideoPreset with codec settings.
|
||||
input_pattern: ffmpeg input pattern (e.g. 'seq_%06d.png').
|
||||
Auto-detected from first seq_* file if not provided.
|
||||
progress_callback: Called with (current_frame, total_frames).
|
||||
Return False to cancel encoding.
|
||||
total_frames: Total number of frames for progress reporting.
|
||||
Auto-counted from input_dir if not provided.
|
||||
|
||||
Returns:
|
||||
(success, message) — message is output_path on success or error text on failure.
|
||||
"""
|
||||
ffmpeg = find_ffmpeg()
|
||||
if not ffmpeg:
|
||||
return False, "ffmpeg not found. Install ffmpeg to encode video."
|
||||
|
||||
# Auto-detect input pattern from first seq_* file
|
||||
if input_pattern is None:
|
||||
input_pattern = _detect_input_pattern(input_dir)
|
||||
if input_pattern is None:
|
||||
return False, f"No seq_* image files found in {input_dir}"
|
||||
|
||||
# Auto-count frames
|
||||
if total_frames is None:
|
||||
ext = Path(input_pattern).suffix
|
||||
total_frames = len(list(input_dir.glob(f"seq_*{ext}")))
|
||||
if total_frames == 0:
|
||||
return False, f"No matching frames found in {input_dir}"
|
||||
|
||||
# Build ffmpeg command
|
||||
cmd = [
|
||||
str(ffmpeg), '-y',
|
||||
'-framerate', str(fps),
|
||||
'-i', str(input_dir / input_pattern),
|
||||
'-c:v', preset.codec,
|
||||
'-q:v' if preset.codec == 'libtheora' else '-crf', str(preset.crf),
|
||||
'-pix_fmt', preset.pixel_format,
|
||||
]
|
||||
|
||||
# Add speed preset for x264/x265
|
||||
if preset.codec in ('libx264', 'libx265'):
|
||||
cmd += ['-preset', preset.preset]
|
||||
|
||||
# Add downscale filter if max_height is set
|
||||
if preset.max_height is not None:
|
||||
cmd += ['-vf', f'scale=-2:{preset.max_height}']
|
||||
|
||||
# Add any extra codec-specific args
|
||||
if preset.extra_args:
|
||||
cmd += preset.extra_args
|
||||
|
||||
# Progress parsing via -progress pipe:1
|
||||
cmd += ['-progress', 'pipe:1']
|
||||
|
||||
cmd.append(str(output_path))
|
||||
|
||||
try:
|
||||
proc = subprocess.Popen(
|
||||
cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
)
|
||||
|
||||
cancelled = False
|
||||
if proc.stdout:
|
||||
for line in proc.stdout:
|
||||
line = line.strip()
|
||||
m = re.match(r'^frame=(\d+)', line)
|
||||
if m and progress_callback is not None:
|
||||
current = int(m.group(1))
|
||||
if not progress_callback(current, total_frames):
|
||||
cancelled = True
|
||||
proc.terminate()
|
||||
proc.wait()
|
||||
break
|
||||
|
||||
proc.wait()
|
||||
|
||||
if cancelled:
|
||||
# Clean up partial file
|
||||
if output_path.exists():
|
||||
output_path.unlink()
|
||||
return False, "Encoding cancelled by user."
|
||||
|
||||
if proc.returncode != 0:
|
||||
stderr = proc.stderr.read() if proc.stderr else ""
|
||||
return False, f"ffmpeg exited with code {proc.returncode}:\n{stderr}"
|
||||
|
||||
return True, str(output_path)
|
||||
|
||||
except FileNotFoundError:
|
||||
return False, "ffmpeg binary not found."
|
||||
except Exception as e:
|
||||
return False, f"Encoding error: {e}"
|
||||
|
||||
|
||||
def _detect_input_pattern(input_dir: Path) -> Optional[str]:
|
||||
"""Detect the ffmpeg input pattern from seq_* files in a directory.
|
||||
|
||||
Looks for files like seq_000000.png and returns a pattern like seq_%06d.png.
|
||||
"""
|
||||
for f in sorted(input_dir.iterdir()):
|
||||
m = re.match(r'^(seq_)(\d+)(\.\w+)$', f.name)
|
||||
if m:
|
||||
prefix = m.group(1)
|
||||
digits = m.group(2)
|
||||
ext = m.group(3)
|
||||
width = len(digits)
|
||||
return f"{prefix}%0{width}d{ext}"
|
||||
return None
|
||||
|
||||
|
||||
def encode_from_file_list(
|
||||
file_paths: list[Path],
|
||||
output_path: Path,
|
||||
fps: int,
|
||||
preset: VideoPreset,
|
||||
progress_callback: Optional[Callable[[int, int], bool]] = None,
|
||||
) -> tuple[bool, str]:
|
||||
"""Encode a video from an explicit list of image file paths.
|
||||
|
||||
Uses ffmpeg's concat demuxer so files can be scattered across directories.
|
||||
|
||||
Args:
|
||||
file_paths: Ordered list of image file paths.
|
||||
output_path: Output video file path.
|
||||
fps: Frames per second.
|
||||
preset: VideoPreset with codec settings.
|
||||
progress_callback: Called with (current_frame, total_frames).
|
||||
Return False to cancel encoding.
|
||||
|
||||
Returns:
|
||||
(success, message) — message is output_path on success or error text on failure.
|
||||
"""
|
||||
ffmpeg = find_ffmpeg()
|
||||
if not ffmpeg:
|
||||
return False, "ffmpeg not found. Install ffmpeg to encode video."
|
||||
|
||||
if not file_paths:
|
||||
return False, "No files provided."
|
||||
|
||||
total_frames = len(file_paths)
|
||||
frame_duration = f"{1.0 / fps:.10f}"
|
||||
|
||||
# Write a concat-demuxer file listing each image with its duration
|
||||
try:
|
||||
concat_file = tempfile.NamedTemporaryFile(
|
||||
mode='w', suffix='.txt', delete=False, prefix='vml_concat_'
|
||||
)
|
||||
concat_path = Path(concat_file.name)
|
||||
for p in file_paths:
|
||||
# Escape single quotes for ffmpeg concat format
|
||||
escaped = str(p.resolve()).replace("'", "'\\''")
|
||||
concat_file.write(f"file '{escaped}'\n")
|
||||
concat_file.write(f"duration {frame_duration}\n")
|
||||
# Repeat last file so the last frame displays for its full duration
|
||||
escaped = str(file_paths[-1].resolve()).replace("'", "'\\''")
|
||||
concat_file.write(f"file '{escaped}'\n")
|
||||
concat_file.close()
|
||||
except OSError as e:
|
||||
return False, f"Failed to create concat file: {e}"
|
||||
|
||||
cmd = [
|
||||
str(ffmpeg), '-y',
|
||||
'-f', 'concat', '-safe', '0',
|
||||
'-i', str(concat_path),
|
||||
'-c:v', preset.codec,
|
||||
'-q:v' if preset.codec == 'libtheora' else '-crf', str(preset.crf),
|
||||
'-pix_fmt', preset.pixel_format,
|
||||
]
|
||||
|
||||
if preset.codec in ('libx264', 'libx265'):
|
||||
cmd += ['-preset', preset.preset]
|
||||
|
||||
if preset.max_height is not None:
|
||||
cmd += ['-vf', f'scale=-2:{preset.max_height}']
|
||||
|
||||
if preset.extra_args:
|
||||
cmd += preset.extra_args
|
||||
|
||||
cmd += ['-progress', 'pipe:1']
|
||||
cmd.append(str(output_path))
|
||||
|
||||
try:
|
||||
proc = subprocess.Popen(
|
||||
cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
)
|
||||
|
||||
cancelled = False
|
||||
if proc.stdout:
|
||||
for line in proc.stdout:
|
||||
line = line.strip()
|
||||
m = re.match(r'^frame=(\d+)', line)
|
||||
if m and progress_callback is not None:
|
||||
current = int(m.group(1))
|
||||
if not progress_callback(current, total_frames):
|
||||
cancelled = True
|
||||
proc.terminate()
|
||||
proc.wait()
|
||||
break
|
||||
|
||||
proc.wait()
|
||||
|
||||
if cancelled:
|
||||
if output_path.exists():
|
||||
output_path.unlink()
|
||||
return False, "Encoding cancelled by user."
|
||||
|
||||
if proc.returncode != 0:
|
||||
stderr = proc.stderr.read() if proc.stderr else ""
|
||||
return False, f"ffmpeg exited with code {proc.returncode}:\n{stderr}"
|
||||
|
||||
return True, str(output_path)
|
||||
|
||||
except FileNotFoundError:
|
||||
return False, "ffmpeg binary not found."
|
||||
except Exception as e:
|
||||
return False, f"Encoding error: {e}"
|
||||
finally:
|
||||
try:
|
||||
concat_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
4208
ui/main_window.py
4208
ui/main_window.py
File diff suppressed because it is too large
Load Diff
@@ -15,6 +15,7 @@ class TrimSlider(QWidget):
|
||||
"""
|
||||
|
||||
trimChanged = pyqtSignal(int, int, str) # Emits (trim_start, trim_end, 'left' or 'right')
|
||||
trimDragFinished = pyqtSignal(int, int, str) # Emits final values on mouse release
|
||||
|
||||
def __init__(self, parent: Optional[QWidget] = None) -> None:
|
||||
"""Initialize the trim slider.
|
||||
@@ -287,5 +288,11 @@ class TrimSlider(QWidget):
|
||||
|
||||
def mouseReleaseEvent(self, event: QMouseEvent) -> None:
|
||||
"""Handle mouse release to stop dragging."""
|
||||
self._dragging = None
|
||||
self.setCursor(Qt.CursorShape.ArrowCursor)
|
||||
if self._dragging:
|
||||
handle = self._dragging
|
||||
self._dragging = None
|
||||
self.setCursor(Qt.CursorShape.ArrowCursor)
|
||||
self.trimDragFinished.emit(self._trim_start, self._trim_end, handle)
|
||||
else:
|
||||
self._dragging = None
|
||||
self.setCursor(Qt.CursorShape.ArrowCursor)
|
||||
|
||||
Reference in New Issue
Block a user