Compare commits


1 commit

| SHA1 | Message | Date |
|------|---------|------|
| 3dd46a0aa1 | cool | 2026-02-03 16:21:56 +01:00 |
16 changed files with 2023 additions and 11049 deletions

.gitignore (vendored, 60 changed lines)

@@ -1,64 +1,4 @@
# Python
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
*.so
# Virtual environments
venv/
venv-rife/
.venv/
env/
# Environment files
.env
.env.local
# IDE
.idea/
.vscode/
*.swp
*.swo
*~
# Database
*.db
*.sqlite
*.sqlite3
# Downloads and cache
*.pkl
*.pt
*.pth
*.onnx
downloads/
cache/
.cache/
# RIFE binaries and models
rife-ncnn-vulkan*/
*.zip
# Output directories
output/
outputs/
temp/
tmp/
# Logs
*.log
logs/
# OS files
.DS_Store
Thumbs.db
# Build artifacts
dist/
build/
*.egg-info/
# Git mirror scripts
gitea-push-mirror-setup

README.md (152 changed lines)

@@ -1,157 +1,97 @@
# Video Montage Linker
A PyQt6 application for creating sequenced symlinks from image folders, with optional cross-dissolve transitions. Useful for preparing image sequences for video editing, time-lapse assembly, or montage creation.
## Features
### Source Folder Management
- Add multiple source folders to merge images from different locations
- Drag & drop folders directly onto the window
- Alternating folder types: odd positions = **Main**, even positions = **Transition**
- Override folder types via right-click context menu
- Reorder folders with up/down buttons
- Per-folder trim settings (exclude frames from start/end)
- Files are ordered by folder (first added = first in sequence), then alphabetically within each folder
- Multi-select support for removing folders
### File Management
- Two-column view showing filename and source path
- Drag & drop to reorder files in the sequence
- Multi-select files (Ctrl+click, Shift+click)
- Remove files with Delete key or "Remove Files" button
- Refresh to rescan source folders
### Cross-Dissolve Transitions
Smooth blending between folder boundaries with four blend methods:
| Method | Description | Quality | Speed |
|--------|-------------|---------|-------|
| **Cross-Dissolve** | Simple alpha blend | Good | Fastest |
| **Optical Flow** | Motion-compensated blend using OpenCV Farneback | Better | Medium |
| **RIFE (ncnn)** | Neural network interpolation via rife-ncnn-vulkan | Best | Fast (GPU) |
| **RIFE (Practical)** | PyTorch-based Practical-RIFE (v4.25/v4.26) | Best | Medium (GPU) |
### Symlink Creation
- Creates numbered symlinks (`seq_0000.png`, `seq_0001.png`, etc.)
- Uses relative paths for portability
- Automatically cleans up old `seq_*` links before creating new ones
- **Asymmetric overlap**: Set different frame counts for each side of a transition
- **Blend curves**: Linear, Ease In, Ease Out, Ease In/Out
- **Output formats**: PNG, JPEG (with quality), WebP (lossless with method setting)
- **RIFE auto-download**: Automatically downloads rife-ncnn-vulkan binary
- **Practical-RIFE models**: Auto-downloads from Google Drive on first use
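The four blend curves listed above can be read as easing functions applied to a linear blend factor. A minimal sketch for illustration only; the exact formulas used in `core/blender.py` are not shown here and may differ:

```python
import math

def apply_curve(t: float, curve: str) -> float:
    """Map a linear blend factor t in [0, 1] onto an easing curve.

    Illustrative only -- the shipped implementation may use
    different formulas for each curve.
    """
    if curve == 'linear':
        return t
    if curve == 'ease_in':
        return t * t  # slow start, fast finish
    if curve == 'ease_out':
        return 1 - (1 - t) ** 2  # fast start, slow finish
    if curve == 'ease_in_out':
        return 0.5 * (1 - math.cos(math.pi * t))  # smooth S-curve
    raise ValueError(f"unknown curve: {curve}")
```

All four curves agree at the endpoints (0 maps to 0, 1 maps to 1) and differ only in how quickly the second image fades in.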
### Session Tracking
- SQLite database tracks all symlink sessions
- Located at `~/.config/video-montage-linker/symlinks.db`
- List past sessions and clean up by destination
### Preview
- **Video Preview**: Play video files from source folders
- **Image Sequence Preview**: Browse frames with zoom (scroll wheel) and pan (drag)
- **Sequence Table**: 2-column view showing Main/Transition frame pairing
- **Trim Slider**: Visual frame range selection per folder
### Dual Export Destinations
- **Sequence destination**: Regular symlinks only
- **Transition destination**: Symlinks + blended transition frames
### Session Persistence
- SQLite database tracks all sessions and settings
- Resume previous session by selecting the same destination folder
- Restores: source folders, trim settings, folder types, transition settings, per-transition overlaps
### Supported Formats
- PNG, WEBP, JPG, JPEG
## Installation
### Requirements
- Python 3.10+
- PyQt6
- Pillow
- NumPy
- OpenCV (optional, for Optical Flow)
```bash
pip install PyQt6 Pillow numpy opencv-python
```
**Note:** Practical-RIFE creates its own isolated venv with PyTorch. The `gdown` package is installed automatically for downloading models from Google Drive.
### RIFE ncnn (Optional)
For AI-powered frame interpolation using Vulkan GPU acceleration:
- Select **RIFE (ncnn)** as the blend method
- Click **Download** to auto-fetch [rife-ncnn-vulkan](https://github.com/nihui/rife-ncnn-vulkan)
- Or specify a custom binary path
- Models: rife-v4.6, rife-v4.15-lite, etc.
### Practical-RIFE (Optional)
For PyTorch-based frame interpolation with latest models:
- Select **RIFE (Practical)** as the blend method
- Click **Setup PyTorch** to create an isolated venv with PyTorch (~2GB)
- Models auto-download from Google Drive on first use
- Available models: v4.26, v4.25, v4.22, v4.20, v4.18, v4.15
- Optional ensemble mode for higher quality (slower)
The venv is stored at `~/.cache/video-montage-linker/venv-rife/`.
## Usage
### GUI Mode
```bash
# Launch the graphical interface (GUI is the default)
python symlink.py
python symlink.py --gui
```
**Workflow:**
1. Add source folders (drag & drop or click "Add Folder")
2. Adjust trim settings per folder if needed (right-click or use the trim slider)
3. Reorder files by dragging, and remove unwanted ones (select + Delete key)
4. Set destination folder(s); enable transitions and configure the blend method if desired
5. Click **Export Sequence** or **Export with Transitions**
### CLI Mode
```bash
# Create symlinks from a single source
python symlink.py --src /path/to/images --dst /path/to/dest

# Merge multiple source folders
python symlink.py --src /folder1 --src /folder2 --dst /path/to/dest
# List all tracked sessions
python symlink.py --list
# Clean up symlinks and remove the session record
python symlink.py --clean /path/to/dest
```
## File Structure
```
video-montage-linker/
├── symlink.py # Entry point, CLI
├── config.py # Constants, paths
├── core/
│ ├── models.py # Enums, dataclasses
│ ├── database.py # SQLite session management
│ ├── blender.py # Image blending, RIFE downloader, Practical-RIFE env
│ ├── rife_worker.py # Practical-RIFE inference (runs in isolated venv)
│ └── manager.py # Symlink operations
└── ui/
├── widgets.py # TrimSlider, custom widgets
└── main_window.py # Main application window
```
## Supported Formats
**Images:** PNG, WEBP, JPG, JPEG, TIFF, BMP, EXR
**Videos (preview only):** MP4, MOV, AVI, MKV, WEBM
## Database
Session data stored at: `~/.config/video-montage-linker/symlinks.db`
## System Installation (Linux)
To add as a system application:
```bash
# Make executable and add to PATH
chmod +x symlink.py
ln -s /path/to/symlink.py ~/.local/bin/video-montage-linker
# Create desktop entry
cat > ~/.local/share/applications/video-montage-linker.desktop << 'EOF'
[Desktop Entry]
Name=Video Montage Linker
Comment=Create sequenced symlinks for image files
Exec=/path/to/symlink.py
Icon=emblem-symbolic-link
Terminal=false
Type=Application
Categories=Utility;Graphics;
EOF
# Update desktop database
update-desktop-database ~/.local/share/applications/
```


@@ -1,10 +0,0 @@
"""Configuration constants for Video Montage Linker."""
from pathlib import Path
# Supported file extensions
SUPPORTED_EXTENSIONS = ('.png', '.webp', '.jpg', '.jpeg')
VIDEO_EXTENSIONS = ('.mp4', '.webm', '.mkv', '.avi', '.mov', '.wmv', '.flv', '.m4v')
# Database path
DB_PATH = Path.home() / '.config' / 'video-montage-linker' / 'symlinks.db'


@@ -1,62 +0,0 @@
"""Core modules for Video Montage Linker."""
from .models import (
BlendCurve,
BlendMethod,
FolderType,
DirectInterpolationMethod,
TransitionSettings,
PerTransitionSettings,
DirectTransitionSettings,
VideoPreset,
VIDEO_PRESETS,
BlendResult,
TransitionSpec,
LinkResult,
SymlinkRecord,
SessionRecord,
SymlinkError,
PathValidationError,
SourceNotFoundError,
DestinationError,
CleanupError,
DatabaseError,
)
from .database import DatabaseManager
from .blender import ImageBlender, TransitionGenerator, RifeDownloader, PracticalRifeEnv, FilmEnv, OPTICAL_FLOW_PRESETS
from .manager import SymlinkManager
from .video import encode_image_sequence, encode_from_file_list, find_ffmpeg
__all__ = [
'BlendCurve',
'BlendMethod',
'FolderType',
'DirectInterpolationMethod',
'TransitionSettings',
'PerTransitionSettings',
'DirectTransitionSettings',
'VideoPreset',
'VIDEO_PRESETS',
'BlendResult',
'TransitionSpec',
'LinkResult',
'SymlinkRecord',
'SessionRecord',
'SymlinkError',
'PathValidationError',
'SourceNotFoundError',
'DestinationError',
'CleanupError',
'DatabaseError',
'DatabaseManager',
'ImageBlender',
'TransitionGenerator',
'RifeDownloader',
'PracticalRifeEnv',
'FilmEnv',
'SymlinkManager',
'OPTICAL_FLOW_PRESETS',
'encode_image_sequence',
'encode_from_file_list',
'find_ffmpeg',
]

File diff suppressed because it is too large.

File diff suppressed because it is too large.


@@ -1,285 +0,0 @@
#!/usr/bin/env python
"""FILM interpolation worker - runs in isolated venv with PyTorch.
This script is executed via subprocess from the main application.
It handles frame interpolation using Google Research's FILM model
(Frame Interpolation for Large Motion) via the frame-interpolation-pytorch repo.
FILM is better than RIFE for large motion and scene gaps, but slower.
Supports two modes:
1. Single frame: --output with --timestep
2. Batch mode: --output-dir with --frame-count (generates all frames at once)
"""
import argparse
import sys
import urllib.request
from pathlib import Path
import numpy as np
import torch
from PIL import Image
# Model download URL
FILM_MODEL_URL = "https://github.com/dajes/frame-interpolation-pytorch/releases/download/v1.0.2/film_net_fp32.pt"
FILM_MODEL_FILENAME = "film_net_fp32.pt"
def load_image(path: Path, device: torch.device) -> torch.Tensor:
"""Load image as tensor.
Args:
path: Path to image file.
device: Device to load tensor to.
Returns:
Image tensor (1, 3, H, W) normalized to [0, 1].
"""
img = Image.open(path).convert('RGB')
arr = np.array(img).astype(np.float32) / 255.0
tensor = torch.from_numpy(arr).permute(2, 0, 1).unsqueeze(0)
return tensor.to(device)
def save_image(tensor: torch.Tensor, path: Path) -> None:
"""Save tensor as image.
Args:
tensor: Image tensor (1, 3, H, W) or (3, H, W) normalized to [0, 1].
path: Output path.
"""
if tensor.dim() == 4:
tensor = tensor.squeeze(0)
arr = tensor.permute(1, 2, 0).cpu().numpy()
arr = (arr * 255).clip(0, 255).astype(np.uint8)
Image.fromarray(arr).save(path)
# Global model cache
_model_cache: dict = {}
def download_model(model_dir: Path) -> Path:
"""Download FILM model if not present.
Args:
model_dir: Directory to store the model.
Returns:
Path to the downloaded model file.
"""
model_dir.mkdir(parents=True, exist_ok=True)
model_path = model_dir / FILM_MODEL_FILENAME
if not model_path.exists():
print(f"Downloading FILM model to {model_path}...", file=sys.stderr)
urllib.request.urlretrieve(FILM_MODEL_URL, model_path)
print("Download complete.", file=sys.stderr)
return model_path
def get_model(model_dir: Path, device: torch.device):
"""Get or load FILM model (cached).
Args:
model_dir: Model cache directory (for model downloads).
device: Device to run on.
Returns:
FILM TorchScript model instance.
"""
cache_key = f"film_{device}"
if cache_key not in _model_cache:
# Download model if needed
model_path = download_model(model_dir)
# Load pre-trained TorchScript model
print(f"Loading FILM model from {model_path}...", file=sys.stderr)
model = torch.jit.load(str(model_path), map_location='cpu')
model.eval()
model.to(device)
_model_cache[cache_key] = model
print("Model loaded.", file=sys.stderr)
return _model_cache[cache_key]
@torch.no_grad()
def interpolate_single(model, img0: torch.Tensor, img1: torch.Tensor, t: float) -> torch.Tensor:
"""Perform single frame interpolation using FILM.
Args:
model: FILM TorchScript model instance.
img0: First frame tensor (1, 3, H, W) normalized to [0, 1].
img1: Second frame tensor (1, 3, H, W) normalized to [0, 1].
t: Interpolation timestep (0.0 to 1.0).
Returns:
Interpolated frame tensor.
"""
# FILM TorchScript model expects dt as tensor of shape (1, 1)
dt = img0.new_full((1, 1), t)
result = model(img0, img1, dt)
if isinstance(result, tuple):
result = result[0]
return result.clamp(0, 1)
@torch.no_grad()
def interpolate_batch(model, img0: torch.Tensor, img1: torch.Tensor, frame_count: int) -> list[torch.Tensor]:
"""Generate multiple interpolated frames using FILM's recursive approach.
FILM works best when generating frames recursively - it first generates
the middle frame, then fills in the gaps. This produces more consistent
results than generating arbitrary timesteps independently.
Args:
model: FILM model instance.
img0: First frame tensor (1, 3, H, W) normalized to [0, 1].
img1: Second frame tensor (1, 3, H, W) normalized to [0, 1].
frame_count: Number of frames to generate between img0 and img1.
Returns:
List of interpolated frame tensors in order.
"""
# Calculate timesteps for evenly spaced frames
timesteps = [(i + 1) / (frame_count + 1) for i in range(frame_count)]
# Try to use the model's batch/recursive interpolation if available
try:
# Some implementations have an interpolate_recursively method
if hasattr(model, 'interpolate_recursively'):
# This generates 2^n - 1 frames, so we need to handle arbitrary counts
results = model.interpolate_recursively(img0, img1, frame_count)
if len(results) >= frame_count:
return results[:frame_count]
except (AttributeError, TypeError):
pass
# Fall back to recursive binary interpolation for better quality
# This mimics FILM's natural recursive approach
frames = {} # timestep -> tensor
def recursive_interpolate(t_left: float, t_right: float, img_left: torch.Tensor, img_right: torch.Tensor, depth: int = 0):
"""Recursively interpolate to fill the gap."""
if depth > 10: # Prevent infinite recursion
return
t_mid = (t_left + t_right) / 2
# Check if we need a frame near t_mid
need_frame = False
for t in timesteps:
if t not in frames and abs(t - t_mid) < 0.5 / (frame_count + 1):
need_frame = True
break
if not need_frame:
# Check if any remaining timesteps are in this range
remaining = [t for t in timesteps if t not in frames and t_left < t < t_right]
if not remaining:
return
# Generate middle frame
mid_frame = interpolate_single(model, img_left, img_right, 0.5)
# Assign to nearest needed timestep
for t in timesteps:
if t not in frames and abs(t - t_mid) < 0.5 / (frame_count + 1):
frames[t] = mid_frame
break
# Recurse into left and right halves
recursive_interpolate(t_left, t_mid, img_left, mid_frame, depth + 1)
recursive_interpolate(t_mid, t_right, mid_frame, img_right, depth + 1)
# Start recursive interpolation
recursive_interpolate(0.0, 1.0, img0, img1)
# Fill any remaining timesteps with direct interpolation
for t in timesteps:
if t not in frames:
frames[t] = interpolate_single(model, img0, img1, t)
# Return frames in order
return [frames[t] for t in timesteps]
def main():
parser = argparse.ArgumentParser(description='FILM frame interpolation worker')
parser.add_argument('--input0', required=True, help='Path to first input image')
parser.add_argument('--input1', required=True, help='Path to second input image')
parser.add_argument('--output', help='Path to output image (single frame mode)')
parser.add_argument('--output-dir', help='Output directory (batch mode)')
parser.add_argument('--output-pattern', default='frame_{:04d}.png',
help='Output filename pattern for batch mode')
parser.add_argument('--timestep', type=float, default=0.5,
help='Interpolation timestep 0-1 (single frame mode)')
parser.add_argument('--frame-count', type=int,
help='Number of frames to generate (batch mode)')
parser.add_argument('--repo-dir', help='Unused (kept for backward compat)')
parser.add_argument('--model-dir', required=True, help='Model cache directory')
parser.add_argument('--device', default='cuda', choices=['cuda', 'cpu'], help='Device to use')
args = parser.parse_args()
# Validate arguments
batch_mode = args.output_dir is not None and args.frame_count is not None
single_mode = args.output is not None
if not batch_mode and not single_mode:
print("Error: Must specify either --output (single) or --output-dir + --frame-count (batch)",
file=sys.stderr)
return 1
try:
# Select device
if args.device == 'cuda' and torch.cuda.is_available():
device = torch.device('cuda')
else:
device = torch.device('cpu')
# Load model
model_dir = Path(args.model_dir)
model = get_model(model_dir, device)
# Load images
img0 = load_image(Path(args.input0), device)
img1 = load_image(Path(args.input1), device)
if batch_mode:
# Batch mode - generate all frames at once
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
print(f"Generating {args.frame_count} frames...", file=sys.stderr)
frames = interpolate_batch(model, img0, img1, args.frame_count)
for i, frame in enumerate(frames):
output_path = output_dir / args.output_pattern.format(i)
save_image(frame, output_path)
print(f"Saved {output_path.name}", file=sys.stderr)
print(f"Success: Generated {len(frames)} frames", file=sys.stderr)
else:
# Single frame mode
result = interpolate_single(model, img0, img1, args.timestep)
save_image(result, Path(args.output))
print("Success", file=sys.stderr)
return 0
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
return 1
if __name__ == '__main__':
sys.exit(main())
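Since this worker runs in an isolated venv, the main process invokes it via subprocess. A hedged sketch of assembling the command line for single-frame mode, using only the argparse flags defined above; the interpreter and script paths are assumptions based on the README, not values taken from the codebase:

```python
from pathlib import Path

def build_film_cmd(python_exe, script, input0, input1, output,
                   timestep, model_dir, device='cpu'):
    """Assemble argv for the FILM worker's single-frame mode.

    Only flags from the worker's own argparse are used; the
    interpreter/script locations are caller-supplied assumptions.
    """
    return [
        str(python_exe), str(script),
        '--input0', str(input0),
        '--input1', str(input1),
        '--output', str(output),
        '--timestep', str(timestep),
        '--model-dir', str(model_dir),
        '--device', device,
    ]

# Example (all paths are placeholders):
venv_py = Path.home() / '.cache' / 'video-montage-linker' / 'venv-rife' / 'bin' / 'python'
cmd = build_film_cmd(venv_py, 'film_worker.py', 'a.png', 'b.png',
                     'mid.png', 0.5, Path.home() / '.cache' / 'models')
```

The resulting list can be passed to `subprocess.run(cmd, capture_output=True, text=True)`; the worker writes progress to stderr and returns 0 on success.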


@@ -1,336 +0,0 @@
"""Symlink management for Video Montage Linker."""
import os
import re
from pathlib import Path
from typing import Optional
from config import SUPPORTED_EXTENSIONS
from .models import LinkResult, CleanupError, SourceNotFoundError, DestinationError
from .database import DatabaseManager
class SymlinkManager:
"""Manages symlink creation and cleanup operations."""
def __init__(self, db: Optional[DatabaseManager] = None) -> None:
"""Initialize the symlink manager.
Args:
db: Optional database manager for tracking operations.
"""
self.db = db
@staticmethod
def get_supported_files(directories: list[Path]) -> list[tuple[Path, str]]:
"""Get all supported image files from multiple directories.
Files are returned sorted by directory order (as provided), then
alphabetically by filename within each directory.
Args:
directories: List of source directories to scan.
Returns:
List of (directory, filename) tuples.
"""
files: list[tuple[Path, str]] = []
for directory in directories:
if not directory.is_dir():
continue
dir_files = []
for item in directory.iterdir():
if item.is_file() and item.suffix.lower() in SUPPORTED_EXTENSIONS:
dir_files.append((directory, item.name))
# Sort files within this directory alphabetically
dir_files.sort(key=lambda x: x[1].lower())
files.extend(dir_files)
return files
@staticmethod
def validate_paths(sources: list[Path], dest: Path) -> None:
"""Validate source and destination paths.
Args:
sources: List of source directories.
dest: Destination directory.
Raises:
SourceNotFoundError: If any source directory doesn't exist.
DestinationError: If destination cannot be created or accessed.
"""
if not sources:
raise SourceNotFoundError("No source directories specified")
for source in sources:
if not source.exists():
raise SourceNotFoundError(f"Source directory not found: {source}")
if not source.is_dir():
raise SourceNotFoundError(f"Source is not a directory: {source}")
try:
dest.mkdir(parents=True, exist_ok=True)
except OSError as e:
raise DestinationError(f"Cannot create destination directory: {e}") from e
if not dest.is_dir():
raise DestinationError(f"Destination is not a directory: {dest}")
@staticmethod
def cleanup_old_links(directory: Path) -> int:
"""Remove existing seq* symlinks and temporary files from a directory.
Handles all naming formats:
- Old folder-indexed: seq01_0000.png
- Continuous: seq_00000.png
Also removes blended image files and film_temp_*.png temporaries.
Args:
directory: Directory to clean up.
Returns:
Number of files removed.
Raises:
CleanupError: If cleanup fails.
"""
removed = 0
seq_pattern = re.compile(
r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE
)
temp_pattern = re.compile(
r'^film_temp_\d+\.png$', re.IGNORECASE
)
try:
for item in directory.iterdir():
should_remove = False
if item.name.startswith("seq"):
if item.is_symlink():
should_remove = True
elif item.is_file() and seq_pattern.match(item.name):
should_remove = True
elif item.is_file() and temp_pattern.match(item.name):
should_remove = True
if should_remove:
item.unlink()
removed += 1
except OSError as e:
raise CleanupError(f"Failed to clean up old links: {e}") from e
return removed
@staticmethod
def remove_orphan_files(directory: Path, keep_names: set[str]) -> int:
"""Remove seq* files and film_temp_* not in the keep set.
Same pattern matching as cleanup_old_links but skips filenames
present in keep_names.
Args:
directory: Directory to clean orphans from.
keep_names: Set of filenames to keep.
Returns:
Number of files removed.
Raises:
CleanupError: If removal fails.
"""
removed = 0
seq_pattern = re.compile(
r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE
)
temp_pattern = re.compile(
r'^film_temp_\d+\.png$', re.IGNORECASE
)
try:
for item in directory.iterdir():
if item.name in keep_names:
continue
should_remove = False
if item.name.startswith("seq"):
if item.is_symlink():
should_remove = True
elif item.is_file() and seq_pattern.match(item.name):
should_remove = True
elif item.is_file() and temp_pattern.match(item.name):
should_remove = True
if should_remove:
item.unlink()
removed += 1
except OSError as e:
raise CleanupError(f"Failed to remove orphan files: {e}") from e
return removed
@staticmethod
def symlink_matches(link_path: Path, expected_source: Path) -> bool:
"""Check if existing symlink resolves to expected source."""
if not link_path.is_symlink():
return False
try:
return link_path.resolve() == expected_source.resolve()
except OSError:
return False
@staticmethod
def copy_matches(dest_path: Path, source_path: Path) -> bool:
"""Check if existing copy matches source.
Fast path: size + mtime comparison. If sizes match but mtimes
differ, falls back to comparing file contents so that a
re-export after touching (but not changing) the source is still
skipped, while a genuine content change is caught.
"""
if not dest_path.is_file() or dest_path.is_symlink():
return False
try:
src_stat = source_path.stat()
dst_stat = dest_path.stat()
if src_stat.st_size != dst_stat.st_size:
return False
# Fast path: identical mtime means the copy2 wrote this file
if abs(src_stat.st_mtime - dst_stat.st_mtime) < 2.0:
return True
# Size matches but mtime differs — compare contents
return SymlinkManager._files_equal(source_path, dest_path)
except OSError:
return False
@staticmethod
def _files_equal(a: Path, b: Path, chunk_size: int = 65536) -> bool:
"""Compare two files by reading in chunks."""
try:
with open(a, 'rb') as fa, open(b, 'rb') as fb:
while True:
ca = fa.read(chunk_size)
cb = fb.read(chunk_size)
if ca != cb:
return False
if not ca:
return True
except OSError:
return False
def create_sequence_links(
self,
sources: list[Path],
dest: Path,
files: list[tuple],
trim_settings: Optional[dict[Path, tuple[int, int]]] = None,
copy_files: bool = False,
) -> tuple[list[LinkResult], Optional[int]]:
"""Create sequenced symlinks or copies from source files to destination.
Args:
sources: List of source directories (for validation).
dest: Destination directory.
files: List of tuples. Can be:
- (source_dir, filename) for CLI mode (uses global sequence)
- (source_dir, filename, folder_idx, file_idx) for GUI mode
trim_settings: Optional dict mapping folder paths to (trim_start, trim_end).
copy_files: If True, copy files instead of creating symlinks.
Returns:
Tuple of (list of LinkResult objects, session_id or None).
"""
self.validate_paths(sources, dest)
session_id = None
if self.db:
session_id = self.db.create_session(str(dest))
# Save trim settings if provided
if trim_settings and session_id:
for folder, (trim_start, trim_end) in trim_settings.items():
if trim_start > 0 or trim_end > 0:
self.db.save_trim_settings(
session_id, str(folder), trim_start, trim_end
)
results: list[LinkResult] = []
# Check if we have folder indices (GUI mode) or not (CLI mode)
use_folder_sequences = len(files) > 0 and len(files[0]) >= 4
# For CLI mode without folder indices, calculate them
if not use_folder_sequences:
folder_to_index = {folder: i for i, folder in enumerate(sources)}
folder_file_counts: dict[Path, int] = {}
expanded_files = []
for source_dir, filename in files:
folder_idx = folder_to_index.get(source_dir, 0)
file_idx = folder_file_counts.get(source_dir, 0)
folder_file_counts[source_dir] = file_idx + 1
expanded_files.append((source_dir, filename, folder_idx, file_idx))
files = expanded_files
# Build planned names for orphan removal
planned_names: set[str] = set()
for file_data in files:
_, fn, fi, fli = file_data
ext = Path(fn).suffix
planned_names.add(f"seq{fi + 1:02d}_{fli:04d}{ext}")
for i, file_data in enumerate(files):
source_dir, filename, folder_idx, file_idx = file_data
source_path = source_dir / filename
ext = source_path.suffix
link_name = f"seq{folder_idx + 1:02d}_{file_idx:04d}{ext}"
link_path = dest / link_name
try:
# Check if existing file already matches
already_correct = False
if link_path.exists() or link_path.is_symlink():
if copy_files:
already_correct = self.copy_matches(link_path, source_path)
else:
already_correct = self.symlink_matches(link_path, source_path)
if not already_correct:
if link_path.exists() or link_path.is_symlink():
link_path.unlink()
if copy_files:
import shutil
shutil.copy2(source_path, link_path)
else:
rel_source = Path(os.path.relpath(source_path.resolve(), dest.resolve()))
link_path.symlink_to(rel_source)
if self.db and session_id:
self.db.record_symlink(
session_id=session_id,
source=str(source_path.resolve()),
link=str(link_path),
filename=filename,
seq=i
)
results.append(LinkResult(
source_path=source_path,
link_path=link_path,
sequence_number=i,
success=True
))
except OSError as e:
results.append(LinkResult(
source_path=source_path,
link_path=link_path,
sequence_number=i,
success=False,
error=str(e)
))
# Remove orphan seq*/film_temp_* files not in the planned set
try:
self.remove_orphan_files(dest, planned_names)
except CleanupError:
pass
return results, session_id
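The relative-symlink scheme above can be exercised in isolation. A self-contained sketch that mirrors (rather than imports) the `seqNN_MMMM` naming and the relative-path logic from `create_sequence_links`:

```python
import os
import tempfile
from pathlib import Path

def link_name(folder_idx: int, file_idx: int, ext: str) -> str:
    # Mirrors create_sequence_links: 1-based folder index, 4-digit file index
    return f"seq{folder_idx + 1:02d}_{file_idx:04d}{ext}"

# Demo in a throwaway directory (POSIX symlink semantics assumed)
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / 'shots'
    dst = Path(tmp) / 'out'
    src.mkdir()
    dst.mkdir()
    target = src / 'frame.png'
    target.write_bytes(b'')
    link = dst / link_name(0, 0, '.png')   # seq01_0000.png
    rel = Path(os.path.relpath(target.resolve(), dst.resolve()))
    link.symlink_to(rel)                    # relative link, for portability
    assert link.resolve() == target.resolve()
```

Because the link target is stored relative to the destination, the whole destination tree can be moved together with its sources without breaking the links.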


@@ -1,194 +0,0 @@
"""Data models, enums, and exceptions for Video Montage Linker."""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Optional


# --- Enums ---

class BlendCurve(Enum):
    """Blend curve types for cross-dissolve transitions."""
    LINEAR = 'linear'
    EASE_IN = 'ease_in'
    EASE_OUT = 'ease_out'
    EASE_IN_OUT = 'ease_in_out'


class BlendMethod(Enum):
    """Blend method types for transitions."""
    ALPHA = 'alpha'  # Simple cross-dissolve (PIL.Image.blend)
    OPTICAL_FLOW = 'optical'  # OpenCV Farneback optical flow
    RIFE = 'rife'  # AI frame interpolation (NCNN binary)
    RIFE_PRACTICAL = 'rife_practical'  # Practical-RIFE Python/PyTorch implementation


class FolderType(Enum):
    """Folder type for transition detection."""
    AUTO = 'auto'
    MAIN = 'main'
    TRANSITION = 'transition'


class DirectInterpolationMethod(Enum):
    """Method for direct frame interpolation between sequences."""
    RIFE = 'rife'
    FILM = 'film'


# --- Data Classes ---

@dataclass
class TransitionSettings:
    """Settings for cross-dissolve transitions."""
    enabled: bool = False
    blend_curve: BlendCurve = BlendCurve.LINEAR
    output_format: str = 'png'
    webp_method: int = 4  # 0-6, used when format is webp (compression effort)
    output_quality: int = 95  # used for jpeg only
    trans_destination: Optional[Path] = None  # separate destination for transition output
    blend_method: BlendMethod = BlendMethod.ALPHA  # blending method
    rife_binary_path: Optional[Path] = None  # path to rife-ncnn-vulkan binary
    rife_model: str = 'rife-v4.6'  # RIFE model to use
    rife_uhd: bool = False  # Enable UHD mode for high resolution
    rife_tta: bool = False  # Enable TTA mode for better quality
    # Practical-RIFE settings
    practical_rife_model: str = 'v4.25'  # v4.25, v4.26, v4.22, etc.
    practical_rife_ensemble: bool = False  # Ensemble mode for better quality (slower)
    # Optical flow settings
    of_preset: str = 'balanced'  # fast, balanced, quality, max
    of_levels: int = 3  # pyramid levels (1-7)
    of_winsize: int = 15  # window size (5-51, odd)
    of_iterations: int = 3  # iterations (1-10)
    of_poly_n: int = 5  # polynomial neighborhood (5 or 7)
    of_poly_sigma: float = 1.2  # gaussian sigma (0.5-2.0)


@dataclass
class PerTransitionSettings:
    """Per-transition overlap settings for cross-dissolves."""
    trans_folder: Path
    left_overlap: int = 16  # overlap count at left boundary (MAIN→TRANS)
    right_overlap: int = 16  # overlap count at right boundary (TRANS→MAIN)


@dataclass
class DirectTransitionSettings:
    """Settings for direct AI interpolation between sequences (no transition folder)."""
    after_folder: Path  # The folder after which this transition occurs
    frame_count: int = 16  # Number of interpolated frames to generate
    method: DirectInterpolationMethod = DirectInterpolationMethod.FILM
    enabled: bool = True


@dataclass
class VideoPreset:
    """Preset for video encoding via ffmpeg."""
    label: str  # Display name
    container: str  # 'mp4', 'webm', or 'ogv'
    codec: str  # ffmpeg codec: libx264, libx265, libvpx-vp9, libaom-av1, libtheora
    crf: int
    pixel_format: str = 'yuv420p'
    preset: str = 'medium'  # x264/x265 speed preset
    max_height: Optional[int] = None  # Downscale filter
    extra_args: list[str] = field(default_factory=list)


VIDEO_PRESETS: dict[str, VideoPreset] = {
    'web_streaming': VideoPreset('Web Streaming', 'mp4', 'libx264', 23, preset='medium'),
    'high_quality': VideoPreset('High Quality', 'mp4', 'libx264', 18, preset='slow'),
    'archive': VideoPreset('Archive (H.265)', 'mp4', 'libx265', 18, preset='slow', extra_args=['-tag:v', 'hvc1']),
    'social_media': VideoPreset('Social Media', 'mp4', 'libx264', 23, preset='fast', max_height=1080),
    'fast_preview': VideoPreset('Fast Preview', 'mp4', 'libx264', 28, preset='ultrafast'),
    'webm_vp9': VideoPreset('WebM VP9', 'webm', 'libvpx-vp9', 30, extra_args=['-b:v', '0']),
    'webm_av1': VideoPreset('WebM AV1', 'webm', 'libaom-av1', 30, extra_args=['-b:v', '0', '-strict', 'experimental']),
    'godot_theora': VideoPreset('Godot (Theora)', 'ogv', 'libtheora', 8, extra_args=['-g', '512']),
}


@dataclass
class BlendResult:
    """Result of an image blend operation."""
    output_path: Path
    source_a: Path
    source_b: Path
    blend_factor: float
    success: bool
    error: Optional[str] = None


@dataclass
class TransitionSpec:
    """Specification for a transition boundary between two folders."""
    main_folder: Path
    trans_folder: Path
    main_files: list[str]
    trans_files: list[str]
    left_overlap: int  # asymmetric: frames from main folder end
    right_overlap: int  # asymmetric: frames from trans folder start
    # Indices into the overall file list
    main_start_idx: int
    trans_start_idx: int
    # Position indices in the folders list (for duplicate folder support)
    main_folder_idx: int = 0
    trans_folder_idx: int = 0


@dataclass
class LinkResult:
    """Result of a symlink creation operation."""
    source_path: Path
    link_path: Path
    sequence_number: int
    success: bool
    error: Optional[str] = None


@dataclass
class SymlinkRecord:
    """Database record of a created symlink."""
    id: int
    session_id: int
    source_path: str
    link_path: str
    original_filename: str
    sequence_number: int
    created_at: datetime


@dataclass
class SessionRecord:
    """Database record of a symlink session."""
    id: int
    created_at: datetime
    destination: str
    link_count: int = 0
    name: Optional[str] = None
    locked: bool = False


# --- Exceptions ---

class SymlinkError(Exception):
    """Base exception for symlink operations."""


class PathValidationError(SymlinkError):
    """Error validating file paths."""


class SourceNotFoundError(PathValidationError):
    """Source directory does not exist."""


class DestinationError(PathValidationError):
    """Error with destination directory."""


class CleanupError(SymlinkError):
    """Error during cleanup of existing symlinks."""


class DatabaseError(SymlinkError):
    """Error with database operations."""


@@ -1,429 +0,0 @@
#!/usr/bin/env python
"""RIFE interpolation worker - runs in isolated venv with PyTorch.
This script is executed via subprocess from the main application.
It handles loading Practical-RIFE models and performing frame interpolation.
Note: The Practical-RIFE models require the IFNet architecture from the
Practical-RIFE repository. This script downloads and uses the model weights
with a simplified inference implementation.
"""
import argparse
import os
import shutil
import sys
import tempfile
import urllib.request
import zipfile
from pathlib import Path
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
def conv(in_planes, out_planes, kernel_size=3, stride=1, padding=1, dilation=1):
return nn.Sequential(
nn.Conv2d(in_planes, out_planes, kernel_size=kernel_size, stride=stride,
padding=padding, dilation=dilation, bias=True),
nn.PReLU(out_planes)
)
def deconv(in_planes, out_planes, kernel_size=4, stride=2, padding=1):
return nn.Sequential(
nn.ConvTranspose2d(in_planes, out_planes, kernel_size, stride, padding, bias=True),
nn.PReLU(out_planes)
)
class IFBlock(nn.Module):
def __init__(self, in_planes, c=64):
super(IFBlock, self).__init__()
self.conv0 = nn.Sequential(
conv(in_planes, c//2, 3, 2, 1),
conv(c//2, c, 3, 2, 1),
)
self.convblock = nn.Sequential(
conv(c, c),
conv(c, c),
conv(c, c),
conv(c, c),
conv(c, c),
conv(c, c),
conv(c, c),
conv(c, c),
)
self.lastconv = nn.ConvTranspose2d(c, 5, 4, 2, 1)
def forward(self, x, flow=None, scale=1):
x = F.interpolate(x, scale_factor=1./scale, mode="bilinear", align_corners=False)
if flow is not None:
flow = F.interpolate(flow, scale_factor=1./scale, mode="bilinear", align_corners=False) / scale
x = torch.cat((x, flow), 1)
feat = self.conv0(x)
feat = self.convblock(feat) + feat
tmp = self.lastconv(feat)
tmp = F.interpolate(tmp, scale_factor=scale*2, mode="bilinear", align_corners=False)
flow = tmp[:, :4] * scale * 2
mask = tmp[:, 4:5]
return flow, mask
backwarp_tenGrid = {}  # module-level cache; a function-local dict would be rebuilt (and the cache defeated) on every call
def warp(tenInput, tenFlow):
k = (str(tenFlow.device), str(tenFlow.size()))
if k not in backwarp_tenGrid:
tenHorizontal = torch.linspace(-1.0, 1.0, tenFlow.shape[3], device=tenFlow.device).view(
1, 1, 1, tenFlow.shape[3]).expand(tenFlow.shape[0], -1, tenFlow.shape[2], -1)
tenVertical = torch.linspace(-1.0, 1.0, tenFlow.shape[2], device=tenFlow.device).view(
1, 1, tenFlow.shape[2], 1).expand(tenFlow.shape[0], -1, -1, tenFlow.shape[3])
backwarp_tenGrid[k] = torch.cat([tenHorizontal, tenVertical], 1)
tenFlow = torch.cat([tenFlow[:, 0:1, :, :] / ((tenInput.shape[3] - 1.0) / 2.0),
tenFlow[:, 1:2, :, :] / ((tenInput.shape[2] - 1.0) / 2.0)], 1)
g = (backwarp_tenGrid[k] + tenFlow).permute(0, 2, 3, 1)
return F.grid_sample(input=tenInput, grid=g, mode='bilinear', padding_mode='border', align_corners=True)
class IFNet(nn.Module):
"""IFNet architecture for Practical-RIFE v4.25/v4.26 models."""
def __init__(self):
super(IFNet, self).__init__()
# v4.25/v4.26 architecture:
# block0 input: img0(3) + img1(3) + f0(4) + f1(4) + timestep(1) = 15
# block1+ input: img0(3) + img1(3) + wf0(4) + wf1(4) + f0(4) + f1(4) + timestep(1) + mask(1) + flow(4) = 28
self.block0 = IFBlock(3+3+4+4+1, c=192)
self.block1 = IFBlock(3+3+4+4+4+4+1+1+4, c=128)
self.block2 = IFBlock(3+3+4+4+4+4+1+1+4, c=96)
self.block3 = IFBlock(3+3+4+4+4+4+1+1+4, c=64)
# Encode produces 4-channel features
self.encode = nn.Sequential(
nn.Conv2d(3, 32, 3, 2, 1),
nn.LeakyReLU(0.2, True),
nn.Conv2d(32, 32, 3, 1, 1),
nn.LeakyReLU(0.2, True),
nn.Conv2d(32, 32, 3, 1, 1),
nn.LeakyReLU(0.2, True),
nn.ConvTranspose2d(32, 4, 4, 2, 1)
)
def forward(self, img0, img1, timestep=0.5, scale_list=(8, 4, 2, 1)):  # tuple avoids a mutable default argument
f0 = self.encode(img0[:, :3])
f1 = self.encode(img1[:, :3])
warped_img0 = img0
warped_img1 = img1
flow = None
mask = None
block = [self.block0, self.block1, self.block2, self.block3]
for i in range(4):
if flow is None:
flow, mask = block[i](
torch.cat((img0[:, :3], img1[:, :3], f0, f1, timestep), 1),
None, scale=scale_list[i])
else:
wf0 = warp(f0, flow[:, :2])
wf1 = warp(f1, flow[:, 2:4])
fd, m0 = block[i](
torch.cat((warped_img0[:, :3], warped_img1[:, :3], wf0, wf1, f0, f1, timestep, mask), 1),
flow, scale=scale_list[i])
flow = flow + fd
mask = mask + m0
warped_img0 = warp(img0, flow[:, :2])
warped_img1 = warp(img1, flow[:, 2:4])
mask_final = torch.sigmoid(mask)
merged_final = warped_img0 * mask_final + warped_img1 * (1 - mask_final)
return merged_final
# Model URLs for downloading (Google Drive direct download links)
# File IDs extracted from official Practical-RIFE repository
MODEL_URLS = {
'v4.26': 'https://drive.google.com/uc?export=download&id=1gViYvvQrtETBgU1w8axZSsr7YUuw31uy',
'v4.25': 'https://drive.google.com/uc?export=download&id=1ZKjcbmt1hypiFprJPIKW0Tt0lr_2i7bg',
'v4.22': 'https://drive.google.com/uc?export=download&id=1qh2DSA9a1eZUTtZG9U9RQKO7N7OaUJ0_',
'v4.20': 'https://drive.google.com/uc?export=download&id=11n3YR7-qCRZm9RDdwtqOTsgCJUHPuexA',
'v4.18': 'https://drive.google.com/uc?export=download&id=1octn-UVuEjXa_HlsIUbNeLTTvYCKbC_s',
'v4.15': 'https://drive.google.com/uc?export=download&id=1xlem7cfKoMaiLzjoeum8KIQTYO-9iqG5',
}
def download_model(version: str, model_dir: Path) -> Path:
"""Download model if not already cached.
Google Drive links distribute zip files containing the model.
This function downloads and extracts the flownet.pkl file.
Args:
version: Model version (e.g., 'v4.25').
model_dir: Directory to store models.
Returns:
Path to the downloaded model file.
"""
model_dir.mkdir(parents=True, exist_ok=True)
model_path = model_dir / f'flownet_{version}.pkl'
if model_path.exists():
# Verify it's not a zip file (from previous failed attempt)
with open(model_path, 'rb') as f:
header = f.read(4)
if header == b'PK\x03\x04': # ZIP magic number
print(f"Removing corrupted zip file at {model_path}", file=sys.stderr)
model_path.unlink()
else:
return model_path
url = MODEL_URLS.get(version)
if not url:
raise ValueError(f"Unknown model version: {version}")
print(f"Downloading RIFE model {version}...", file=sys.stderr)
with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir) / 'download'
# Try using gdown for Google Drive (handles confirmations automatically)
downloaded = False
try:
import gdown
file_id = url.split('id=')[1] if 'id=' in url else None
if file_id:
gdown_url = f'https://drive.google.com/uc?id={file_id}'
gdown.download(gdown_url, str(tmp_path), quiet=False)
downloaded = tmp_path.exists()
except ImportError:
print("gdown not available, trying direct download...", file=sys.stderr)
except Exception as e:
print(f"gdown failed: {e}, trying direct download...", file=sys.stderr)
# Fallback: direct download
if not downloaded:
try:
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req, timeout=300) as response:
data = response.read()
if data[:100].startswith(b'<!') or b'<html' in data[:500].lower():
raise RuntimeError("Google Drive returned HTML - install gdown: pip install gdown")
with open(tmp_path, 'wb') as f:
f.write(data)
downloaded = True
except Exception as e:
raise RuntimeError(f"Failed to download model: {e}")
if not downloaded or not tmp_path.exists():
raise RuntimeError("Download failed - no file received")
# Check if downloaded file is a zip archive
with open(tmp_path, 'rb') as f:
header = f.read(4)
if header == b'PK\x03\x04': # ZIP magic number
print(f"Extracting model from zip archive...", file=sys.stderr)
with zipfile.ZipFile(tmp_path, 'r') as zf:
# Find flownet.pkl in the archive
pkl_files = [n for n in zf.namelist() if n.endswith('flownet.pkl')]
if not pkl_files:
raise RuntimeError(f"No flownet.pkl found in zip. Contents: {zf.namelist()}")
# Extract the pkl file
pkl_name = pkl_files[0]
with zf.open(pkl_name) as src, open(model_path, 'wb') as dst:
dst.write(src.read())
else:
# Already a pkl file, just move it
shutil.move(str(tmp_path), str(model_path))
print(f"Model saved to {model_path}", file=sys.stderr)
return model_path
def load_model(model_path: Path, device: torch.device) -> IFNet:
"""Load IFNet model from state dict.
Args:
model_path: Path to flownet.pkl file.
device: Device to load model to.
Returns:
Loaded IFNet model.
"""
model = IFNet()
state_dict = torch.load(model_path, map_location='cpu')
# Handle different state dict formats
if 'state_dict' in state_dict:
state_dict = state_dict['state_dict']
# Remove 'module.' prefix if present (from DataParallel)
new_state_dict = {}
for k, v in state_dict.items():
if k.startswith('module.'):
k = k[7:]
# Handle flownet. prefix
if k.startswith('flownet.'):
k = k[8:]
new_state_dict[k] = v
model.load_state_dict(new_state_dict, strict=False)
model.to(device)
model.eval()
return model
def pad_image(img: torch.Tensor, padding: int = 64) -> tuple:
"""Pad image to be divisible by padding.
Args:
img: Input tensor (B, C, H, W).
padding: Padding divisor.
Returns:
Tuple of (padded image, (original H, original W)).
"""
_, _, h, w = img.shape
ph = ((h - 1) // padding + 1) * padding
pw = ((w - 1) // padding + 1) * padding
pad_h = ph - h
pad_w = pw - w
padded = F.pad(img, (0, pad_w, 0, pad_h), mode='replicate')
return padded, (h, w)
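The rounding in `pad_image` can be checked in isolation. A pure-Python sketch of the same dimension math (helper name is illustrative); note that a 1920x1080 frame is padded to 1920x1088 and then cropped back after inference:

```python
def padded_dims(h: int, w: int, padding: int = 64) -> tuple[int, int]:
    """Round H and W up to the next multiple of `padding`, as pad_image does."""
    ph = ((h - 1) // padding + 1) * padding
    pw = ((w - 1) // padding + 1) * padding
    return ph, pw

print(padded_dims(1080, 1920))  # 1080 rounds up to 1088; 1920 is already a multiple of 64
```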
@torch.no_grad()
def inference(model: IFNet, img0: torch.Tensor, img1: torch.Tensor,
timestep: float = 0.5, ensemble: bool = False) -> torch.Tensor:
"""Perform frame interpolation.
Args:
model: Loaded IFNet model.
img0: First frame tensor (B, C, H, W) normalized to [0, 1].
img1: Second frame tensor (B, C, H, W) normalized to [0, 1].
timestep: Interpolation timestep (0.0 to 1.0).
ensemble: Enable ensemble mode for better quality.
Returns:
Interpolated frame tensor.
"""
# Pad images
img0_padded, orig_size = pad_image(img0)
img1_padded, _ = pad_image(img1)
h, w = orig_size
# Create timestep tensor
timestep_tensor = torch.full((1, 1, img0_padded.shape[2], img0_padded.shape[3]),
timestep, device=img0.device)
if ensemble:
# Ensemble: average of forward and reverse
result1 = model(img0_padded, img1_padded, timestep_tensor)
result2 = model(img1_padded, img0_padded, 1 - timestep_tensor)
result = (result1 + result2) / 2
else:
result = model(img0_padded, img1_padded, timestep_tensor)
# Crop back to original size
result = result[:, :, :h, :w]
return result.clamp(0, 1)
def load_image(path: Path, device: torch.device) -> torch.Tensor:
"""Load image as tensor.
Args:
path: Path to image file.
device: Device to load tensor to.
Returns:
Image tensor (1, 3, H, W) normalized to [0, 1].
"""
img = Image.open(path).convert('RGB')
arr = np.array(img).astype(np.float32) / 255.0
tensor = torch.from_numpy(arr).permute(2, 0, 1).unsqueeze(0)
return tensor.to(device)
def save_image(tensor: torch.Tensor, path: Path) -> None:
"""Save tensor as image.
Args:
tensor: Image tensor (1, 3, H, W) normalized to [0, 1].
path: Output path.
"""
arr = tensor.squeeze(0).permute(1, 2, 0).cpu().numpy()
arr = (arr * 255).clip(0, 255).astype(np.uint8)
Image.fromarray(arr).save(path)
# Global model cache
_model_cache: dict = {}
def get_model(version: str, model_dir: Path, device: torch.device) -> IFNet:
"""Get or load model (cached).
Args:
version: Model version.
model_dir: Model cache directory.
device: Device to run on.
Returns:
IFNet model instance.
"""
cache_key = f"{version}_{device}"
if cache_key not in _model_cache:
model_path = download_model(version, model_dir)
_model_cache[cache_key] = load_model(model_path, device)
return _model_cache[cache_key]
def main():
parser = argparse.ArgumentParser(description='RIFE frame interpolation worker')
parser.add_argument('--input0', required=True, help='Path to first input image')
parser.add_argument('--input1', required=True, help='Path to second input image')
parser.add_argument('--output', required=True, help='Path to output image')
parser.add_argument('--timestep', type=float, default=0.5, help='Interpolation timestep (0-1)')
parser.add_argument('--model', default='v4.25', help='Model version')
parser.add_argument('--model-dir', required=True, help='Model cache directory')
parser.add_argument('--ensemble', action='store_true', help='Enable ensemble mode')
parser.add_argument('--device', default='cuda', choices=['cuda', 'cpu'], help='Device to use')
args = parser.parse_args()
try:
# Select device
if args.device == 'cuda' and torch.cuda.is_available():
device = torch.device('cuda')
else:
device = torch.device('cpu')
# Load model
model_dir = Path(args.model_dir)
model = get_model(args.model, model_dir, device)
# Load images
img0 = load_image(Path(args.input0), device)
img1 = load_image(Path(args.input1), device)
# Interpolate
result = inference(model, img0, img1, args.timestep, args.ensemble)
# Save result
save_image(result, Path(args.output))
print("Success", file=sys.stderr)
return 0
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
return 1
if __name__ == '__main__':
sys.exit(main())
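Since this worker runs in an isolated venv, the main application invokes it via subprocess. A hedged sketch of building that command line from the argparse flags defined above (the worker filename `rife_worker.py` and the model-dir path are assumptions for illustration):

```python
import sys

def build_worker_cmd(python_bin: str, worker_script: str, img_a: str, img_b: str,
                     out_path: str, timestep: float = 0.5, model: str = 'v4.25',
                     model_dir: str = '~/.cache/rife', device: str = 'cuda',
                     ensemble: bool = False) -> list[str]:
    """Build the argv for one interpolation call against the worker script."""
    cmd = [python_bin, worker_script,
           '--input0', img_a, '--input1', img_b, '--output', out_path,
           '--timestep', str(timestep), '--model', model,
           '--model-dir', model_dir, '--device', device]
    if ensemble:
        cmd.append('--ensemble')  # store_true flag: present or absent
    return cmd

cmd = build_worker_cmd(sys.executable, 'rife_worker.py', 'a.png', 'b.png', 'mid.png')
print(' '.join(cmd))
```

The caller would pass this list to `subprocess.run` with the isolated venv's interpreter as `python_bin`.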


@@ -1,259 +0,0 @@
"""Video encoding utilities wrapping ffmpeg."""
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Callable, Optional
from .models import VideoPreset
def find_ffmpeg() -> Optional[Path]:
"""Find the ffmpeg binary on the system PATH."""
result = shutil.which('ffmpeg')
return Path(result) if result else None
def encode_image_sequence(
input_dir: Path,
output_path: Path,
fps: int,
preset: VideoPreset,
input_pattern: Optional[str] = None,
progress_callback: Optional[Callable[[int, int], bool]] = None,
total_frames: Optional[int] = None,
) -> tuple[bool, str]:
"""Encode an image sequence directory to a video file using ffmpeg.
Args:
input_dir: Directory containing sequentially named image files.
output_path: Output video file path.
fps: Frames per second.
preset: VideoPreset with codec settings.
input_pattern: ffmpeg input pattern (e.g. 'seq_%06d.png').
Auto-detected from first seq_* file if not provided.
progress_callback: Called with (current_frame, total_frames).
Return False to cancel encoding.
total_frames: Total number of frames for progress reporting.
Auto-counted from input_dir if not provided.
Returns:
(success, message) — message is output_path on success or error text on failure.
"""
ffmpeg = find_ffmpeg()
if not ffmpeg:
return False, "ffmpeg not found. Install ffmpeg to encode video."
# Auto-detect input pattern from first seq_* file
if input_pattern is None:
input_pattern = _detect_input_pattern(input_dir)
if input_pattern is None:
return False, f"No seq_* image files found in {input_dir}"
# Auto-count frames
if total_frames is None:
ext = Path(input_pattern).suffix
total_frames = len(list(input_dir.glob(f"seq_*{ext}")))
if total_frames == 0:
return False, f"No matching frames found in {input_dir}"
# Build ffmpeg command
cmd = [
str(ffmpeg), '-y',
'-framerate', str(fps),
'-i', str(input_dir / input_pattern),
'-c:v', preset.codec,
# libtheora uses the -q:v quality scale (0-10); every other codec here uses CRF
'-q:v' if preset.codec == 'libtheora' else '-crf', str(preset.crf),
'-pix_fmt', preset.pixel_format,
]
# Add speed preset for x264/x265
if preset.codec in ('libx264', 'libx265'):
cmd += ['-preset', preset.preset]
# Add downscale filter if max_height is set
if preset.max_height is not None:
cmd += ['-vf', f'scale=-2:{preset.max_height}']
# Add any extra codec-specific args
if preset.extra_args:
cmd += preset.extra_args
# Progress parsing via -progress pipe:1
cmd += ['-progress', 'pipe:1']
cmd.append(str(output_path))
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
cancelled = False
if proc.stdout:
for line in proc.stdout:
line = line.strip()
m = re.match(r'^frame=(\d+)', line)
if m and progress_callback is not None:
current = int(m.group(1))
if not progress_callback(current, total_frames):
cancelled = True
proc.terminate()
proc.wait()
break
proc.wait()
if cancelled:
# Clean up partial file
if output_path.exists():
output_path.unlink()
return False, "Encoding cancelled by user."
if proc.returncode != 0:
stderr = proc.stderr.read() if proc.stderr else ""
return False, f"ffmpeg exited with code {proc.returncode}:\n{stderr}"
return True, str(output_path)
except FileNotFoundError:
return False, "ffmpeg binary not found."
except Exception as e:
return False, f"Encoding error: {e}"
def _detect_input_pattern(input_dir: Path) -> Optional[str]:
"""Detect the ffmpeg input pattern from seq_* files in a directory.
Looks for files like seq_000000.png and returns a pattern like seq_%06d.png.
"""
for f in sorted(input_dir.iterdir()):
m = re.match(r'^(seq_)(\d+)(\.\w+)$', f.name)
if m:
prefix = m.group(1)
digits = m.group(2)
ext = m.group(3)
width = len(digits)
return f"{prefix}%0{width}d{ext}"
return None
def encode_from_file_list(
file_paths: list[Path],
output_path: Path,
fps: int,
preset: VideoPreset,
progress_callback: Optional[Callable[[int, int], bool]] = None,
) -> tuple[bool, str]:
"""Encode a video from an explicit list of image file paths.
Uses ffmpeg's concat demuxer so files can be scattered across directories.
Args:
file_paths: Ordered list of image file paths.
output_path: Output video file path.
fps: Frames per second.
preset: VideoPreset with codec settings.
progress_callback: Called with (current_frame, total_frames).
Return False to cancel encoding.
Returns:
(success, message) — message is output_path on success or error text on failure.
"""
ffmpeg = find_ffmpeg()
if not ffmpeg:
return False, "ffmpeg not found. Install ffmpeg to encode video."
if not file_paths:
return False, "No files provided."
total_frames = len(file_paths)
frame_duration = f"{1.0 / fps:.10f}"
# Write a concat-demuxer file listing each image with its duration
try:
concat_file = tempfile.NamedTemporaryFile(
mode='w', suffix='.txt', delete=False, prefix='vml_concat_'
)
concat_path = Path(concat_file.name)
for p in file_paths:
# Escape single quotes for ffmpeg concat format
escaped = str(p.resolve()).replace("'", "'\\''")
concat_file.write(f"file '{escaped}'\n")
concat_file.write(f"duration {frame_duration}\n")
# Repeat last file so the last frame displays for its full duration
escaped = str(file_paths[-1].resolve()).replace("'", "'\\''")
concat_file.write(f"file '{escaped}'\n")
concat_file.close()
except OSError as e:
return False, f"Failed to create concat file: {e}"
cmd = [
str(ffmpeg), '-y',
'-f', 'concat', '-safe', '0',
'-i', str(concat_path),
'-c:v', preset.codec,
'-q:v' if preset.codec == 'libtheora' else '-crf', str(preset.crf),
'-pix_fmt', preset.pixel_format,
]
if preset.codec in ('libx264', 'libx265'):
cmd += ['-preset', preset.preset]
if preset.max_height is not None:
cmd += ['-vf', f'scale=-2:{preset.max_height}']
if preset.extra_args:
cmd += preset.extra_args
cmd += ['-progress', 'pipe:1']
cmd.append(str(output_path))
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
cancelled = False
if proc.stdout:
for line in proc.stdout:
line = line.strip()
m = re.match(r'^frame=(\d+)', line)
if m and progress_callback is not None:
current = int(m.group(1))
if not progress_callback(current, total_frames):
cancelled = True
proc.terminate()
proc.wait()
break
proc.wait()
if cancelled:
if output_path.exists():
output_path.unlink()
return False, "Encoding cancelled by user."
if proc.returncode != 0:
stderr = proc.stderr.read() if proc.stderr else ""
return False, f"ffmpeg exited with code {proc.returncode}:\n{stderr}"
return True, str(output_path)
except FileNotFoundError:
return False, "ffmpeg binary not found."
except Exception as e:
return False, f"Encoding error: {e}"
finally:
try:
concat_path.unlink(missing_ok=True)
except OSError:
pass
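The concat-demuxer file written above has one subtlety worth isolating: single quotes in paths must be escaped as `'\''` (close quote, escaped quote, reopen quote), the same convention POSIX shells use. A self-contained sketch of rendering one entry (helper name is illustrative):

```python
def concat_entry(path: str, duration: float) -> str:
    """Render one ffmpeg concat-demuxer entry, escaping single quotes."""
    escaped = path.replace("'", "'\\''")  # ' -> '\'' so the quoted string survives
    return f"file '{escaped}'\nduration {duration:.10f}\n"

print(concat_entry("/tmp/it's a frame.png", 1 / 30), end='')
```

Each frame gets a `file` line plus a `duration` line; the encoder repeats the final `file` line so the last frame is displayed for its full duration.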


@@ -0,0 +1,50 @@
# Gitea Push Mirror Setup Instructions
## Prerequisites
- Gitea API token
- GitHub Personal Access Token (PAT) with `repo` scope
## Tokens
| Service | Token |
|---------|-------|
| Gitea | `1b99442d046cdfa2f5d76322a6081131a0561c53` |
| GitHub | `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` |

**Warning:** these tokens grant write access to both services; keep this file untracked (it is listed in `.gitignore`).
## API Endpoint
```
POST http://192.168.1.1:3000/api/v1/repos/{owner}/{repo}/push_mirrors
```
## Command Template
```bash
curl -X POST "http://192.168.1.1:3000/api/v1/repos/Ethanfel/{REPO_NAME}/push_mirrors" \
-H "Authorization: token 1b99442d046cdfa2f5d76322a6081131a0561c53" \
-H "Content-Type: application/json" \
-d '{
"remote_address": "https://github.com/Ethanfel/{REPO_NAME}.git",
"remote_username": "Ethanfel",
"remote_password": "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",
"interval": "8h0m0s",
"sync_on_commit": true
}'
```
Replace `{REPO_NAME}` with the actual repository name.
## Verify Mirror
```bash
curl "http://192.168.1.1:3000/api/v1/repos/Ethanfel/{REPO_NAME}/push_mirrors" \
-H "Authorization: token 1b99442d046cdfa2f5d76322a6081131a0561c53"
```
## Notes
- GitHub repo must exist before creating the mirror
- Gitea runs on port 3000 (not 443)
- `sync_on_commit: true` pushes to GitHub on every commit
- `interval: 8h0m0s` syncs every 8 hours regardless of commits
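To apply the command template to several repositories, a loop over repo names saves retyping. A dry-run sketch that only prints the commands instead of executing them (the repo names in the loop are placeholders):

```shell
# Dry-run helper: print the mirror-creation command for each repository.
GITEA="http://192.168.1.1:3000"
TOKEN="1b99442d046cdfa2f5d76322a6081131a0561c53"

print_mirror_cmd() {
  # $1 = repository name; prints the curl invocation without running it
  printf 'curl -X POST "%s/api/v1/repos/Ethanfel/%s/push_mirrors" -H "Authorization: token %s"\n' \
    "$GITEA" "$1" "$TOKEN"
}

for repo in repo-one repo-two; do
  print_mirror_cmd "$repo"
done
```

Drop the `print_` indirection (or pipe the output to `sh`) once the printed commands look right.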

symlink.py: 1936 changed lines (diff suppressed because it is too large)

@@ -1,9 +0,0 @@
"""UI modules for Video Montage Linker."""
from .widgets import TrimSlider
from .main_window import SequenceLinkerUI
__all__ = [
'TrimSlider',
'SequenceLinkerUI',
]

File diff suppressed because it is too large


@@ -1,298 +0,0 @@
"""Custom widgets for Video Montage Linker UI."""
from typing import Optional
from PyQt6.QtCore import Qt, pyqtSignal, QRect
from PyQt6.QtGui import QPainter, QColor, QBrush, QPen, QMouseEvent
from PyQt6.QtWidgets import QWidget
class TrimSlider(QWidget):
"""A slider widget with two draggable handles for trimming sequences.
Allows setting in/out points for a sequence by dragging left and right handles.
Gray areas indicate trimmed regions, colored area indicates included images.
"""
trimChanged = pyqtSignal(int, int, str) # Emits (trim_start, trim_end, 'left' or 'right')
trimDragFinished = pyqtSignal(int, int, str) # Emits final values on mouse release
def __init__(self, parent: Optional[QWidget] = None) -> None:
"""Initialize the trim slider.
Args:
parent: Parent widget.
"""
super().__init__(parent)
self._total = 0
self._trim_start = 0
self._trim_end = 0
self._current_pos = 0
self._dragging: Optional[str] = None # 'left', 'right', or None
self._handle_width = 10
self._track_height = 20
self._enabled = True
self.setMinimumHeight(40)
self.setMinimumWidth(100)
self.setCursor(Qt.CursorShape.ArrowCursor)
self.setMouseTracking(True)
def setRange(self, total: int) -> None:
"""Set the total number of items in the sequence.
Args:
total: Total number of items.
"""
self._total = max(0, total)
# Clamp trim values to valid range
self._trim_start = min(self._trim_start, max(0, self._total - 1))
self._trim_end = min(self._trim_end, max(0, self._total - 1 - self._trim_start))
self.update()
def setTrimStart(self, value: int) -> None:
"""Set the trim start value.
Args:
value: Number of items to trim from start.
"""
max_start = max(0, self._total - 1 - self._trim_end)
self._trim_start = max(0, min(value, max_start))
self.update()
def setTrimEnd(self, value: int) -> None:
"""Set the trim end value.
Args:
value: Number of items to trim from end.
"""
max_end = max(0, self._total - 1 - self._trim_start)
self._trim_end = max(0, min(value, max_end))
self.update()
def setCurrentPosition(self, pos: int) -> None:
"""Set the current position indicator.
Args:
pos: Current position index.
"""
self._current_pos = max(0, min(pos, self._total - 1)) if self._total > 0 else 0
self.update()
def trimStart(self) -> int:
"""Get the trim start value."""
return self._trim_start
def trimEnd(self) -> int:
"""Get the trim end value."""
return self._trim_end
def total(self) -> int:
"""Get the total number of items."""
return self._total
def includedRange(self) -> tuple[int, int]:
"""Get the range of included items (after trimming).
Returns:
Tuple of (first_included_index, last_included_index).
Returns (-1, -1) if no items are included.
"""
if self._total == 0:
return (-1, -1)
first = self._trim_start
last = self._total - 1 - self._trim_end
if first > last:
return (-1, -1)
return (first, last)
def setEnabled(self, enabled: bool) -> None:
"""Enable or disable the widget."""
self._enabled = enabled
self.update()
def _track_rect(self) -> QRect:
"""Get the rectangle for the slider track."""
margin = self._handle_width
return QRect(
margin,
(self.height() - self._track_height) // 2,
self.width() - 2 * margin,
self._track_height
)
def _value_to_x(self, value: int) -> int:
"""Convert a value to an x coordinate."""
track = self._track_rect()
if self._total <= 1:
return track.left()
ratio = value / (self._total - 1)
return int(track.left() + ratio * track.width())
def _x_to_value(self, x: int) -> int:
"""Convert an x coordinate to a value."""
track = self._track_rect()
if track.width() == 0 or self._total <= 1:
return 0
ratio = (x - track.left()) / track.width()
ratio = max(0.0, min(1.0, ratio))
return int(round(ratio * (self._total - 1)))
def _left_handle_rect(self) -> QRect:
"""Get the rectangle for the left (trim start) handle."""
x = self._value_to_x(self._trim_start)
return QRect(
x - self._handle_width // 2,
(self.height() - self._track_height - 10) // 2,
self._handle_width,
self._track_height + 10
)
def _right_handle_rect(self) -> QRect:
"""Get the rectangle for the right (trim end) handle."""
x = self._value_to_x(self._total - 1 - self._trim_end) if self._total > 0 else 0
return QRect(
x - self._handle_width // 2,
(self.height() - self._track_height - 10) // 2,
self._handle_width,
self._track_height + 10
)
def paintEvent(self, event) -> None:
"""Paint the trim slider."""
painter = QPainter(self)
painter.setRenderHint(QPainter.RenderHint.Antialiasing)
track = self._track_rect()
# Colors
bg_color = QColor(60, 60, 60)
trimmed_color = QColor(80, 80, 80)
included_color = QColor(52, 152, 219) if self._enabled else QColor(100, 100, 100)
handle_color = QColor(200, 200, 200) if self._enabled else QColor(120, 120, 120)
position_color = QColor(255, 255, 255)
# Draw background track
painter.fillRect(track, bg_color)
if self._total > 0:
# Draw trimmed regions (darker)
left_trim_x = self._value_to_x(self._trim_start)
right_trim_x = self._value_to_x(self._total - 1 - self._trim_end)
# Left trimmed region
if self._trim_start > 0:
left_rect = QRect(track.left(), track.top(),
left_trim_x - track.left(), track.height())
painter.fillRect(left_rect, trimmed_color)
# Right trimmed region
if self._trim_end > 0:
right_rect = QRect(right_trim_x, track.top(),
track.right() - right_trim_x, track.height())
painter.fillRect(right_rect, trimmed_color)
# Draw included region
if left_trim_x < right_trim_x:
included_rect = QRect(left_trim_x, track.top(),
right_trim_x - left_trim_x, track.height())
painter.fillRect(included_rect, included_color)
# Draw current position indicator
if self._trim_start <= self._current_pos <= (self._total - 1 - self._trim_end):
pos_x = self._value_to_x(self._current_pos)
painter.setPen(QPen(position_color, 2))
painter.drawLine(pos_x, track.top() - 2, pos_x, track.bottom() + 2)
# Draw handles
painter.setBrush(QBrush(handle_color))
painter.setPen(QPen(Qt.GlobalColor.black, 1))
# Left handle
left_handle = self._left_handle_rect()
painter.drawRect(left_handle)
# Right handle
right_handle = self._right_handle_rect()
painter.drawRect(right_handle)
painter.end()
def mousePressEvent(self, event: QMouseEvent) -> None:
"""Handle mouse press to start dragging handles."""
if not self._enabled or self._total == 0:
return
pos = event.pos()
# Check if clicking on handles (check right first since it may overlap)
right_rect = self._right_handle_rect()
left_rect = self._left_handle_rect()
# Expand hit area slightly for easier grabbing
expand = 5
left_expanded = left_rect.adjusted(-expand, -expand, expand, expand)
right_expanded = right_rect.adjusted(-expand, -expand, expand, expand)
if right_expanded.contains(pos):
self._dragging = 'right'
elif left_expanded.contains(pos):
self._dragging = 'left'
else:
self._dragging = None
def mouseMoveEvent(self, event: QMouseEvent) -> None:
"""Handle mouse move to drag handles."""
if not self._enabled:
return
pos = event.pos()
# Update cursor based on position
if self._dragging:
self.setCursor(Qt.CursorShape.SizeHorCursor)
else:
left_rect = self._left_handle_rect()
right_rect = self._right_handle_rect()
expand = 5
left_expanded = left_rect.adjusted(-expand, -expand, expand, expand)
right_expanded = right_rect.adjusted(-expand, -expand, expand, expand)
if left_expanded.contains(pos) or right_expanded.contains(pos):
self.setCursor(Qt.CursorShape.SizeHorCursor)
else:
self.setCursor(Qt.CursorShape.ArrowCursor)
if self._dragging and self._total > 0:
value = self._x_to_value(pos.x())
if self._dragging == 'left':
# Left handle: set trim_start, clamped to not exceed right
max_start = self._total - 1 - self._trim_end
new_start = max(0, min(value, max_start))
if new_start != self._trim_start:
self._trim_start = new_start
self.update()
self.trimChanged.emit(self._trim_start, self._trim_end, 'left')
elif self._dragging == 'right':
# Right handle: set trim_end based on position
# value is the index position, trim_end is count from end
max_val = self._total - 1 - self._trim_start
clamped_value = max(self._trim_start, min(value, self._total - 1))
new_end = self._total - 1 - clamped_value
if new_end != self._trim_end:
self._trim_end = max(0, new_end)
self.update()
self.trimChanged.emit(self._trim_start, self._trim_end, 'right')
def mouseReleaseEvent(self, event: QMouseEvent) -> None:
"""Handle mouse release to stop dragging."""
if self._dragging:
handle = self._dragging
self._dragging = None
self.setCursor(Qt.CursorShape.ArrowCursor)
self.trimDragFinished.emit(self._trim_start, self._trim_end, handle)
else:
self._dragging = None
self.setCursor(Qt.CursorShape.ArrowCursor)
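The handle-dragging logic above rests on the index-to-pixel mapping in `_value_to_x` and `_x_to_value`. Those mappings can be checked in isolation; a sketch of the same math as pure functions (signatures and track geometry are illustrative, not the widget's API):

```python
def value_to_x(value: int, total: int, track_left: int, track_width: int) -> int:
    """Map a sequence index to a pixel x on the track, as _value_to_x does."""
    if total <= 1:
        return track_left
    return int(track_left + (value / (total - 1)) * track_width)

def x_to_value(x: int, total: int, track_left: int, track_width: int) -> int:
    """Map a pixel x back to the nearest sequence index, as _x_to_value does."""
    if track_width == 0 or total <= 1:
        return 0
    ratio = min(1.0, max(0.0, (x - track_left) / track_width))  # clamp off-track drags
    return int(round(ratio * (total - 1)))

# Round-tripping an index through pixel space recovers the same index:
print(x_to_value(value_to_x(42, 100, 10, 500), 100, 10, 500))
```

The clamp in `x_to_value` is what keeps dragging past the track edges pinned to the first or last frame.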