Restructure into multi-file architecture

Split monolithic symlink.py into modular components:
- config.py: Constants and configuration
- core/: Models, database, blender, manager
- ui/: Main window and widgets

New features included:
- Cross-dissolve transitions with multiple blend methods
- Alpha blend, Optical Flow, and RIFE (AI) interpolation
- Per-folder trim settings with start/end frame control
- Per-transition asymmetric overlap settings
- Folder type overrides (Main/Transition)
- Dual destination folders (sequence + transitions)
- WebP lossless output with compression method setting
- Video and image sequence preview with zoom/pan
- Session resume from destination folder

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-03 18:49:51 +01:00
parent 99858bcfe8
commit bdddce910c
9 changed files with 2248 additions and 1927 deletions

10
config.py Normal file
View File

@@ -0,0 +1,10 @@
"""Configuration constants for Video Montage Linker."""
from pathlib import Path
# Supported file extensions
SUPPORTED_EXTENSIONS = ('.png', '.webp', '.jpg', '.jpeg')
VIDEO_EXTENSIONS = ('.mp4', '.webm', '.mkv', '.avi', '.mov', '.wmv', '.flv', '.m4v')
# Database path
DB_PATH = Path.home() / '.config' / 'video-montage-linker' / 'symlinks.db'

47
core/__init__.py Normal file
View File

@@ -0,0 +1,47 @@
"""Core modules for Video Montage Linker."""
from .models import (
BlendCurve,
BlendMethod,
FolderType,
TransitionSettings,
PerTransitionSettings,
BlendResult,
TransitionSpec,
LinkResult,
SymlinkRecord,
SessionRecord,
SymlinkError,
PathValidationError,
SourceNotFoundError,
DestinationError,
CleanupError,
DatabaseError,
)
from .database import DatabaseManager
from .blender import ImageBlender, TransitionGenerator, RifeDownloader
from .manager import SymlinkManager
__all__ = [
'BlendCurve',
'BlendMethod',
'FolderType',
'TransitionSettings',
'PerTransitionSettings',
'BlendResult',
'TransitionSpec',
'LinkResult',
'SymlinkRecord',
'SessionRecord',
'SymlinkError',
'PathValidationError',
'SourceNotFoundError',
'DestinationError',
'CleanupError',
'DatabaseError',
'DatabaseManager',
'ImageBlender',
'TransitionGenerator',
'RifeDownloader',
'SymlinkManager',
]

925
core/blender.py Normal file
View File

@@ -0,0 +1,925 @@
"""Image blending and transition generation for Video Montage Linker."""
import json
import os
import platform
import shutil
import subprocess
import sys
import tempfile
import urllib.request
import zipfile
from pathlib import Path
from typing import Optional
import numpy as np
from PIL import Image
from .models import (
BlendCurve,
BlendMethod,
FolderType,
TransitionSettings,
PerTransitionSettings,
BlendResult,
TransitionSpec,
)
# Cache directory for downloaded binaries
CACHE_DIR = Path.home() / '.cache' / 'video-montage-linker'
# GitHub API endpoint used to discover the latest rife-ncnn-vulkan release.
RIFE_GITHUB_API = 'https://api.github.com/repos/nihui/rife-ncnn-vulkan/releases/latest'
class RifeDownloader:
    """Handles automatic download and caching of rife-ncnn-vulkan binary."""

    @staticmethod
    def get_cache_dir() -> Path:
        """Get the cache directory, creating it if needed."""
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        return CACHE_DIR

    @staticmethod
    def get_platform_identifier() -> Optional[str]:
        """Get the platform identifier for downloading the correct binary.

        Returns:
            Platform string like 'ubuntu', 'windows', 'macos', or None if unsupported.
        """
        # Release assets are tagged with these strings (e.g.
        # rife-ncnn-vulkan-20221029-ubuntu.zip), hence 'ubuntu' for any Linux.
        system = platform.system().lower()
        if system == 'linux':
            return 'ubuntu'
        elif system == 'windows':
            return 'windows'
        elif system == 'darwin':
            return 'macos'
        return None

    @staticmethod
    def get_cached_binary() -> Optional[Path]:
        """Get the path to a cached RIFE binary if it exists.

        Returns:
            Path to the binary, or None if not cached.
        """
        cache_dir = RifeDownloader.get_cache_dir()
        rife_dir = cache_dir / 'rife-ncnn-vulkan'
        if not rife_dir.exists():
            return None
        # Look for the binary
        system = platform.system().lower()
        if system == 'windows':
            binary_name = 'rife-ncnn-vulkan.exe'
        else:
            binary_name = 'rife-ncnn-vulkan'
        binary_path = rife_dir / binary_name
        if binary_path.exists():
            # Ensure it's executable on Unix
            if system != 'windows':
                binary_path.chmod(0o755)
            return binary_path
        return None

    @staticmethod
    def get_latest_release_info() -> Optional[dict]:
        """Fetch the latest release info from GitHub.

        Returns:
            Dict with 'tag_name' and 'assets' list, or None on error.
        """
        try:
            # GitHub's API rejects requests without a User-Agent header.
            req = urllib.request.Request(
                RIFE_GITHUB_API,
                headers={'User-Agent': 'video-montage-linker'}
            )
            with urllib.request.urlopen(req, timeout=10) as response:
                return json.loads(response.read().decode('utf-8'))
        except Exception:
            # Best-effort: any network/API failure degrades to "unavailable".
            return None

    @staticmethod
    def find_asset_url(release_info: dict, platform_id: str) -> Optional[str]:
        """Find the download URL for the platform-specific asset.

        Args:
            release_info: Release info dict from GitHub API.
            platform_id: Platform identifier (ubuntu, windows, macos).

        Returns:
            Download URL or None if not found.
        """
        assets = release_info.get('assets', [])
        for asset in assets:
            name = asset.get('name', '').lower()
            # Match patterns like rife-ncnn-vulkan-20221029-ubuntu.zip
            if platform_id in name and name.endswith('.zip'):
                return asset.get('browser_download_url')
        return None

    @staticmethod
    def download_and_extract(url: str, progress_callback=None, cancelled_check=None) -> Optional[Path]:
        """Download and extract the RIFE binary.

        Args:
            url: URL to download from.
            progress_callback: Optional callback(downloaded, total) for progress.
            cancelled_check: Optional callable that returns True if cancelled.

        Returns:
            Path to the extracted binary, or None on error/cancel.
        """
        cache_dir = RifeDownloader.get_cache_dir()
        rife_dir = cache_dir / 'rife-ncnn-vulkan'
        try:
            # Download to temp file
            req = urllib.request.Request(
                url,
                headers={'User-Agent': 'video-montage-linker'}
            )
            with urllib.request.urlopen(req, timeout=300) as response:
                total_size = int(response.headers.get('Content-Length', 0))
                downloaded = 0
                chunk_size = 8192
                # delete=False so the zip survives the with-block until it is
                # extracted below; it is unlinked manually afterwards.
                with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
                    tmp_path = Path(tmp.name)
                    while True:
                        # Check for cancellation
                        if cancelled_check and cancelled_check():
                            tmp_path.unlink(missing_ok=True)
                            return None
                        chunk = response.read(chunk_size)
                        if not chunk:
                            break
                        tmp.write(chunk)
                        downloaded += len(chunk)
                        if progress_callback:
                            progress_callback(downloaded, total_size)
            # Remove old installation if exists
            if rife_dir.exists():
                shutil.rmtree(rife_dir)
            # Extract
            with zipfile.ZipFile(tmp_path, 'r') as zf:
                # Find the root directory in the zip
                names = zf.namelist()
                if names:
                    # Most zips have a root folder like rife-ncnn-vulkan-20221029-ubuntu/
                    root_in_zip = names[0].split('/')[0]
                    # Extract to temp location
                    extract_tmp = cache_dir / 'extract_tmp'
                    if extract_tmp.exists():
                        shutil.rmtree(extract_tmp)
                    zf.extractall(extract_tmp)
                    # Move the extracted folder to final location
                    extracted_dir = extract_tmp / root_in_zip
                    if extracted_dir.exists():
                        shutil.move(str(extracted_dir), str(rife_dir))
                    # Cleanup
                    if extract_tmp.exists():
                        shutil.rmtree(extract_tmp)
            # Cleanup temp zip
            tmp_path.unlink(missing_ok=True)
            # Return the binary path
            return RifeDownloader.get_cached_binary()
        except Exception as e:
            # Cleanup on error
            try:
                # tmp_path only exists if the download got that far.
                if 'tmp_path' in locals():
                    tmp_path.unlink(missing_ok=True)
            except Exception:
                pass
            return None

    @staticmethod
    def ensure_binary(progress_callback=None) -> Optional[Path]:
        """Ensure RIFE binary is available, downloading if needed.

        Resolution order: local cache, system PATH, then GitHub download.

        Args:
            progress_callback: Optional callback(downloaded, total) for progress.

        Returns:
            Path to the binary, or None if unavailable.
        """
        # Check if already cached
        cached = RifeDownloader.get_cached_binary()
        if cached:
            return cached
        # Check system PATH
        system_binary = shutil.which('rife-ncnn-vulkan')
        if system_binary:
            return Path(system_binary)
        # Need to download
        platform_id = RifeDownloader.get_platform_identifier()
        if not platform_id:
            return None
        release_info = RifeDownloader.get_latest_release_info()
        if not release_info:
            return None
        asset_url = RifeDownloader.find_asset_url(release_info, platform_id)
        if not asset_url:
            return None
        # NOTE(review): no cancelled_check is forwarded here, so downloads
        # started via ensure_binary cannot be cancelled — confirm intended.
        return RifeDownloader.download_and_extract(asset_url, progress_callback)

    @staticmethod
    def get_version_info() -> Optional[str]:
        """Get the version of the cached binary.

        Returns:
            Version string or None.
        """
        binary = RifeDownloader.get_cached_binary()
        if not binary:
            return None
        # The version is typically in the parent directory name
        # e.g., rife-ncnn-vulkan-20221029-ubuntu
        try:
            result = subprocess.run(
                [str(binary), '-h'],
                capture_output=True,
                text=True,
                timeout=5
            )
            # Parse version from help output if available
            # NOTE(review): `result` is never inspected; any binary that runs
            # returns the placeholder "installed" rather than a real version.
            return "installed"
        except Exception:
            return None
class ImageBlender:
    """Handles image blending operations for cross-dissolve transitions."""

    @staticmethod
    def calculate_blend_factor(frame_idx: int, total: int, curve: BlendCurve) -> float:
        """Calculate blend factor based on curve type.

        Args:
            frame_idx: Current frame index within the overlap (0 to total-1).
            total: Total number of overlap frames.
            curve: The blend curve type.

        Returns:
            Blend factor from 0.0 (100% image A) to 1.0 (100% image B).
        """
        if total <= 1:
            return 1.0
        t = frame_idx / (total - 1)
        if curve == BlendCurve.LINEAR:
            return t
        elif curve == BlendCurve.EASE_IN:
            return t * t
        elif curve == BlendCurve.EASE_OUT:
            return 1 - (1 - t) ** 2
        elif curve == BlendCurve.EASE_IN_OUT:
            # Smooth S-curve using smoothstep
            return t * t * (3 - 2 * t)
        else:
            return t

    @staticmethod
    def interpolate_frame(frames: list, position: float) -> Image.Image:
        """Get an interpolated frame at a fractional position.

        When position is fractional, blends between adjacent frames.

        Args:
            frames: List of PIL Image objects (same mode/size assumed by
                Image.blend — callers normalize beforehand).
            position: Position in the frame list (can be fractional).

        Returns:
            The interpolated PIL Image.
        """
        if len(frames) == 1:
            return frames[0]
        # Clamp position to valid range
        position = max(0, min(position, len(frames) - 1))
        lower_idx = int(position)
        upper_idx = min(lower_idx + 1, len(frames) - 1)
        if lower_idx == upper_idx:
            return frames[lower_idx]
        # Fractional part determines blend weight toward the upper frame
        frac = position - lower_idx
        return Image.blend(frames[lower_idx], frames[upper_idx], frac)

    @staticmethod
    def optical_flow_blend(img_a: Image.Image, img_b: Image.Image, t: float) -> Image.Image:
        """Blend using OpenCV optical flow for motion compensation.

        Uses Farneback dense optical flow to warp frames and reduce ghosting
        artifacts compared to simple alpha blending.

        Args:
            img_a: First PIL Image (source frame).
            img_b: Second PIL Image (target frame).
            t: Interpolation factor 0.0 (100% A) to 1.0 (100% B).

        Returns:
            Motion-compensated blended PIL Image (RGB).
        """
        try:
            import cv2
        except ImportError:
            # Fall back to alpha blend if OpenCV not available
            return Image.blend(img_a, img_b, t)
        # Convert PIL to numpy (RGB)
        arr_a = np.array(img_a.convert('RGB'))
        arr_b = np.array(img_b.convert('RGB'))
        # Calculate dense optical flow (A -> B)
        gray_a = cv2.cvtColor(arr_a, cv2.COLOR_RGB2GRAY)
        gray_b = cv2.cvtColor(arr_b, cv2.COLOR_RGB2GRAY)
        flow = cv2.calcOpticalFlowFarneback(
            gray_a, gray_b, None,
            pyr_scale=0.5,
            levels=3,
            winsize=15,
            iterations=3,
            poly_n=5,
            poly_sigma=1.2,
            flags=0
        )
        h, w = flow.shape[:2]
        # Create coordinate grids
        x_coords = np.tile(np.arange(w), (h, 1)).astype(np.float32)
        y_coords = np.tile(np.arange(h), (w, 1)).T.astype(np.float32)
        # Warp A forward by t * flow
        flow_t = flow * t
        map_x_a = x_coords + flow_t[..., 0]
        map_y_a = y_coords + flow_t[..., 1]
        warped_a = cv2.remap(arr_a, map_x_a, map_y_a, cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)
        # Warp B backward by (1-t) * flow
        flow_back = -flow * (1 - t)
        map_x_b = x_coords + flow_back[..., 0]
        map_y_b = y_coords + flow_back[..., 1]
        warped_b = cv2.remap(arr_b, map_x_b, map_y_b, cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)
        # Blend the aligned frames
        result = cv2.addWeighted(warped_a, 1 - t, warped_b, t, 0)
        return Image.fromarray(result)

    @staticmethod
    def rife_blend(
        img_a: Image.Image,
        img_b: Image.Image,
        t: float,
        binary_path: Optional[Path] = None,
        auto_download: bool = True
    ) -> Image.Image:
        """Blend using RIFE AI frame interpolation.

        Attempts to use rife-ncnn-vulkan binary, auto-downloading if needed,
        then falls back to optical flow if unavailable.

        Args:
            img_a: First PIL Image (source frame).
            img_b: Second PIL Image (target frame).
            t: Interpolation factor 0.0 (100% A) to 1.0 (100% B).
            binary_path: Optional path to rife-ncnn-vulkan binary.
            auto_download: Whether to auto-download RIFE if not found.

        Returns:
            AI-interpolated blended PIL Image.
        """
        # Try NCNN binary first (specified path)
        if binary_path and binary_path.exists():
            result = ImageBlender._rife_ncnn(img_a, img_b, t, binary_path)
            if result is not None:
                return result
        # Try to find rife-ncnn-vulkan in PATH
        ncnn_path = shutil.which('rife-ncnn-vulkan')
        if ncnn_path:
            result = ImageBlender._rife_ncnn(img_a, img_b, t, Path(ncnn_path))
            if result is not None:
                return result
        # Try cached binary
        cached = RifeDownloader.get_cached_binary()
        if cached:
            result = ImageBlender._rife_ncnn(img_a, img_b, t, cached)
            if result is not None:
                return result
        # Auto-download if enabled
        if auto_download:
            downloaded = RifeDownloader.ensure_binary()
            if downloaded:
                result = ImageBlender._rife_ncnn(img_a, img_b, t, downloaded)
                if result is not None:
                    return result
        # Fall back to optical flow if RIFE not available
        return ImageBlender.optical_flow_blend(img_a, img_b, t)

    @staticmethod
    def _rife_ncnn(
        img_a: Image.Image,
        img_b: Image.Image,
        t: float,
        binary: Path
    ) -> Optional[Image.Image]:
        """Use rife-ncnn-vulkan binary for interpolation.

        Args:
            img_a: First PIL Image.
            img_b: Second PIL Image.
            t: Interpolation timestep (0.0 to 1.0).
            binary: Path to rife-ncnn-vulkan binary.

        Returns:
            Interpolated PIL Image, or None if failed.
        """
        try:
            with tempfile.TemporaryDirectory() as tmpdir:
                tmp = Path(tmpdir)
                input_a = tmp / 'a.png'
                input_b = tmp / 'b.png'
                output_file = tmp / 'out.png'
                # Save input images
                img_a.convert('RGB').save(input_a)
                img_b.convert('RGB').save(input_b)
                # Run NCNN binary
                # Note: rife-ncnn-vulkan uses -n for timestep count, not direct timestep
                # We generate a single frame at position t
                cmd = [
                    str(binary),
                    '-0', str(input_a),
                    '-1', str(input_b),
                    '-o', str(output_file),
                ]
                # Some versions support -s for timestep
                # Try with timestep first, fall back to simple interpolation
                try:
                    subprocess.run(
                        cmd + ['-s', str(t)],
                        check=True,
                        capture_output=True,
                        timeout=30
                    )
                except subprocess.CalledProcessError:
                    # Try without timestep (generates middle frame at t=0.5)
                    subprocess.run(
                        cmd,
                        check=True,
                        capture_output=True,
                        timeout=30
                    )
                if output_file.exists():
                    # .copy() detaches the result from the temp file, which is
                    # deleted when the TemporaryDirectory exits.
                    return Image.open(output_file).copy()
        except (subprocess.SubprocessError, OSError, IOError):
            pass
        return None

    @staticmethod
    def _blend_and_save(
        img_a: Image.Image,
        img_b: Image.Image,
        factor: float,
        output_path: Path,
        output_format: str,
        output_quality: int,
        webp_method: int,
        blend_method: BlendMethod,
        rife_binary_path: Optional[Path]
    ) -> None:
        """Blend two in-memory images and write the result to output_path.

        Shared implementation behind blend_images and blend_images_pil
        (previously duplicated in both). Raises on any PIL/OS error; the
        public wrappers translate exceptions into BlendResult.error.
        """
        # Handle different sizes - resize B to match A
        if img_a.size != img_b.size:
            img_b = img_b.resize(img_a.size, Image.Resampling.LANCZOS)
        # Normalize to RGBA for consistent blending
        if img_a.mode != 'RGBA':
            img_a = img_a.convert('RGBA')
        if img_b.mode != 'RGBA':
            img_b = img_b.convert('RGBA')
        # Blend images using selected method
        if blend_method == BlendMethod.OPTICAL_FLOW:
            blended = ImageBlender.optical_flow_blend(img_a, img_b, factor)
        elif blend_method == BlendMethod.RIFE:
            blended = ImageBlender.rife_blend(img_a, img_b, factor, rife_binary_path)
        else:
            # Default: simple alpha blend
            blended = Image.blend(img_a, img_b, factor)
        fmt = output_format.lower()
        # Convert back to RGB if saving to JPEG (JPEG has no alpha channel)
        if fmt in ('jpg', 'jpeg'):
            blended = blended.convert('RGB')
        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)
        # Save with appropriate options
        save_kwargs = {}
        if fmt in ('jpg', 'jpeg'):
            save_kwargs['quality'] = output_quality
        elif fmt == 'webp':
            # WebP is always lossless with method setting
            save_kwargs['lossless'] = True
            save_kwargs['method'] = webp_method
        elif fmt == 'png':
            save_kwargs['compress_level'] = 6
        blended.save(output_path, **save_kwargs)

    @staticmethod
    def blend_images(
        img_a_path: Path,
        img_b_path: Path,
        factor: float,
        output_path: Path,
        output_format: str = 'png',
        output_quality: int = 95,
        webp_method: int = 4,
        blend_method: BlendMethod = BlendMethod.ALPHA,
        rife_binary_path: Optional[Path] = None
    ) -> BlendResult:
        """Blend two images together.

        Args:
            img_a_path: Path to first image (main sequence).
            img_b_path: Path to second image (transition sequence).
            factor: Blend factor 0.0 (100% A) to 1.0 (100% B).
            output_path: Where to save the blended image.
            output_format: Output format (png, jpeg, webp).
            output_quality: Quality for JPEG output (1-100).
            webp_method: WebP compression method (0-6, higher = smaller but slower).
            blend_method: The blending method to use (alpha, optical_flow, or rife).
            rife_binary_path: Optional path to rife-ncnn-vulkan binary.

        Returns:
            BlendResult with operation status.
        """
        try:
            # Context managers close the source file handles (the previous
            # implementation left them open until garbage collection).
            with Image.open(img_a_path) as img_a, Image.open(img_b_path) as img_b:
                ImageBlender._blend_and_save(
                    img_a, img_b, factor, output_path,
                    output_format, output_quality, webp_method,
                    blend_method, rife_binary_path
                )
            return BlendResult(
                output_path=output_path,
                source_a=img_a_path,
                source_b=img_b_path,
                blend_factor=factor,
                success=True
            )
        except Exception as e:
            return BlendResult(
                output_path=output_path,
                source_a=img_a_path,
                source_b=img_b_path,
                blend_factor=factor,
                success=False,
                error=str(e)
            )

    def blend_images_pil(
        self,
        img_a: Image.Image,
        img_b: Image.Image,
        factor: float,
        output_path: Path,
        output_format: str = 'png',
        output_quality: int = 95,
        webp_method: int = 4,
        blend_method: BlendMethod = BlendMethod.ALPHA,
        rife_binary_path: Optional[Path] = None
    ) -> BlendResult:
        """Blend two PIL Image objects together.

        Args:
            img_a: First PIL Image (main sequence).
            img_b: Second PIL Image (transition sequence).
            factor: Blend factor 0.0 (100% A) to 1.0 (100% B).
            output_path: Where to save the blended image.
            output_format: Output format (png, jpeg, webp).
            output_quality: Quality for JPEG output (1-100).
            webp_method: WebP compression method (0-6).
            blend_method: The blending method to use (alpha, optical_flow, or rife).
            rife_binary_path: Optional path to rife-ncnn-vulkan binary.

        Returns:
            BlendResult with operation status. Sources are reported as the
            sentinel path "memory" since the inputs are in-memory images.
        """
        try:
            ImageBlender._blend_and_save(
                img_a, img_b, factor, output_path,
                output_format, output_quality, webp_method,
                blend_method, rife_binary_path
            )
            return BlendResult(
                output_path=output_path,
                source_a=Path("memory"),
                source_b=Path("memory"),
                blend_factor=factor,
                success=True
            )
        except Exception as e:
            return BlendResult(
                output_path=output_path,
                source_a=Path("memory"),
                source_b=Path("memory"),
                blend_factor=factor,
                success=False,
                error=str(e)
            )
class TransitionGenerator:
    """Generates cross-dissolve transitions between folder sequences."""

    def __init__(self, settings: TransitionSettings):
        """Initialize the transition generator.

        Args:
            settings: Transition settings to use.
        """
        self.settings = settings
        self.blender = ImageBlender()

    def get_folder_type(
        self,
        index: int,
        overrides: Optional[dict[Path, FolderType]] = None,
        folder: Optional[Path] = None
    ) -> FolderType:
        """Determine folder type based on position or override.

        Args:
            index: 0-based position of folder in list.
            overrides: Optional dict of folder path to FolderType overrides.
            folder: The folder path for checking overrides.

        Returns:
            FolderType.MAIN for odd 1-based positions (1, 3, 5...), TRANSITION
            for even 1-based positions — unless a non-AUTO override applies.
        """
        # An explicit non-AUTO override always wins over position.
        if overrides and folder and folder in overrides:
            override = overrides[folder]
            if override != FolderType.AUTO:
                return override
        # Position-based: index 0, 2, 4... are MAIN; 1, 3, 5... are TRANSITION
        return FolderType.MAIN if index % 2 == 0 else FolderType.TRANSITION

    def identify_transition_boundaries(
        self,
        folders: list[Path],
        files_by_folder: dict[Path, list[str]],
        folder_overrides: Optional[dict[Path, FolderType]] = None,
        per_transition_settings: Optional[dict[Path, PerTransitionSettings]] = None
    ) -> list[TransitionSpec]:
        """Identify boundaries where transitions should occur.

        Transitions happen at boundaries where folder types change
        (MAIN->TRANSITION or TRANSITION->MAIN).

        Args:
            folders: List of folders in order.
            files_by_folder: Dict mapping folders to their file lists.
            folder_overrides: Optional folder type overrides.
            per_transition_settings: Optional per-transition overlap settings.

        Returns:
            List of TransitionSpec objects describing each transition.
        """
        if len(folders) < 2:
            return []
        transitions = []
        cumulative_idx = 0
        folder_start_indices = {}
        # Calculate start indices for each folder (position of its first file
        # in the overall concatenated sequence).
        for folder in folders:
            folder_start_indices[folder] = cumulative_idx
            cumulative_idx += len(files_by_folder.get(folder, []))
        # Look for transition boundaries (MAIN->TRANSITION and TRANSITION->MAIN)
        for i in range(len(folders) - 1):
            folder_a = folders[i]
            folder_b = folders[i + 1]
            type_a = self.get_folder_type(i, folder_overrides, folder_a)
            type_b = self.get_folder_type(i + 1, folder_overrides, folder_b)
            # Create transition when types differ (MAIN->TRANS or TRANS->MAIN)
            if type_a != type_b:
                files_a = files_by_folder.get(folder_a, [])
                files_b = files_by_folder.get(folder_b, [])
                # An empty side means nothing to blend at this boundary.
                if not files_a or not files_b:
                    continue
                # Get per-transition overlap settings if available
                # Use folder_b as the key (the "incoming" folder)
                if per_transition_settings and folder_b in per_transition_settings:
                    pts = per_transition_settings[folder_b]
                    left_overlap = pts.left_overlap
                    right_overlap = pts.right_overlap
                else:
                    # Use default of 16 for both
                    left_overlap = 16
                    right_overlap = 16
                # Cap overlaps by available files
                left_overlap = min(left_overlap, len(files_a))
                right_overlap = min(right_overlap, len(files_b))
                if left_overlap < 1 or right_overlap < 1:
                    continue
                transitions.append(TransitionSpec(
                    main_folder=folder_a,
                    trans_folder=folder_b,
                    main_files=files_a,
                    trans_files=files_b,
                    left_overlap=left_overlap,
                    right_overlap=right_overlap,
                    main_start_idx=folder_start_indices[folder_a],
                    trans_start_idx=folder_start_indices[folder_b]
                ))
        return transitions

    def generate_asymmetric_blend_frames(
        self,
        spec: TransitionSpec,
        dest: Path,
        folder_idx_main: int,
        base_file_idx: int
    ) -> list[BlendResult]:
        """Generate blended frames for an asymmetric transition.

        For asymmetric overlap, left_overlap != right_overlap. The blend
        creates max(left, right) output frames, with source frames interpolated
        to match the longer sequence.

        Args:
            spec: TransitionSpec describing the transition.
            dest: Destination directory for blended frames.
            folder_idx_main: Folder index for sequence naming.
            base_file_idx: Starting file index for sequence naming.

        Returns:
            List of BlendResult objects.
        """
        results = []
        left_overlap = spec.left_overlap
        right_overlap = spec.right_overlap
        output_count = max(left_overlap, right_overlap)
        # Get the frames to use: the tail of the main sequence and the head
        # of the transition sequence.
        main_start = len(spec.main_files) - left_overlap
        main_frames_paths = [
            spec.main_folder / spec.main_files[main_start + i]
            for i in range(left_overlap)
        ]
        trans_frames_paths = [
            spec.trans_folder / spec.trans_files[i]
            for i in range(right_overlap)
        ]
        # Load all frames into memory for interpolation
        main_frames = [Image.open(p) for p in main_frames_paths]
        trans_frames = [Image.open(p) for p in trans_frames_paths]
        # Normalize all frames to RGBA
        # NOTE(review): convert()/resize() return new images; the originally
        # opened images they replace in these lists are never closed, so their
        # file handles leak until GC — consider closing them explicitly.
        main_frames = [f.convert('RGBA') if f.mode != 'RGBA' else f for f in main_frames]
        trans_frames = [f.convert('RGBA') if f.mode != 'RGBA' else f for f in trans_frames]
        # Resize trans frames to match main frame size if needed
        target_size = main_frames[0].size
        trans_frames = [
            f.resize(target_size, Image.Resampling.LANCZOS) if f.size != target_size else f
            for f in trans_frames
        ]
        for i in range(output_count):
            # Calculate position in each source (0.0 to 1.0)
            t = i / (output_count - 1) if output_count > 1 else 0
            # Map to source frame indices (fractional when the side is shorter
            # than output_count, so frames get interpolated).
            main_pos = t * (left_overlap - 1) if left_overlap > 1 else 0
            trans_pos = t * (right_overlap - 1) if right_overlap > 1 else 0
            # Get source frames (interpolate if fractional)
            main_frame = self.blender.interpolate_frame(main_frames, main_pos)
            trans_frame = self.blender.interpolate_frame(trans_frames, trans_pos)
            # Calculate blend factor with curve
            factor = self.blender.calculate_blend_factor(
                i, output_count, self.settings.blend_curve
            )
            # Generate output filename, e.g. seq03_0042.png
            ext = f".{self.settings.output_format.lower()}"
            file_idx = base_file_idx + i
            output_name = f"seq{folder_idx_main + 1:02d}_{file_idx:04d}{ext}"
            output_path = dest / output_name
            result = self.blender.blend_images_pil(
                main_frame,
                trans_frame,
                factor,
                output_path,
                self.settings.output_format,
                self.settings.output_quality,
                self.settings.webp_method,
                self.settings.blend_method,
                self.settings.rife_binary_path
            )
            results.append(result)
        # Close loaded images (safe here: every output has been saved already)
        for f in main_frames:
            f.close()
        for f in trans_frames:
            f.close()
        return results

    def generate_transition_frames(
        self,
        spec: TransitionSpec,
        dest: Path,
        folder_idx_main: int,
        base_file_idx: int
    ) -> list[BlendResult]:
        """Generate blended frames for a transition.

        Uses asymmetric blend if left_overlap != right_overlap.

        Args:
            spec: TransitionSpec describing the transition.
            dest: Destination directory for blended frames.
            folder_idx_main: Folder index for sequence naming.
            base_file_idx: Starting file index for sequence naming.

        Returns:
            List of BlendResult objects.
        """
        # Use asymmetric blend for all cases (handles symmetric too)
        return self.generate_asymmetric_blend_frames(
            spec, dest, folder_idx_main, base_file_idx
        )

616
core/database.py Normal file
View File

@@ -0,0 +1,616 @@
"""Database management for Video Montage Linker."""
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Optional
from config import DB_PATH
from .models import (
BlendCurve,
BlendMethod,
FolderType,
TransitionSettings,
PerTransitionSettings,
SymlinkRecord,
SessionRecord,
DatabaseError,
)
class DatabaseManager:
"""Manages SQLite database for tracking symlink sessions and links."""
    def __init__(self, db_path: Path = DB_PATH) -> None:
        """Initialize database manager.

        Args:
            db_path: Path to the SQLite database file.
        """
        self.db_path = db_path
        # Create the schema eagerly so later calls can assume it exists.
        self._ensure_db_exists()
    def _ensure_db_exists(self) -> None:
        """Create database and tables if they don't exist.

        Also performs in-place column migrations for databases created by
        older versions: each migration probes with a SELECT and adds the
        column only when the probe raises OperationalError.
        """
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        with self._connect() as conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS symlink_sessions (
                    id INTEGER PRIMARY KEY,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    destination TEXT NOT NULL
                );
                CREATE TABLE IF NOT EXISTS symlinks (
                    id INTEGER PRIMARY KEY,
                    session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
                    source_path TEXT NOT NULL,
                    link_path TEXT NOT NULL,
                    original_filename TEXT NOT NULL,
                    sequence_number INTEGER NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                CREATE TABLE IF NOT EXISTS sequence_trim_settings (
                    id INTEGER PRIMARY KEY,
                    session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
                    source_folder TEXT NOT NULL,
                    trim_start INTEGER DEFAULT 0,
                    trim_end INTEGER DEFAULT 0,
                    folder_type TEXT DEFAULT 'auto',
                    UNIQUE(session_id, source_folder)
                );
                CREATE TABLE IF NOT EXISTS transition_settings (
                    id INTEGER PRIMARY KEY,
                    session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
                    enabled INTEGER DEFAULT 0,
                    blend_curve TEXT DEFAULT 'linear',
                    output_format TEXT DEFAULT 'png',
                    webp_method INTEGER DEFAULT 4,
                    output_quality INTEGER DEFAULT 95,
                    trans_destination TEXT,
                    UNIQUE(session_id)
                );
                CREATE TABLE IF NOT EXISTS per_transition_settings (
                    id INTEGER PRIMARY KEY,
                    session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
                    trans_folder TEXT NOT NULL,
                    left_overlap INTEGER DEFAULT 16,
                    right_overlap INTEGER DEFAULT 16,
                    UNIQUE(session_id, trans_folder)
                );
            """)
            # Migration: add folder_type column if it doesn't exist
            try:
                conn.execute("SELECT folder_type FROM sequence_trim_settings LIMIT 1")
            except sqlite3.OperationalError:
                conn.execute("ALTER TABLE sequence_trim_settings ADD COLUMN folder_type TEXT DEFAULT 'auto'")
            # Migration: add webp_method column if it doesn't exist
            try:
                conn.execute("SELECT webp_method FROM transition_settings LIMIT 1")
            except sqlite3.OperationalError:
                conn.execute("ALTER TABLE transition_settings ADD COLUMN webp_method INTEGER DEFAULT 4")
            # Migration: add trans_destination column if it doesn't exist
            try:
                conn.execute("SELECT trans_destination FROM transition_settings LIMIT 1")
            except sqlite3.OperationalError:
                conn.execute("ALTER TABLE transition_settings ADD COLUMN trans_destination TEXT")
            # Migration: add blend_method column if it doesn't exist
            try:
                conn.execute("SELECT blend_method FROM transition_settings LIMIT 1")
            except sqlite3.OperationalError:
                conn.execute("ALTER TABLE transition_settings ADD COLUMN blend_method TEXT DEFAULT 'alpha'")
            # Migration: add rife_binary_path column if it doesn't exist
            try:
                conn.execute("SELECT rife_binary_path FROM transition_settings LIMIT 1")
            except sqlite3.OperationalError:
                conn.execute("ALTER TABLE transition_settings ADD COLUMN rife_binary_path TEXT")
            # Migration: remove overlap_frames from transition_settings (now per-transition)
            # We'll keep it for backward compatibility but won't use it
def _connect(self) -> sqlite3.Connection:
"""Create a database connection with foreign keys enabled."""
conn = sqlite3.connect(self.db_path)
conn.execute("PRAGMA foreign_keys = ON")
return conn
def create_session(self, destination: str) -> int:
"""Create a new linking session.
Args:
destination: The destination directory path.
Returns:
The ID of the created session.
Raises:
DatabaseError: If session creation fails.
"""
try:
with self._connect() as conn:
cursor = conn.execute(
"INSERT INTO symlink_sessions (destination) VALUES (?)",
(destination,)
)
return cursor.lastrowid
except sqlite3.Error as e:
raise DatabaseError(f"Failed to create session: {e}") from e
def record_symlink(
self,
session_id: int,
source: str,
link: str,
filename: str,
seq: int
) -> int:
"""Record a created symlink.
Args:
session_id: The session this symlink belongs to.
source: Full path to the source file.
link: Full path to the created symlink.
filename: Original filename.
seq: Sequence number in the destination.
Returns:
The ID of the created record.
Raises:
DatabaseError: If recording fails.
"""
try:
with self._connect() as conn:
cursor = conn.execute(
"""INSERT INTO symlinks
(session_id, source_path, link_path, original_filename, sequence_number)
VALUES (?, ?, ?, ?, ?)""",
(session_id, source, link, filename, seq)
)
return cursor.lastrowid
except sqlite3.Error as e:
raise DatabaseError(f"Failed to record symlink: {e}") from e
def get_sessions(self) -> list[SessionRecord]:
"""List all sessions with link counts.
Returns:
List of session records.
"""
with self._connect() as conn:
rows = conn.execute("""
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count
FROM symlink_sessions s
LEFT JOIN symlinks l ON s.id = l.session_id
GROUP BY s.id
ORDER BY s.created_at DESC
""").fetchall()
return [
SessionRecord(
id=row[0],
created_at=datetime.fromisoformat(row[1]),
destination=row[2],
link_count=row[3]
)
for row in rows
]
def get_symlinks_by_session(self, session_id: int) -> list[SymlinkRecord]:
    """Get all symlinks for a session.

    Args:
        session_id: The session ID to query.

    Returns:
        List of symlink records, ordered by sequence number.
    """
    with self._connect() as conn:
        rows = conn.execute(
            """SELECT id, session_id, source_path, link_path,
                      original_filename, sequence_number, created_at
               FROM symlinks WHERE session_id = ?
               ORDER BY sequence_number""",
            (session_id,)
        ).fetchall()
        return [
            SymlinkRecord(
                id=row[0],
                session_id=row[1],
                source_path=row[2],
                link_path=row[3],
                original_filename=row[4],
                sequence_number=row[5],
                # Stored as ISO text (sqlite CURRENT_TIMESTAMP); parse back.
                created_at=datetime.fromisoformat(row[6])
            )
            for row in rows
        ]
def get_symlinks_by_destination(self, dest: str) -> list[SymlinkRecord]:
    """Get all symlinks for a destination directory.

    Note: this joins across *all* sessions whose destination matches, not
    just the most recent one.

    Args:
        dest: The destination directory path.

    Returns:
        List of symlink records, ordered by sequence number.
    """
    with self._connect() as conn:
        rows = conn.execute(
            """SELECT l.id, l.session_id, l.source_path, l.link_path,
                      l.original_filename, l.sequence_number, l.created_at
               FROM symlinks l
               JOIN symlink_sessions s ON l.session_id = s.id
               WHERE s.destination = ?
               ORDER BY l.sequence_number""",
            (dest,)
        ).fetchall()
        return [
            SymlinkRecord(
                id=row[0],
                session_id=row[1],
                source_path=row[2],
                link_path=row[3],
                original_filename=row[4],
                sequence_number=row[5],
                created_at=datetime.fromisoformat(row[6])
            )
            for row in rows
        ]
def delete_session(self, session_id: int) -> None:
    """Remove a session together with every symlink record it owns.

    Args:
        session_id: The session ID to delete.

    Raises:
        DatabaseError: If deletion fails.
    """
    try:
        with self._connect() as conn:
            # Child rows first so the session delete cannot violate the
            # foreign-key constraint enabled in _connect().
            for statement in (
                "DELETE FROM symlinks WHERE session_id = ?",
                "DELETE FROM symlink_sessions WHERE id = ?",
            ):
                conn.execute(statement, (session_id,))
    except sqlite3.Error as e:
        raise DatabaseError(f"Failed to delete session: {e}") from e
def get_sessions_by_destination(self, dest: str) -> list[SessionRecord]:
    """Get all sessions for a destination directory.

    Args:
        dest: The destination directory path.

    Returns:
        List of session records with link counts, newest first.
    """
    with self._connect() as conn:
        rows = conn.execute("""
            SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count
            FROM symlink_sessions s
            LEFT JOIN symlinks l ON s.id = l.session_id
            WHERE s.destination = ?
            GROUP BY s.id
            ORDER BY s.created_at DESC
        """, (dest,)).fetchall()
        return [
            SessionRecord(
                id=row[0],
                created_at=datetime.fromisoformat(row[1]),
                destination=row[2],
                link_count=row[3]
            )
            for row in rows
        ]
def save_trim_settings(
    self,
    session_id: int,
    source_folder: str,
    trim_start: int,
    trim_end: int,
    folder_type: FolderType = FolderType.AUTO
) -> None:
    """Save trim settings for a folder in a session.

    Uses an UPSERT keyed on (session_id, source_folder), so repeated saves
    for the same folder update the existing row in place.

    Args:
        session_id: The session ID.
        source_folder: Path to the source folder.
        trim_start: Number of images to trim from start.
        trim_end: Number of images to trim from end.
        folder_type: The folder type (auto, main, or transition).

    Raises:
        DatabaseError: If saving fails.
    """
    try:
        with self._connect() as conn:
            conn.execute(
                """INSERT INTO sequence_trim_settings
                   (session_id, source_folder, trim_start, trim_end, folder_type)
                   VALUES (?, ?, ?, ?, ?)
                   ON CONFLICT(session_id, source_folder)
                   DO UPDATE SET trim_start=excluded.trim_start,
                                 trim_end=excluded.trim_end,
                                 folder_type=excluded.folder_type""",
                (session_id, source_folder, trim_start, trim_end, folder_type.value)
            )
    except sqlite3.Error as e:
        raise DatabaseError(f"Failed to save trim settings: {e}") from e
def get_trim_settings(
    self,
    session_id: int,
    source_folder: str
) -> tuple[int, int, FolderType]:
    """Look up trim settings for one folder of a session.

    Args:
        session_id: The session ID.
        source_folder: Path to the source folder.

    Returns:
        (trim_start, trim_end, folder_type); (0, 0, AUTO) when no row exists.
    """
    with self._connect() as conn:
        row = conn.execute(
            """SELECT trim_start, trim_end, folder_type FROM sequence_trim_settings
               WHERE session_id = ? AND source_folder = ?""",
            (session_id, source_folder)
        ).fetchone()
    if row is None:
        return (0, 0, FolderType.AUTO)
    start, end, raw_type = row
    # Unknown or NULL stored values degrade gracefully to AUTO.
    folder_type = FolderType.AUTO
    if raw_type:
        try:
            folder_type = FolderType(raw_type)
        except ValueError:
            folder_type = FolderType.AUTO
    return (start, end, folder_type)
def get_all_trim_settings(self, session_id: int) -> dict[str, tuple[int, int]]:
    """Fetch trim settings for every folder in a session.

    Args:
        session_id: The session ID.

    Returns:
        Mapping of source folder path -> (trim_start, trim_end).
    """
    with self._connect() as conn:
        rows = conn.execute(
            """SELECT source_folder, trim_start, trim_end
               FROM sequence_trim_settings WHERE session_id = ?""",
            (session_id,)
        ).fetchall()
    settings: dict[str, tuple[int, int]] = {}
    for folder, start, end in rows:
        settings[folder] = (start, end)
    return settings
def save_transition_settings(
    self,
    session_id: int,
    settings: TransitionSettings
) -> None:
    """Save transition settings for a session.

    Uses an UPSERT keyed on session_id, so there is at most one settings
    row per session.

    Args:
        session_id: The session ID.
        settings: TransitionSettings to save.

    Raises:
        DatabaseError: If saving fails.
    """
    try:
        # Optional Path fields are stored as text, NULL when unset.
        trans_dest = str(settings.trans_destination) if settings.trans_destination else None
        rife_path = str(settings.rife_binary_path) if settings.rife_binary_path else None
        with self._connect() as conn:
            conn.execute(
                """INSERT INTO transition_settings
                   (session_id, enabled, blend_curve, output_format, webp_method, output_quality, trans_destination, blend_method, rife_binary_path)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                   ON CONFLICT(session_id)
                   DO UPDATE SET enabled=excluded.enabled,
                                 blend_curve=excluded.blend_curve,
                                 output_format=excluded.output_format,
                                 webp_method=excluded.webp_method,
                                 output_quality=excluded.output_quality,
                                 trans_destination=excluded.trans_destination,
                                 blend_method=excluded.blend_method,
                                 rife_binary_path=excluded.rife_binary_path""",
                (session_id, 1 if settings.enabled else 0,
                 settings.blend_curve.value, settings.output_format,
                 settings.webp_method, settings.output_quality, trans_dest,
                 settings.blend_method.value, rife_path)
            )
    except sqlite3.Error as e:
        raise DatabaseError(f"Failed to save transition settings: {e}") from e
def get_transition_settings(self, session_id: int) -> Optional[TransitionSettings]:
    """Get transition settings for a session.

    Args:
        session_id: The session ID.

    Returns:
        TransitionSettings or None if not found. Missing/unknown stored
        values fall back to model defaults (e.g. blend_method -> ALPHA,
        webp_method -> 4).
    """
    with self._connect() as conn:
        row = conn.execute(
            """SELECT enabled, blend_curve, output_format, webp_method, output_quality, trans_destination, blend_method, rife_binary_path
               FROM transition_settings WHERE session_id = ?""",
            (session_id,)
        ).fetchone()
        if row:
            trans_dest = Path(row[5]) if row[5] else None
            # Rows written before blend_method existed may hold NULL or an
            # unrecognized value; degrade to the ALPHA default.
            try:
                blend_method = BlendMethod(row[6]) if row[6] else BlendMethod.ALPHA
            except ValueError:
                blend_method = BlendMethod.ALPHA
            rife_path = Path(row[7]) if row[7] else None
            return TransitionSettings(
                enabled=bool(row[0]),
                blend_curve=BlendCurve(row[1]),
                output_format=row[2],
                webp_method=row[3] if row[3] is not None else 4,
                output_quality=row[4],
                trans_destination=trans_dest,
                blend_method=blend_method,
                rife_binary_path=rife_path
            )
        return None
def save_folder_type_override(
    self,
    session_id: int,
    folder: str,
    folder_type: FolderType,
    trim_start: int = 0,
    trim_end: int = 0
) -> None:
    """Save folder type override for a folder in a session.

    This writes the same sequence_trim_settings row as save_trim_settings()
    (identical UPSERT on (session_id, source_folder)), so it delegates to
    keep the SQL in a single place.

    Args:
        session_id: The session ID.
        folder: Path to the source folder.
        folder_type: The folder type override.
        trim_start: Number of images to trim from start.
        trim_end: Number of images to trim from end.

    Raises:
        DatabaseError: If saving fails.
    """
    try:
        self.save_trim_settings(session_id, folder, trim_start, trim_end, folder_type)
    except DatabaseError as e:
        # Re-wrap so callers still see the override-specific context.
        raise DatabaseError(f"Failed to save folder type override: {e}") from e
def get_folder_type_overrides(self, session_id: int) -> dict[str, FolderType]:
    """Fetch the folder type override for every folder in a session.

    Args:
        session_id: The session ID.

    Returns:
        Mapping of source folder path -> FolderType. Unknown or NULL
        stored values map to FolderType.AUTO.
    """
    with self._connect() as conn:
        rows = conn.execute(
            """SELECT source_folder, folder_type
               FROM sequence_trim_settings WHERE session_id = ?""",
            (session_id,)
        ).fetchall()
    overrides: dict[str, FolderType] = {}
    for folder, raw_type in rows:
        if not raw_type:
            overrides[folder] = FolderType.AUTO
            continue
        try:
            overrides[folder] = FolderType(raw_type)
        except ValueError:
            overrides[folder] = FolderType.AUTO
    return overrides
def save_per_transition_settings(
    self,
    session_id: int,
    settings: PerTransitionSettings
) -> None:
    """Save per-transition overlap settings.

    Uses an UPSERT keyed on (session_id, trans_folder), so repeated saves
    for the same transition folder update the existing row.

    Args:
        session_id: The session ID.
        settings: PerTransitionSettings to save.

    Raises:
        DatabaseError: If saving fails.
    """
    try:
        with self._connect() as conn:
            conn.execute(
                """INSERT INTO per_transition_settings
                   (session_id, trans_folder, left_overlap, right_overlap)
                   VALUES (?, ?, ?, ?)
                   ON CONFLICT(session_id, trans_folder)
                   DO UPDATE SET left_overlap=excluded.left_overlap,
                                 right_overlap=excluded.right_overlap""",
                (session_id, str(settings.trans_folder),
                 settings.left_overlap, settings.right_overlap)
            )
    except sqlite3.Error as e:
        raise DatabaseError(f"Failed to save per-transition settings: {e}") from e
def get_per_transition_settings(
    self,
    session_id: int,
    trans_folder: str
) -> Optional[PerTransitionSettings]:
    """Get per-transition settings for a specific transition folder.

    Args:
        session_id: The session ID.
        trans_folder: Path to the transition folder (matched as stored text).

    Returns:
        PerTransitionSettings or None if not found.
    """
    with self._connect() as conn:
        row = conn.execute(
            """SELECT left_overlap, right_overlap FROM per_transition_settings
               WHERE session_id = ? AND trans_folder = ?""",
            (session_id, trans_folder)
        ).fetchone()
        if row:
            return PerTransitionSettings(
                trans_folder=Path(trans_folder),
                left_overlap=row[0],
                right_overlap=row[1]
            )
        return None
def get_all_per_transition_settings(
    self,
    session_id: int
) -> dict[str, PerTransitionSettings]:
    """Fetch the per-transition settings for every transition in a session.

    Args:
        session_id: The session ID.

    Returns:
        Mapping of transition folder path -> PerTransitionSettings.
    """
    with self._connect() as conn:
        rows = conn.execute(
            """SELECT trans_folder, left_overlap, right_overlap
               FROM per_transition_settings WHERE session_id = ?""",
            (session_id,)
        ).fetchall()
    settings: dict[str, PerTransitionSettings] = {}
    for folder, left, right in rows:
        settings[folder] = PerTransitionSettings(
            trans_folder=Path(folder),
            left_overlap=left,
            right_overlap=right,
        )
    return settings

205
core/manager.py Normal file
View File

@@ -0,0 +1,205 @@
"""Symlink management for Video Montage Linker."""
import os
import re
from pathlib import Path
from typing import Optional
from config import SUPPORTED_EXTENSIONS
from .models import LinkResult, CleanupError, SourceNotFoundError, DestinationError
from .database import DatabaseManager
class SymlinkManager:
    """Manages symlink creation and cleanup operations.

    Orchestrates the full link pass: validate paths, remove stale seq*
    links from the destination, create the new sequence of relative
    symlinks, and (optionally) record everything in the database.
    """

    def __init__(self, db: Optional[DatabaseManager] = None) -> None:
        """Initialize the symlink manager.

        Args:
            db: Optional database manager for tracking operations. When
                None, links are created but no session is recorded.
        """
        self.db = db

    @staticmethod
    def get_supported_files(directories: list[Path]) -> list[tuple[Path, str]]:
        """Get all supported image files from multiple directories.

        Files are returned sorted by directory order (as provided), then
        alphabetically by filename within each directory. Directories that
        do not exist are silently skipped.

        Args:
            directories: List of source directories to scan.

        Returns:
            List of (directory, filename) tuples.
        """
        files: list[tuple[Path, str]] = []
        for directory in directories:
            if not directory.is_dir():
                continue
            dir_files = []
            for item in directory.iterdir():
                if item.is_file() and item.suffix.lower() in SUPPORTED_EXTENSIONS:
                    dir_files.append((directory, item.name))
            # Sort files within this directory alphabetically
            # (case-insensitive, so mixed-case names interleave predictably).
            dir_files.sort(key=lambda x: x[1].lower())
            files.extend(dir_files)
        return files

    @staticmethod
    def validate_paths(sources: list[Path], dest: Path) -> None:
        """Validate source and destination paths.

        The destination is created (including parents) if it does not exist.

        Args:
            sources: List of source directories.
            dest: Destination directory.

        Raises:
            SourceNotFoundError: If any source directory doesn't exist.
            DestinationError: If destination cannot be created or accessed.
        """
        if not sources:
            raise SourceNotFoundError("No source directories specified")
        for source in sources:
            if not source.exists():
                raise SourceNotFoundError(f"Source directory not found: {source}")
            if not source.is_dir():
                raise SourceNotFoundError(f"Source is not a directory: {source}")
        try:
            dest.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            raise DestinationError(f"Cannot create destination directory: {e}") from e
        if not dest.is_dir():
            raise DestinationError(f"Destination is not a directory: {dest}")

    @staticmethod
    def cleanup_old_links(directory: Path) -> int:
        """Remove existing seq* symlinks from a directory.

        Handles both old format (seq_0000) and new format (seq01_0000).
        Also removes blended image files (not just symlinks) created by
        cross-dissolve transitions: any seq*-prefixed symlink is removed,
        while regular files must additionally match the seq-name pattern
        so unrelated files are never deleted.

        Args:
            directory: Directory to clean up.

        Returns:
            Number of files removed.

        Raises:
            CleanupError: If cleanup fails.
        """
        removed = 0
        seq_pattern = re.compile(r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE)
        try:
            for item in directory.iterdir():
                # Match both old (seq_NNNN) and new (seqNN_NNNN) formats
                if item.name.startswith("seq"):
                    if item.is_symlink():
                        item.unlink()
                        removed += 1
                    elif item.is_file() and seq_pattern.match(item.name):
                        # Also remove blended image files
                        item.unlink()
                        removed += 1
        except OSError as e:
            raise CleanupError(f"Failed to clean up old links: {e}") from e
        return removed

    def create_sequence_links(
        self,
        sources: list[Path],
        dest: Path,
        files: list[tuple],
        trim_settings: Optional[dict[Path, tuple[int, int]]] = None,
    ) -> tuple[list[LinkResult], Optional[int]]:
        """Create sequenced symlinks from source files to destination.

        Validates paths, clears stale seq* links, then creates one relative
        symlink per input file named ``seq<folder+1:02d>_<file:04d><ext>``.
        Each successful link is recorded in the database when one is
        configured; per-file failures are reported in the results rather
        than raised.

        Args:
            sources: List of source directories (for validation).
            dest: Destination directory.
            files: List of tuples. Can be:
                - (source_dir, filename) for CLI mode (uses global sequence)
                - (source_dir, filename, folder_idx, file_idx) for GUI mode
            trim_settings: Optional dict mapping folder paths to (trim_start, trim_end).
                Settings are persisted to the session; filtering of the file
                list itself is assumed to happen before this call.

        Returns:
            Tuple of (list of LinkResult objects, session_id or None).
        """
        self.validate_paths(sources, dest)
        self.cleanup_old_links(dest)
        session_id = None
        if self.db:
            session_id = self.db.create_session(str(dest))
            # Save trim settings if provided (only non-zero trims are stored)
            if trim_settings and session_id:
                for folder, (trim_start, trim_end) in trim_settings.items():
                    if trim_start > 0 or trim_end > 0:
                        self.db.save_trim_settings(
                            session_id, str(folder), trim_start, trim_end
                        )
        results: list[LinkResult] = []
        # Check if we have folder indices (GUI mode) or not (CLI mode)
        use_folder_sequences = len(files) > 0 and len(files[0]) >= 4
        # For CLI mode without folder indices, calculate them from the
        # position of each file's directory in `sources`.
        if not use_folder_sequences:
            folder_to_index = {folder: i for i, folder in enumerate(sources)}
            folder_file_counts: dict[Path, int] = {}
            expanded_files = []
            for source_dir, filename in files:
                folder_idx = folder_to_index.get(source_dir, 0)
                file_idx = folder_file_counts.get(source_dir, 0)
                folder_file_counts[source_dir] = file_idx + 1
                expanded_files.append((source_dir, filename, folder_idx, file_idx))
            files = expanded_files
        for i, file_data in enumerate(files):
            source_dir, filename, folder_idx, file_idx = file_data
            source_path = source_dir / filename
            ext = source_path.suffix
            link_name = f"seq{folder_idx + 1:02d}_{file_idx:04d}{ext}"
            link_path = dest / link_name
            # Calculate relative path from destination to source so the
            # tree stays valid if the common parent is moved.
            rel_source = Path(os.path.relpath(source_path.resolve(), dest.resolve()))
            try:
                link_path.symlink_to(rel_source)
                if self.db and session_id:
                    self.db.record_symlink(
                        session_id=session_id,
                        source=str(source_path.resolve()),
                        link=str(link_path),
                        filename=filename,
                        seq=i
                    )
                results.append(LinkResult(
                    source_path=source_path,
                    link_path=link_path,
                    sequence_number=i,
                    success=True
                ))
            except OSError as e:
                # Per-file failure: report in results, keep linking the rest.
                results.append(LinkResult(
                    source_path=source_path,
                    link_path=link_path,
                    sequence_number=i,
                    success=False,
                    error=str(e)
                ))
        return results, session_id

136
core/models.py Normal file
View File

@@ -0,0 +1,136 @@
"""Data models, enums, and exceptions for Video Montage Linker."""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Optional
# --- Enums ---
class BlendCurve(Enum):
    """Blend curve types for cross-dissolve transitions.

    Values are the strings persisted in the database.
    """
    LINEAR = 'linear'
    EASE_IN = 'ease_in'
    EASE_OUT = 'ease_out'
    EASE_IN_OUT = 'ease_in_out'
class BlendMethod(Enum):
    """Blend method types for transitions.

    Values are the strings persisted in the database.
    """
    ALPHA = 'alpha'  # Simple cross-dissolve (PIL.Image.blend)
    OPTICAL_FLOW = 'optical'  # OpenCV Farneback optical flow
    RIFE = 'rife'  # AI frame interpolation (NCNN binary or PyTorch)
class FolderType(Enum):
    """Folder type for transition detection.

    AUTO leaves classification to detection logic; MAIN and TRANSITION are
    explicit user overrides. Values are the strings persisted in the database.
    """
    AUTO = 'auto'
    MAIN = 'main'
    TRANSITION = 'transition'
# --- Data Classes ---
@dataclass
class TransitionSettings:
    """Session-wide settings for cross-dissolve transitions."""
    enabled: bool = False  # master switch for transition generation
    blend_curve: BlendCurve = BlendCurve.LINEAR
    output_format: str = 'png'
    webp_method: int = 4  # 0-6, used when format is webp (compression effort)
    output_quality: int = 95  # used for jpeg only
    trans_destination: Optional[Path] = None  # separate destination for transition output
    blend_method: BlendMethod = BlendMethod.ALPHA  # blending method
    rife_binary_path: Optional[Path] = None  # path to rife-ncnn-vulkan binary
@dataclass
class PerTransitionSettings:
    """Per-transition overlap settings for asymmetric cross-dissolves."""
    trans_folder: Path  # the transition folder these overlaps apply to
    left_overlap: int = 16  # frames from main folder end
    right_overlap: int = 16  # frames from trans folder start
@dataclass
class BlendResult:
    """Result of a single image blend operation."""
    output_path: Path  # where the blended frame was written
    source_a: Path  # first input frame
    source_b: Path  # second input frame
    blend_factor: float  # mix weight applied between the two sources
    success: bool
    error: Optional[str] = None  # error description when success is False
@dataclass
class TransitionSpec:
    """Specification for a transition boundary between two folders."""
    main_folder: Path
    trans_folder: Path
    main_files: list[str]  # filenames of the main folder's frames
    trans_files: list[str]  # filenames of the transition folder's frames
    left_overlap: int  # asymmetric: frames from main folder end
    right_overlap: int  # asymmetric: frames from trans folder start
    # Indices into the overall file list
    main_start_idx: int
    trans_start_idx: int
@dataclass
class LinkResult:
    """Result of a single symlink creation operation."""
    source_path: Path  # file the link points at
    link_path: Path  # the created (or attempted) symlink
    sequence_number: int  # position in the overall output sequence
    success: bool
    error: Optional[str] = None  # OS error text when success is False
@dataclass
class SymlinkRecord:
    """Database record of a created symlink (row of the symlinks table)."""
    id: int
    session_id: int
    source_path: str
    link_path: str
    original_filename: str
    sequence_number: int
    created_at: datetime
@dataclass
class SessionRecord:
    """Database record of a symlink session (row of symlink_sessions)."""
    id: int
    created_at: datetime
    destination: str
    link_count: int = 0  # number of symlinks recorded for this session
# --- Exceptions ---
# Hierarchy: SymlinkError is the package root; PathValidationError groups
# source/destination path problems; callers can catch the root to handle
# any failure from this package.
class SymlinkError(Exception):
    """Base exception for symlink operations."""
class PathValidationError(SymlinkError):
    """Error validating file paths."""
class SourceNotFoundError(PathValidationError):
    """Source directory does not exist."""
class DestinationError(PathValidationError):
    """Error with destination directory."""
class CleanupError(SymlinkError):
    """Error during cleanup of existing symlinks."""
class DatabaseError(SymlinkError):
    """Error with database operations."""

1936
symlink.py

File diff suppressed because it is too large Load Diff

9
ui/__init__.py Normal file
View File

@@ -0,0 +1,9 @@
"""UI modules for Video Montage Linker."""
from .widgets import TrimSlider
from .main_window import SequenceLinkerUI
__all__ = [
'TrimSlider',
'SequenceLinkerUI',
]

291
ui/widgets.py Normal file
View File

@@ -0,0 +1,291 @@
"""Custom widgets for Video Montage Linker UI."""
from typing import Optional
from PyQt6.QtCore import Qt, pyqtSignal, QRect
from PyQt6.QtGui import QPainter, QColor, QBrush, QPen, QMouseEvent
from PyQt6.QtWidgets import QWidget
class TrimSlider(QWidget):
    """A slider widget with two draggable handles for trimming sequences.

    Allows setting in/out points for a sequence by dragging left and right handles.
    Gray areas indicate trimmed regions, colored area indicates included images.
    """
    # Emitted on handle drag: (trim_start, trim_end, which) where which is
    # 'left' or 'right' depending on the handle that moved.
    trimChanged = pyqtSignal(int, int, str)
def __init__(self, parent: Optional[QWidget] = None) -> None:
    """Initialize the trim slider.

    Args:
        parent: Parent widget.
    """
    super().__init__(parent)
    self._total = 0  # number of items in the sequence
    self._trim_start = 0  # items trimmed from the start
    self._trim_end = 0  # items trimmed from the end
    self._current_pos = 0  # playhead index drawn over the track
    self._dragging: Optional[str] = None  # 'left', 'right', or None
    self._handle_width = 10  # handle width, pixels
    self._track_height = 20  # track height, pixels
    self._enabled = True  # custom flag used by paint and mouse handlers
    self.setMinimumHeight(40)
    self.setMinimumWidth(100)
    self.setCursor(Qt.CursorShape.ArrowCursor)
    # Tracking lets mouseMoveEvent fire without a button held (hover cursor).
    self.setMouseTracking(True)
def setRange(self, total: int) -> None:
    """Set the total number of items in the sequence.

    Existing trim values are clamped so they stay valid for the new total
    (start is clamped first, then end against the remaining room).

    Args:
        total: Total number of items.
    """
    self._total = max(0, total)
    # Clamp trim values to valid range
    self._trim_start = min(self._trim_start, max(0, self._total - 1))
    self._trim_end = min(self._trim_end, max(0, self._total - 1 - self._trim_start))
    self.update()
def setTrimStart(self, value: int) -> None:
    """Clamp and store the number of items trimmed from the start.

    Args:
        value: Number of items to trim from start.
    """
    # At least one item must remain past the end trim.
    upper = self._total - 1 - self._trim_end
    if upper < 0:
        upper = 0
    clamped = value
    if clamped > upper:
        clamped = upper
    if clamped < 0:
        clamped = 0
    self._trim_start = clamped
    self.update()
def setTrimEnd(self, value: int) -> None:
    """Clamp and store the number of items trimmed from the end.

    Args:
        value: Number of items to trim from end.
    """
    # At least one item must remain past the start trim.
    upper = self._total - 1 - self._trim_start
    if upper < 0:
        upper = 0
    clamped = value
    if clamped > upper:
        clamped = upper
    if clamped < 0:
        clamped = 0
    self._trim_end = clamped
    self.update()
def setCurrentPosition(self, pos: int) -> None:
    """Move the playhead indicator to the given index (clamped to range).

    Args:
        pos: Current position index.
    """
    if self._total > 0:
        self._current_pos = min(max(pos, 0), self._total - 1)
    else:
        self._current_pos = 0
    self.update()
def trimStart(self) -> int:
    """Get the trim start value (items trimmed from the start)."""
    return self._trim_start
def trimEnd(self) -> int:
    """Get the trim end value (items trimmed from the end)."""
    return self._trim_end
def total(self) -> int:
    """Get the total number of items."""
    return self._total
def includedRange(self) -> tuple[int, int]:
    """Get the range of included items (after trimming).

    Returns:
        Tuple of (first_included_index, last_included_index), or
        (-1, -1) when no items are included.
    """
    if not self._total:
        return (-1, -1)
    lo = self._trim_start
    hi = self._total - 1 - self._trim_end
    return (lo, hi) if lo <= hi else (-1, -1)
def setEnabled(self, enabled: bool) -> None:
    """Enable or disable the widget (visual + interaction flag).

    NOTE(review): this shadows QWidget.setEnabled without calling super(),
    so Qt's own enabled state is untouched; paint and mouse handlers check
    self._enabled instead — confirm this is intentional.
    """
    self._enabled = enabled
    self.update()
def _track_rect(self) -> QRect:
    """Get the rectangle for the slider track.

    The track is inset by one handle width on each side so handle centers
    can sit at the extreme values without being clipped, and is centered
    vertically in the widget.
    """
    margin = self._handle_width
    return QRect(
        margin,
        (self.height() - self._track_height) // 2,
        self.width() - 2 * margin,
        self._track_height
    )
def _value_to_x(self, value: int) -> int:
    """Map a sequence index to an x pixel coordinate on the track."""
    track = self._track_rect()
    # With zero or one items there is no span to interpolate over.
    if self._total <= 1:
        return track.left()
    fraction = value / (self._total - 1)
    return int(track.left() + fraction * track.width())
def _x_to_value(self, x: int) -> int:
    """Map an x pixel coordinate on the track back to a sequence index."""
    track = self._track_rect()
    # Degenerate geometry or too few items: everything maps to index 0.
    if self._total <= 1 or track.width() == 0:
        return 0
    fraction = (x - track.left()) / track.width()
    fraction = min(1.0, max(0.0, fraction))
    return int(round(fraction * (self._total - 1)))
def _left_handle_rect(self) -> QRect:
    """Get the rectangle for the left (trim start) handle.

    The handle is centered on the trim-start position and extends 10px
    taller than the track so it is easier to grab.
    """
    x = self._value_to_x(self._trim_start)
    return QRect(
        x - self._handle_width // 2,
        (self.height() - self._track_height - 10) // 2,
        self._handle_width,
        self._track_height + 10
    )
def _right_handle_rect(self) -> QRect:
    """Get the rectangle for the right (trim end) handle.

    Centered on the last included index (total - 1 - trim_end); same
    oversized height as the left handle for easier grabbing.
    """
    x = self._value_to_x(self._total - 1 - self._trim_end) if self._total > 0 else 0
    return QRect(
        x - self._handle_width // 2,
        (self.height() - self._track_height - 10) // 2,
        self._handle_width,
        self._track_height + 10
    )
def paintEvent(self, event) -> None:
    """Paint the trim slider: track, trimmed/included regions, playhead, handles."""
    painter = QPainter(self)
    painter.setRenderHint(QPainter.RenderHint.Antialiasing)
    track = self._track_rect()
    # Colors (included/handle colors dim when the widget is disabled)
    bg_color = QColor(60, 60, 60)
    trimmed_color = QColor(80, 80, 80)
    included_color = QColor(52, 152, 219) if self._enabled else QColor(100, 100, 100)
    handle_color = QColor(200, 200, 200) if self._enabled else QColor(120, 120, 120)
    position_color = QColor(255, 255, 255)
    # Draw background track
    painter.fillRect(track, bg_color)
    if self._total > 0:
        # Draw trimmed regions (darker)
        left_trim_x = self._value_to_x(self._trim_start)
        right_trim_x = self._value_to_x(self._total - 1 - self._trim_end)
        # Left trimmed region
        if self._trim_start > 0:
            left_rect = QRect(track.left(), track.top(),
                              left_trim_x - track.left(), track.height())
            painter.fillRect(left_rect, trimmed_color)
        # Right trimmed region
        if self._trim_end > 0:
            right_rect = QRect(right_trim_x, track.top(),
                               track.right() - right_trim_x, track.height())
            painter.fillRect(right_rect, trimmed_color)
        # Draw included region
        if left_trim_x < right_trim_x:
            included_rect = QRect(left_trim_x, track.top(),
                                  right_trim_x - left_trim_x, track.height())
            painter.fillRect(included_rect, included_color)
        # Draw current position indicator (only when inside the included range)
        if self._trim_start <= self._current_pos <= (self._total - 1 - self._trim_end):
            pos_x = self._value_to_x(self._current_pos)
            painter.setPen(QPen(position_color, 2))
            painter.drawLine(pos_x, track.top() - 2, pos_x, track.bottom() + 2)
    # Draw handles (always, even when the sequence is empty)
    painter.setBrush(QBrush(handle_color))
    painter.setPen(QPen(Qt.GlobalColor.black, 1))
    # Left handle
    left_handle = self._left_handle_rect()
    painter.drawRect(left_handle)
    # Right handle
    right_handle = self._right_handle_rect()
    painter.drawRect(right_handle)
    painter.end()
def mousePressEvent(self, event: QMouseEvent) -> None:
    """Handle mouse press: decide which handle (if any) starts a drag."""
    if not self._enabled or self._total == 0:
        return
    pos = event.pos()
    # Check if clicking on handles (check right first since it may overlap)
    right_rect = self._right_handle_rect()
    left_rect = self._left_handle_rect()
    # Expand hit area slightly for easier grabbing
    expand = 5
    left_expanded = left_rect.adjusted(-expand, -expand, expand, expand)
    right_expanded = right_rect.adjusted(-expand, -expand, expand, expand)
    if right_expanded.contains(pos):
        self._dragging = 'right'
    elif left_expanded.contains(pos):
        self._dragging = 'left'
    else:
        self._dragging = None
def mouseMoveEvent(self, event: QMouseEvent) -> None:
    """Handle mouse move: update the hover cursor and drag the active handle.

    While dragging, the handle's trim value is recomputed from the cursor
    x position; trimChanged is emitted only when the value actually changes.
    """
    if not self._enabled:
        return
    pos = event.pos()
    # Update cursor based on position
    if self._dragging:
        self.setCursor(Qt.CursorShape.SizeHorCursor)
    else:
        left_rect = self._left_handle_rect()
        right_rect = self._right_handle_rect()
        expand = 5
        left_expanded = left_rect.adjusted(-expand, -expand, expand, expand)
        right_expanded = right_rect.adjusted(-expand, -expand, expand, expand)
        if left_expanded.contains(pos) or right_expanded.contains(pos):
            self.setCursor(Qt.CursorShape.SizeHorCursor)
        else:
            self.setCursor(Qt.CursorShape.ArrowCursor)
    if self._dragging and self._total > 0:
        value = self._x_to_value(pos.x())
        if self._dragging == 'left':
            # Left handle: set trim_start, clamped to not exceed right
            max_start = self._total - 1 - self._trim_end
            new_start = max(0, min(value, max_start))
            if new_start != self._trim_start:
                self._trim_start = new_start
                self.update()
                self.trimChanged.emit(self._trim_start, self._trim_end, 'left')
        elif self._dragging == 'right':
            # Right handle: value is the index position, trim_end is the
            # count from the end. (Removed dead local `max_val`, which was
            # computed but never used.)
            clamped_value = max(self._trim_start, min(value, self._total - 1))
            new_end = self._total - 1 - clamped_value
            if new_end != self._trim_end:
                self._trim_end = max(0, new_end)
                self.update()
                self.trimChanged.emit(self._trim_start, self._trim_end, 'right')
def mouseReleaseEvent(self, event: QMouseEvent) -> None:
"""Handle mouse release to stop dragging."""
self._dragging = None
self.setCursor(Qt.CursorShape.ArrowCursor)