Compare commits


4 Commits

Author SHA1 Message Date
3c4da8f23b Update README with comprehensive feature documentation
- Document cross-dissolve transitions and blend methods
- Add RIFE auto-download instructions
- Document per-folder trim and per-transition overlap
- Add file structure diagram
- Update installation requirements
- Expand supported formats list

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 18:51:50 +01:00
9c0d64f908 Restructure into multi-file architecture
Split monolithic symlink.py into modular components:
- config.py: Constants and configuration
- core/: Models, database, blender, manager
- ui/: Main window and widgets

New features included:
- Cross-dissolve transitions with multiple blend methods
- Alpha blend, Optical Flow, and RIFE (AI) interpolation
- Per-folder trim settings with start/end frame control
- Per-transition asymmetric overlap settings
- Folder type overrides (Main/Transition)
- Dual destination folders (sequence + transitions)
- WebP lossless output with compression method setting
- Video and image sequence preview with zoom/pan
- Session resume from destination folder

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 18:49:51 +01:00
41e3acb2cb Add alternating row colors to source folder list
Improves readability of the source folder panel.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 18:49:39 +01:00
3dd46a0aa1 cool 2026-02-03 16:21:56 +01:00
13 changed files with 547 additions and 6984 deletions

.gitignore

@@ -1,64 +1,4 @@
# Python
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
*.so
# Virtual environments
venv/
venv-rife/
.venv/
env/
# Environment files
.env
.env.local
# IDE
.idea/
.vscode/
*.swp
*.swo
*~
# Database
*.db
*.sqlite
*.sqlite3
# Downloads and cache
*.pkl
*.pt
*.pth
*.onnx
downloads/
cache/
.cache/
# RIFE binaries and models
rife-ncnn-vulkan*/
*.zip
# Output directories
output/
outputs/
temp/
tmp/
# Logs
*.log
logs/
# OS files
.DS_Store
Thumbs.db
# Build artifacts
dist/
build/
*.egg-info/
# Git mirror scripts
gitea-push-mirror-setup


@@ -13,20 +13,18 @@ A PyQt6 application for creating sequenced symlinks from image folders with adva
- Per-folder trim settings (exclude frames from start/end)
### Cross-Dissolve Transitions
Smooth blending between folder boundaries with four blend methods:
Smooth blending between folder boundaries with three blend methods:
| Method | Description | Quality | Speed |
|--------|-------------|---------|-------|
| **Cross-Dissolve** | Simple alpha blend | Good | Fastest |
| **Optical Flow** | Motion-compensated blend using OpenCV Farneback | Better | Medium |
| **RIFE (ncnn)** | Neural network interpolation via rife-ncnn-vulkan | Best | Fast (GPU) |
| **RIFE (Practical)** | PyTorch-based Practical-RIFE (v4.25/v4.26) | Best | Medium (GPU) |
| **RIFE (AI)** | Neural network frame interpolation | Best | Fast (GPU) |
- **Asymmetric overlap**: Set different frame counts for each side of a transition
- **Blend curves**: Linear, Ease In, Ease Out, Ease In/Out
- **Output formats**: PNG, JPEG (with quality), WebP (lossless with method setting)
- **RIFE auto-download**: Automatically downloads rife-ncnn-vulkan binary
- **Practical-RIFE models**: Auto-downloads from Google Drive on first use
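The alpha-blend and blend-curve behaviour listed above can be sketched in a few lines. This is an illustrative sketch only, not the app's blender code; the names `ease_in_out` and `cross_dissolve` are invented for the example:

```python
import numpy as np

def ease_in_out(t: float) -> float:
    """Smoothstep curve: slow at both ends, fast in the middle."""
    return t * t * (3.0 - 2.0 * t)

def cross_dissolve(frame_a: np.ndarray, frame_b: np.ndarray, t: float,
                   curve=ease_in_out) -> np.ndarray:
    """Alpha-blend two uint8 frames at timestep t in [0, 1]."""
    w = curve(t)
    out = (1.0 - w) * frame_a.astype(np.float32) + w * frame_b.astype(np.float32)
    return out.clip(0, 255).astype(np.uint8)
```

Swapping `curve` for the identity function gives the Linear curve; Ease In and Ease Out are one-sided variants of the same idea.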
### Preview
- **Video Preview**: Play video files from source folders
@@ -56,24 +54,11 @@ Smooth blending between folder boundaries with four blend methods:
pip install PyQt6 Pillow numpy opencv-python
```
**Note:** Practical-RIFE creates its own isolated venv with PyTorch. The `gdown` package is installed automatically for downloading models from Google Drive.
### RIFE ncnn (Optional)
For AI-powered frame interpolation using Vulkan GPU acceleration:
- Select **RIFE (ncnn)** as the blend method
- Click **Download** to auto-fetch [rife-ncnn-vulkan](https://github.com/nihui/rife-ncnn-vulkan)
### RIFE (Optional)
For AI-powered frame interpolation, the app can auto-download [rife-ncnn-vulkan](https://github.com/nihui/rife-ncnn-vulkan) or you can install it manually:
- Select **RIFE (AI)** as the blend method
- Click **Download** to fetch the latest release
- Or specify a custom binary path
- Models: rife-v4.6, rife-v4.15-lite, etc.
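As a sketch of how such a binary could be driven from Python — the `-0`/`-1`/`-o`/`-m` flags follow rife-ncnn-vulkan's documented single-pair usage, but the helper names are illustrative and the app's own wrapper may differ:

```python
import subprocess
from pathlib import Path

def rife_cmd(binary: Path, frame0: Path, frame1: Path,
             out: Path, model: str = "rife-v4.6") -> list[str]:
    """Build the argv for one midpoint interpolation:
    -0/-1 are the two input frames, -o the output, -m the model name."""
    return [str(binary), "-0", str(frame0), "-1", str(frame1),
            "-o", str(out), "-m", model]

def rife_interpolate(binary: Path, frame0: Path, frame1: Path, out: Path) -> None:
    # check=True raises CalledProcessError if the binary exits non-zero
    subprocess.run(rife_cmd(binary, frame0, frame1, out), check=True)
```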
### Practical-RIFE (Optional)
For PyTorch-based frame interpolation with latest models:
- Select **RIFE (Practical)** as the blend method
- Click **Setup PyTorch** to create an isolated venv with PyTorch (~2GB)
- Models auto-download from Google Drive on first use
- Available models: v4.26, v4.25, v4.22, v4.20, v4.18, v4.15
- Optional ensemble mode for higher quality (slower)
The venv is stored at `~/.cache/video-montage-linker/venv-rife/`
## Usage
@@ -113,8 +98,7 @@ video-montage-linker/
├── core/
│ ├── models.py # Enums, dataclasses
│ ├── database.py # SQLite session management
│ ├── blender.py # Image blending, RIFE downloader, Practical-RIFE env
│ ├── rife_worker.py # Practical-RIFE inference (runs in isolated venv)
│ ├── blender.py # Image blending, RIFE downloader
│ └── manager.py # Symlink operations
└── ui/
├── widgets.py # TrimSlider, custom widgets


@@ -4,12 +4,8 @@ from .models import (
BlendCurve,
BlendMethod,
FolderType,
DirectInterpolationMethod,
TransitionSettings,
PerTransitionSettings,
DirectTransitionSettings,
VideoPreset,
VIDEO_PRESETS,
BlendResult,
TransitionSpec,
LinkResult,
@@ -23,20 +19,15 @@ from .models import (
DatabaseError,
)
from .database import DatabaseManager
from .blender import ImageBlender, TransitionGenerator, RifeDownloader, PracticalRifeEnv, FilmEnv, OPTICAL_FLOW_PRESETS
from .blender import ImageBlender, TransitionGenerator, RifeDownloader
from .manager import SymlinkManager
from .video import encode_image_sequence, encode_from_file_list, find_ffmpeg
__all__ = [
'BlendCurve',
'BlendMethod',
'FolderType',
'DirectInterpolationMethod',
'TransitionSettings',
'PerTransitionSettings',
'DirectTransitionSettings',
'VideoPreset',
'VIDEO_PRESETS',
'BlendResult',
'TransitionSpec',
'LinkResult',
@@ -52,11 +43,5 @@ __all__ = [
'ImageBlender',
'TransitionGenerator',
'RifeDownloader',
'PracticalRifeEnv',
'FilmEnv',
'SymlinkManager',
'OPTICAL_FLOW_PRESETS',
'encode_image_sequence',
'encode_from_file_list',
'find_ffmpeg',
]

File diff suppressed because it is too large


@@ -39,8 +39,7 @@ class DatabaseManager:
CREATE TABLE IF NOT EXISTS symlink_sessions (
id INTEGER PRIMARY KEY,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
destination TEXT NOT NULL,
name TEXT DEFAULT NULL
destination TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS symlinks (
@@ -83,24 +82,6 @@ class DatabaseManager:
right_overlap INTEGER DEFAULT 16,
UNIQUE(session_id, trans_folder)
);
CREATE TABLE IF NOT EXISTS removed_files (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
source_folder TEXT NOT NULL,
filename TEXT NOT NULL,
UNIQUE(session_id, source_folder, filename)
);
CREATE TABLE IF NOT EXISTS direct_transition_settings (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
after_folder TEXT NOT NULL,
frame_count INTEGER DEFAULT 16,
method TEXT DEFAULT 'film',
enabled INTEGER DEFAULT 1,
UNIQUE(session_id, after_folder)
);
""")
# Migration: add folder_type column if it doesn't exist
@@ -133,168 +114,20 @@ class DatabaseManager:
except sqlite3.OperationalError:
conn.execute("ALTER TABLE transition_settings ADD COLUMN rife_binary_path TEXT")
# Migration: add folder_order column if it doesn't exist
try:
conn.execute("SELECT folder_order FROM sequence_trim_settings LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE sequence_trim_settings ADD COLUMN folder_order INTEGER DEFAULT 0")
# Migration: add name column to symlink_sessions if it doesn't exist
try:
conn.execute("SELECT name FROM symlink_sessions LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE symlink_sessions ADD COLUMN name TEXT DEFAULT NULL")
# Migration: widen UNIQUE constraints to allow duplicate folder paths per session.
# sequence_trim_settings: UNIQUE(session_id, source_folder) → UNIQUE(session_id, folder_order)
self._migrate_unique_constraint(
conn, 'sequence_trim_settings',
"""CREATE TABLE sequence_trim_settings_new (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
source_folder TEXT NOT NULL,
trim_start INTEGER DEFAULT 0,
trim_end INTEGER DEFAULT 0,
folder_type TEXT DEFAULT 'auto',
folder_order INTEGER DEFAULT 0,
UNIQUE(session_id, folder_order)
)""",
'session_id, source_folder, trim_start, trim_end, folder_type, folder_order',
)
# per_transition_settings: add folder_order, widen UNIQUE
try:
conn.execute("SELECT folder_order FROM per_transition_settings LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE per_transition_settings ADD COLUMN folder_order INTEGER DEFAULT 0")
self._migrate_unique_constraint(
conn, 'per_transition_settings',
"""CREATE TABLE per_transition_settings_new (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
trans_folder TEXT NOT NULL,
left_overlap INTEGER DEFAULT 16,
right_overlap INTEGER DEFAULT 16,
folder_order INTEGER DEFAULT 0,
UNIQUE(session_id, trans_folder, folder_order)
)""",
'session_id, trans_folder, left_overlap, right_overlap, folder_order',
)
# removed_files: add folder_order, widen UNIQUE
try:
conn.execute("SELECT folder_order FROM removed_files LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE removed_files ADD COLUMN folder_order INTEGER DEFAULT 0")
self._migrate_unique_constraint(
conn, 'removed_files',
"""CREATE TABLE removed_files_new (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
source_folder TEXT NOT NULL,
filename TEXT NOT NULL,
folder_order INTEGER DEFAULT 0,
UNIQUE(session_id, source_folder, filename, folder_order)
)""",
'session_id, source_folder, filename, folder_order',
)
# direct_transition_settings: add folder_order, widen UNIQUE
try:
conn.execute("SELECT folder_order FROM direct_transition_settings LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE direct_transition_settings ADD COLUMN folder_order INTEGER DEFAULT 0")
self._migrate_unique_constraint(
conn, 'direct_transition_settings',
"""CREATE TABLE direct_transition_settings_new (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
after_folder TEXT NOT NULL,
frame_count INTEGER DEFAULT 16,
method TEXT DEFAULT 'film',
enabled INTEGER DEFAULT 1,
folder_order INTEGER DEFAULT 0,
UNIQUE(session_id, after_folder, folder_order)
)""",
'session_id, after_folder, frame_count, method, enabled, folder_order',
)
# Migration: add locked column to symlink_sessions
try:
conn.execute("SELECT locked FROM symlink_sessions LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE symlink_sessions ADD COLUMN locked INTEGER DEFAULT 0")
# Migration: remove overlap_frames from transition_settings (now per-transition)
# We'll keep it for backward compatibility but won't use it
@staticmethod
def _migrate_unique_constraint(
conn: sqlite3.Connection,
table: str,
create_new_sql: str,
columns: str,
) -> None:
"""Recreate a table with a new UNIQUE constraint if needed.
Tests whether duplicate folder_order=0 entries can be inserted.
If an IntegrityError fires, the old constraint is too narrow and
the table must be recreated.
"""
new_table = f"{table}_new"
try:
# Test: can we insert two rows with same session+folder but different folder_order?
# If the old UNIQUE is still (session_id, source_folder) this will fail.
conn.execute(f"INSERT INTO {table} (session_id, {columns.split(',')[1].strip()}, folder_order) VALUES (-999, '__test__', 1)")
conn.execute(f"INSERT INTO {table} (session_id, {columns.split(',')[1].strip()}, folder_order) VALUES (-999, '__test__', 2)")
# Clean up test rows
conn.execute(f"DELETE FROM {table} WHERE session_id = -999")
# If we got here, the constraint already allows duplicates — no migration needed
return
except sqlite3.IntegrityError:
# Old constraint is too narrow — need to recreate
conn.execute(f"DELETE FROM {table} WHERE session_id = -999")
except sqlite3.OperationalError:
# Column might not exist yet or other issue — try migration anyway
conn.execute(f"DELETE FROM {table} WHERE session_id = -999")
try:
conn.execute(f"DROP TABLE IF EXISTS {new_table}")
conn.execute(create_new_sql)
conn.execute(f"INSERT INTO {new_table} ({columns}) SELECT {columns} FROM {table}")
conn.execute(f"DROP TABLE {table}")
conn.execute(f"ALTER TABLE {new_table} RENAME TO {table}")
except (sqlite3.OperationalError, sqlite3.IntegrityError):
# Clean up failed migration attempt
try:
conn.execute(f"DROP TABLE IF EXISTS {new_table}")
except sqlite3.OperationalError:
pass
def clear_session_data(self, session_id: int) -> None:
"""Delete all data for a session (symlinks, settings, etc.) but keep the session row."""
try:
with self._connect() as conn:
for table in (
'symlinks', 'sequence_trim_settings', 'transition_settings',
'per_transition_settings', 'removed_files', 'direct_transition_settings',
):
conn.execute(f"DELETE FROM {table} WHERE session_id = ?", (session_id,))
except sqlite3.Error as e:
raise DatabaseError(f"Failed to clear session data: {e}") from e
def _connect(self) -> sqlite3.Connection:
"""Create a database connection with foreign keys enabled."""
conn = sqlite3.connect(self.db_path)
conn.execute("PRAGMA foreign_keys = ON")
return conn
def create_session(self, destination: str, name: Optional[str] = None) -> int:
def create_session(self, destination: str) -> int:
"""Create a new linking session.
Args:
destination: The destination directory path.
name: Optional display name (e.g. "autosave").
Returns:
The ID of the created session.
@@ -305,8 +138,8 @@ class DatabaseManager:
try:
with self._connect() as conn:
cursor = conn.execute(
"INSERT INTO symlink_sessions (destination, name) VALUES (?, ?)",
(destination, name)
"INSERT INTO symlink_sessions (destination) VALUES (?)",
(destination,)
)
return cursor.lastrowid
except sqlite3.Error as e:
@@ -347,31 +180,6 @@ class DatabaseManager:
except sqlite3.Error as e:
raise DatabaseError(f"Failed to record symlink: {e}") from e
def record_symlinks_batch(
self,
session_id: int,
records: list[tuple[str, str, str, int]],
) -> None:
"""Record multiple symlinks in a single transaction.
Args:
session_id: The session these symlinks belong to.
records: List of (source, link, filename, seq) tuples.
Raises:
DatabaseError: If recording fails.
"""
try:
with self._connect() as conn:
conn.executemany(
"""INSERT INTO symlinks
(session_id, source_path, link_path, original_filename, sequence_number)
VALUES (?, ?, ?, ?, ?)""",
[(session_id, src, lnk, fname, seq) for src, lnk, fname, seq in records]
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to record symlinks: {e}") from e
def get_sessions(self) -> list[SessionRecord]:
"""List all sessions with link counts.
@@ -380,8 +188,7 @@ class DatabaseManager:
"""
with self._connect() as conn:
rows = conn.execute("""
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count,
s.name, COALESCE(s.locked, 0)
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count
FROM symlink_sessions s
LEFT JOIN symlinks l ON s.id = l.session_id
GROUP BY s.id
@@ -393,9 +200,7 @@ class DatabaseManager:
id=row[0],
created_at=datetime.fromisoformat(row[1]),
destination=row[2],
link_count=row[3],
name=row[4],
locked=bool(row[5])
link_count=row[3]
)
for row in rows
]
@@ -465,7 +270,7 @@ class DatabaseManager:
]
def delete_session(self, session_id: int) -> None:
"""Delete a session and all its related data (CASCADE handles child tables).
"""Delete a session and all its symlink records.
Args:
session_id: The session ID to delete.
@@ -475,56 +280,11 @@ class DatabaseManager:
"""
try:
with self._connect() as conn:
conn.execute("DELETE FROM symlinks WHERE session_id = ?", (session_id,))
conn.execute("DELETE FROM symlink_sessions WHERE id = ?", (session_id,))
except sqlite3.Error as e:
raise DatabaseError(f"Failed to delete session: {e}") from e
def delete_sessions(self, session_ids: list[int]) -> None:
"""Delete multiple sessions in a single transaction.
Locked sessions are silently skipped.
Args:
session_ids: List of session IDs to delete.
Raises:
DatabaseError: If deletion fails.
"""
if not session_ids:
return
try:
with self._connect() as conn:
placeholders = ','.join('?' for _ in session_ids)
conn.execute(
f"DELETE FROM symlink_sessions WHERE id IN ({placeholders}) AND COALESCE(locked, 0) = 0",
session_ids
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to delete sessions: {e}") from e
def toggle_session_locked(self, session_id: int) -> bool:
"""Toggle the locked state of a session.
Returns:
The new locked state.
"""
try:
with self._connect() as conn:
row = conn.execute(
"SELECT COALESCE(locked, 0) FROM symlink_sessions WHERE id = ?",
(session_id,)
).fetchone()
if row is None:
raise DatabaseError(f"Session {session_id} not found")
new_val = 0 if row[0] else 1
conn.execute(
"UPDATE symlink_sessions SET locked = ? WHERE id = ?",
(new_val, session_id)
)
return bool(new_val)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to toggle session lock: {e}") from e
def get_sessions_by_destination(self, dest: str) -> list[SessionRecord]:
"""Get all sessions for a destination directory.
@@ -536,8 +296,7 @@ class DatabaseManager:
"""
with self._connect() as conn:
rows = conn.execute("""
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count,
s.name, COALESCE(s.locked, 0)
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count
FROM symlink_sessions s
LEFT JOIN symlinks l ON s.id = l.session_id
WHERE s.destination = ?
@@ -550,9 +309,7 @@ class DatabaseManager:
id=row[0],
created_at=datetime.fromisoformat(row[1]),
destination=row[2],
link_count=row[3],
name=row[4],
locked=bool(row[5])
link_count=row[3]
)
for row in rows
]
@@ -563,8 +320,7 @@ class DatabaseManager:
source_folder: str,
trim_start: int,
trim_end: int,
folder_type: FolderType = FolderType.AUTO,
folder_order: int = 0,
folder_type: FolderType = FolderType.AUTO
) -> None:
"""Save trim settings for a folder in a session.
@@ -574,7 +330,6 @@ class DatabaseManager:
trim_start: Number of images to trim from start.
trim_end: Number of images to trim from end.
folder_type: The folder type (auto, main, or transition).
folder_order: Position of this folder in source_folders list.
Raises:
DatabaseError: If saving fails.
@@ -583,14 +338,13 @@ class DatabaseManager:
with self._connect() as conn:
conn.execute(
"""INSERT INTO sequence_trim_settings
(session_id, source_folder, trim_start, trim_end, folder_type, folder_order)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(session_id, folder_order)
DO UPDATE SET source_folder=excluded.source_folder,
trim_start=excluded.trim_start,
(session_id, source_folder, trim_start, trim_end, folder_type)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(session_id, source_folder)
DO UPDATE SET trim_start=excluded.trim_start,
trim_end=excluded.trim_end,
folder_type=excluded.folder_type""",
(session_id, source_folder, trim_start, trim_end, folder_type.value, folder_order)
(session_id, source_folder, trim_start, trim_end, folder_type.value)
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to save trim settings: {e}") from e
@@ -642,62 +396,6 @@ class DatabaseManager:
return {row[0]: (row[1], row[2]) for row in rows}
def get_all_folder_settings(self, session_id: int) -> dict[str, tuple[int, int, FolderType]]:
"""Get all folder settings (trim + type) for a session, unordered.
Returns:
Dict mapping source_folder to (trim_start, trim_end, folder_type).
"""
with self._connect() as conn:
rows = conn.execute(
"""SELECT source_folder, trim_start, trim_end, folder_type
FROM sequence_trim_settings WHERE session_id = ?""",
(session_id,)
).fetchall()
result = {}
for row in rows:
try:
ft = FolderType(row[3]) if row[3] else FolderType.AUTO
except ValueError:
ft = FolderType.AUTO
result[row[0]] = (row[1], row[2], ft)
return result
def get_ordered_folders(self, session_id: int) -> list[tuple[str, FolderType, int, int]]:
"""Get all folders for a session in saved order.
Returns:
List of (source_folder, folder_type, trim_start, trim_end) sorted by folder_order.
Returns empty list if folder_order is not meaningful (all zeros from
pre-migration sessions), so the caller falls back to symlink-derived order.
"""
with self._connect() as conn:
rows = conn.execute(
"""SELECT source_folder, folder_type, trim_start, trim_end, folder_order
FROM sequence_trim_settings WHERE session_id = ?
ORDER BY folder_order""",
(session_id,)
).fetchall()
if not rows:
return []
# If all folder_order values are 0, this is a pre-migration session
# where the ordering is not meaningful — return empty to trigger
# the legacy symlink-derived ordering path.
if len(rows) > 1 and all(row[4] == 0 for row in rows):
return []
result = []
for row in rows:
try:
ft = FolderType(row[1]) if row[1] else FolderType.AUTO
except ValueError:
ft = FolderType.AUTO
result.append((row[0], ft, row[2], row[3]))
return result
def save_transition_settings(
self,
session_id: int,
@@ -834,15 +532,13 @@ class DatabaseManager:
def save_per_transition_settings(
self,
session_id: int,
settings: PerTransitionSettings,
folder_order: int = 0,
settings: PerTransitionSettings
) -> None:
"""Save per-transition overlap settings.
Args:
session_id: The session ID.
settings: PerTransitionSettings to save.
folder_order: Position of this folder in the source list.
Raises:
DatabaseError: If saving fails.
@@ -851,13 +547,13 @@ class DatabaseManager:
with self._connect() as conn:
conn.execute(
"""INSERT INTO per_transition_settings
(session_id, trans_folder, left_overlap, right_overlap, folder_order)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(session_id, trans_folder, folder_order)
(session_id, trans_folder, left_overlap, right_overlap)
VALUES (?, ?, ?, ?)
ON CONFLICT(session_id, trans_folder)
DO UPDATE SET left_overlap=excluded.left_overlap,
right_overlap=excluded.right_overlap""",
(session_id, str(settings.trans_folder),
settings.left_overlap, settings.right_overlap, folder_order)
settings.left_overlap, settings.right_overlap)
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to save per-transition settings: {e}") from e
@@ -894,110 +590,27 @@ class DatabaseManager:
def get_all_per_transition_settings(
self,
session_id: int
) -> list[tuple[str, int, int, int]]:
) -> dict[str, PerTransitionSettings]:
"""Get all per-transition settings for a session.
Args:
session_id: The session ID.
Returns:
List of (trans_folder, left_overlap, right_overlap, folder_order) tuples.
Dict mapping transition folder paths to PerTransitionSettings.
"""
with self._connect() as conn:
rows = conn.execute(
"""SELECT trans_folder, left_overlap, right_overlap, folder_order
FROM per_transition_settings WHERE session_id = ?
ORDER BY folder_order""",
"""SELECT trans_folder, left_overlap, right_overlap
FROM per_transition_settings WHERE session_id = ?""",
(session_id,)
).fetchall()
return [(row[0], row[1], row[2], row[3]) for row in rows]
def save_removed_files(
self,
session_id: int,
source_folder: str,
filenames: list[str],
folder_order: int = 0,
) -> None:
"""Save removed files for a folder in a session.
Args:
session_id: The session ID.
source_folder: Path to the source folder.
filenames: List of removed filenames.
folder_order: Position of this folder in the source list.
"""
try:
with self._connect() as conn:
for filename in filenames:
conn.execute(
"""INSERT OR IGNORE INTO removed_files
(session_id, source_folder, filename, folder_order)
VALUES (?, ?, ?, ?)""",
(session_id, source_folder, filename, folder_order)
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to save removed files: {e}") from e
def get_removed_files(self, session_id: int) -> dict[int, set[str]]:
"""Get all removed files for a session, keyed by folder_order.
Args:
session_id: The session ID.
Returns:
Dict mapping folder_order to sets of removed filenames.
"""
with self._connect() as conn:
rows = conn.execute(
"SELECT source_folder, filename, folder_order FROM removed_files WHERE session_id = ?",
(session_id,)
).fetchall()
result: dict[int, set[str]] = {}
for folder, filename, folder_order in rows:
if folder_order not in result:
result[folder_order] = set()
result[folder_order].add(filename)
return result
def save_direct_transition(
self,
session_id: int,
after_folder: str,
frame_count: int,
method: str,
enabled: bool,
folder_order: int = 0,
) -> None:
"""Save direct interpolation settings for a folder transition."""
try:
with self._connect() as conn:
conn.execute(
"""INSERT INTO direct_transition_settings
(session_id, after_folder, frame_count, method, enabled, folder_order)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(session_id, folder_order)
DO UPDATE SET after_folder=excluded.after_folder,
frame_count=excluded.frame_count,
method=excluded.method,
enabled=excluded.enabled""",
(session_id, after_folder, frame_count, method, 1 if enabled else 0, folder_order)
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to save direct transition: {e}") from e
def get_direct_transitions(self, session_id: int) -> list[tuple[str, int, str, bool, int]]:
"""Get direct interpolation settings for a session.
Returns:
List of (after_folder, frame_count, method, enabled, folder_order) tuples.
"""
with self._connect() as conn:
rows = conn.execute(
"SELECT after_folder, frame_count, method, enabled, folder_order "
"FROM direct_transition_settings WHERE session_id = ?",
(session_id,)
).fetchall()
return [(r[0], r[1], r[2], bool(r[3]), r[4]) for r in rows]
return {
row[0]: PerTransitionSettings(
trans_folder=Path(row[0]),
left_overlap=row[1],
right_overlap=row[2]
)
for row in rows
}
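One detail worth noting in the diff above: `_connect` issues `PRAGMA foreign_keys = ON` on every connection. SQLite ships with foreign-key enforcement off, so the schema's `ON DELETE CASCADE` clauses are inert without that pragma. A standalone sketch (illustrative table names, not the app's schema):

```python
import sqlite3

def cascade_demo(enable_fk: bool) -> int:
    """Return how many child rows survive deleting the parent."""
    conn = sqlite3.connect(":memory:")
    if enable_fk:
        # Must be set per connection: SQLite defaults to OFF.
        conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript("""
        CREATE TABLE sessions (id INTEGER PRIMARY KEY);
        CREATE TABLE links (
            id INTEGER PRIMARY KEY,
            session_id INTEGER REFERENCES sessions(id) ON DELETE CASCADE
        );
        INSERT INTO sessions (id) VALUES (1);
        INSERT INTO links (session_id) VALUES (1);
    """)
    conn.execute("DELETE FROM sessions WHERE id = 1")
    return conn.execute("SELECT COUNT(*) FROM links").fetchone()[0]
```

With enforcement on, deleting the parent removes the child row; with it off, the orphaned child silently survives.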


@@ -1,285 +0,0 @@
#!/usr/bin/env python
"""FILM interpolation worker - runs in isolated venv with PyTorch.
This script is executed via subprocess from the main application.
It handles frame interpolation using Google Research's FILM model
(Frame Interpolation for Large Motion) via the frame-interpolation-pytorch repo.
FILM is better than RIFE for large motion and scene gaps, but slower.
Supports two modes:
1. Single frame: --output with --timestep
2. Batch mode: --output-dir with --frame-count (generates all frames at once)
"""
import argparse
import sys
import urllib.request
from pathlib import Path
import numpy as np
import torch
from PIL import Image
# Model download URL
FILM_MODEL_URL = "https://github.com/dajes/frame-interpolation-pytorch/releases/download/v1.0.2/film_net_fp32.pt"
FILM_MODEL_FILENAME = "film_net_fp32.pt"
def load_image(path: Path, device: torch.device) -> torch.Tensor:
"""Load image as tensor.
Args:
path: Path to image file.
device: Device to load tensor to.
Returns:
Image tensor (1, 3, H, W) normalized to [0, 1].
"""
img = Image.open(path).convert('RGB')
arr = np.array(img).astype(np.float32) / 255.0
tensor = torch.from_numpy(arr).permute(2, 0, 1).unsqueeze(0)
return tensor.to(device)
def save_image(tensor: torch.Tensor, path: Path) -> None:
"""Save tensor as image.
Args:
tensor: Image tensor (1, 3, H, W) or (3, H, W) normalized to [0, 1].
path: Output path.
"""
if tensor.dim() == 4:
tensor = tensor.squeeze(0)
arr = tensor.permute(1, 2, 0).cpu().numpy()
arr = (arr * 255).clip(0, 255).astype(np.uint8)
Image.fromarray(arr).save(path)
# Global model cache
_model_cache: dict = {}
def download_model(model_dir: Path) -> Path:
"""Download FILM model if not present.
Args:
model_dir: Directory to store the model.
Returns:
Path to the downloaded model file.
"""
model_dir.mkdir(parents=True, exist_ok=True)
model_path = model_dir / FILM_MODEL_FILENAME
if not model_path.exists():
print(f"Downloading FILM model to {model_path}...", file=sys.stderr)
urllib.request.urlretrieve(FILM_MODEL_URL, model_path)
print("Download complete.", file=sys.stderr)
return model_path
def get_model(model_dir: Path, device: torch.device):
"""Get or load FILM model (cached).
Args:
model_dir: Model cache directory (for model downloads).
device: Device to run on.
Returns:
FILM TorchScript model instance.
"""
cache_key = f"film_{device}"
if cache_key not in _model_cache:
# Download model if needed
model_path = download_model(model_dir)
# Load pre-trained TorchScript model
print(f"Loading FILM model from {model_path}...", file=sys.stderr)
model = torch.jit.load(str(model_path), map_location='cpu')
model.eval()
model.to(device)
_model_cache[cache_key] = model
print("Model loaded.", file=sys.stderr)
return _model_cache[cache_key]
@torch.no_grad()
def interpolate_single(model, img0: torch.Tensor, img1: torch.Tensor, t: float) -> torch.Tensor:
"""Perform single frame interpolation using FILM.
Args:
model: FILM TorchScript model instance.
img0: First frame tensor (1, 3, H, W) normalized to [0, 1].
img1: Second frame tensor (1, 3, H, W) normalized to [0, 1].
t: Interpolation timestep (0.0 to 1.0).
Returns:
Interpolated frame tensor.
"""
# FILM TorchScript model expects dt as tensor of shape (1, 1)
dt = img0.new_full((1, 1), t)
result = model(img0, img1, dt)
if isinstance(result, tuple):
result = result[0]
return result.clamp(0, 1)
@torch.no_grad()
def interpolate_batch(model, img0: torch.Tensor, img1: torch.Tensor, frame_count: int) -> list[torch.Tensor]:
"""Generate multiple interpolated frames using FILM's recursive approach.
FILM works best when generating frames recursively - it first generates
the middle frame, then fills in the gaps. This produces more consistent
results than generating arbitrary timesteps independently.
Args:
model: FILM model instance.
img0: First frame tensor (1, 3, H, W) normalized to [0, 1].
img1: Second frame tensor (1, 3, H, W) normalized to [0, 1].
frame_count: Number of frames to generate between img0 and img1.
Returns:
List of interpolated frame tensors in order.
"""
# Calculate timesteps for evenly spaced frames
timesteps = [(i + 1) / (frame_count + 1) for i in range(frame_count)]
# Try to use the model's batch/recursive interpolation if available
try:
# Some implementations have an interpolate_recursively method
if hasattr(model, 'interpolate_recursively'):
# This generates 2^n - 1 frames, so we need to handle arbitrary counts
results = model.interpolate_recursively(img0, img1, frame_count)
if len(results) >= frame_count:
return results[:frame_count]
except (AttributeError, TypeError):
pass
# Fall back to recursive binary interpolation for better quality
# This mimics FILM's natural recursive approach
frames = {} # timestep -> tensor
def recursive_interpolate(t_left: float, t_right: float, img_left: torch.Tensor, img_right: torch.Tensor, depth: int = 0):
"""Recursively interpolate to fill the gap."""
if depth > 10: # Prevent infinite recursion
return
t_mid = (t_left + t_right) / 2
# Check if we need a frame near t_mid
need_frame = False
for t in timesteps:
if t not in frames and abs(t - t_mid) < 0.5 / (frame_count + 1):
need_frame = True
break
if not need_frame:
# Check if any remaining timesteps are in this range
remaining = [t for t in timesteps if t not in frames and t_left < t < t_right]
if not remaining:
return
# Generate middle frame
mid_frame = interpolate_single(model, img_left, img_right, 0.5)
# Assign to nearest needed timestep
for t in timesteps:
if t not in frames and abs(t - t_mid) < 0.5 / (frame_count + 1):
frames[t] = mid_frame
break
# Recurse into left and right halves
recursive_interpolate(t_left, t_mid, img_left, mid_frame, depth + 1)
recursive_interpolate(t_mid, t_right, mid_frame, img_right, depth + 1)
# Start recursive interpolation
recursive_interpolate(0.0, 1.0, img0, img1)
# Fill any remaining timesteps with direct interpolation
for t in timesteps:
if t not in frames:
frames[t] = interpolate_single(model, img0, img1, t)
# Return frames in order
return [frames[t] for t in timesteps]
def main():
parser = argparse.ArgumentParser(description='FILM frame interpolation worker')
parser.add_argument('--input0', required=True, help='Path to first input image')
parser.add_argument('--input1', required=True, help='Path to second input image')
parser.add_argument('--output', help='Path to output image (single frame mode)')
parser.add_argument('--output-dir', help='Output directory (batch mode)')
parser.add_argument('--output-pattern', default='frame_{:04d}.png',
help='Output filename pattern for batch mode')
parser.add_argument('--timestep', type=float, default=0.5,
help='Interpolation timestep 0-1 (single frame mode)')
parser.add_argument('--frame-count', type=int,
help='Number of frames to generate (batch mode)')
parser.add_argument('--repo-dir', help='Unused (kept for backward compat)')
parser.add_argument('--model-dir', required=True, help='Model cache directory')
parser.add_argument('--device', default='cuda', choices=['cuda', 'cpu'], help='Device to use')
args = parser.parse_args()
# Validate arguments
batch_mode = args.output_dir is not None and args.frame_count is not None
single_mode = args.output is not None
if not batch_mode and not single_mode:
print("Error: Must specify either --output (single) or --output-dir + --frame-count (batch)",
file=sys.stderr)
return 1
try:
# Select device
if args.device == 'cuda' and torch.cuda.is_available():
device = torch.device('cuda')
else:
device = torch.device('cpu')
# Load model
model_dir = Path(args.model_dir)
model = get_model(model_dir, device)
# Load images
img0 = load_image(Path(args.input0), device)
img1 = load_image(Path(args.input1), device)
if batch_mode:
# Batch mode - generate all frames at once
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
print(f"Generating {args.frame_count} frames...", file=sys.stderr)
frames = interpolate_batch(model, img0, img1, args.frame_count)
for i, frame in enumerate(frames):
output_path = output_dir / args.output_pattern.format(i)
save_image(frame, output_path)
print(f"Saved {output_path.name}", file=sys.stderr)
print(f"Success: Generated {len(frames)} frames", file=sys.stderr)
else:
# Single frame mode
result = interpolate_single(model, img0, img1, args.timestep)
save_image(result, Path(args.output))
print("Success", file=sys.stderr)
return 0
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
return 1
if __name__ == '__main__':
sys.exit(main())
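The argument parser above defines the worker's subprocess contract. A minimal sketch of how a caller might assemble the batch-mode command line (the worker's on-disk filename is not shown here, so `film_worker.py` is an assumption; the flags match the argparse definition above):

```python
def build_worker_cmd(python_bin: str, script: str, input0: str, input1: str,
                     out_dir: str, frame_count: int, model_dir: str) -> list[str]:
    """Assemble the batch-mode invocation for the FILM worker."""
    return [
        python_bin, script,
        '--input0', input0, '--input1', input1,
        '--output-dir', out_dir, '--frame-count', str(frame_count),
        '--model-dir', model_dir, '--device', 'cuda',
    ]

cmd = build_worker_cmd('python', 'film_worker.py', 'a.png', 'b.png',
                       'out', 7, 'models')
print(' '.join(cmd))
```

Passing `--output-dir` plus `--frame-count` selects batch mode; passing `--output` instead would select single-frame mode.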


@@ -80,12 +80,11 @@ class SymlinkManager:
@staticmethod
def cleanup_old_links(directory: Path) -> int:
"""Remove existing seq* symlinks and temporary files from a directory.
"""Remove existing seq* symlinks from a directory.
Handles all naming formats:
- Old folder-indexed: seq01_0000.png
- Continuous: seq_00000.png
Also removes blended image files and film_temp_*.png temporaries.
Handles both old format (seq_0000) and new format (seq01_0000).
Also removes blended image files (not just symlinks) created by
cross-dissolve transitions.
Args:
directory: Directory to clean up.
@@ -97,134 +96,31 @@ class SymlinkManager:
CleanupError: If cleanup fails.
"""
removed = 0
seq_pattern = re.compile(
r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE
)
temp_pattern = re.compile(
r'^film_temp_\d+\.png$', re.IGNORECASE
)
seq_pattern = re.compile(r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE)
try:
for item in directory.iterdir():
should_remove = False
# Match both old (seq_NNNN) and new (seqNN_NNNN) formats
if item.name.startswith("seq"):
if item.is_symlink():
should_remove = True
item.unlink()
removed += 1
elif item.is_file() and seq_pattern.match(item.name):
should_remove = True
elif item.is_file() and temp_pattern.match(item.name):
should_remove = True
if should_remove:
item.unlink()
removed += 1
# Also remove blended image files
item.unlink()
removed += 1
except OSError as e:
raise CleanupError(f"Failed to clean up old links: {e}") from e
return removed
@staticmethod
def remove_orphan_files(directory: Path, keep_names: set[str]) -> int:
"""Remove seq* files and film_temp_* not in the keep set.
Same pattern matching as cleanup_old_links but skips filenames
present in keep_names.
Args:
directory: Directory to clean orphans from.
keep_names: Set of filenames to keep.
Returns:
Number of files removed.
Raises:
CleanupError: If removal fails.
"""
removed = 0
seq_pattern = re.compile(
r'^seq\d*_\d+\.(png|jpg|jpeg|webp)$', re.IGNORECASE
)
temp_pattern = re.compile(
r'^film_temp_\d+\.png$', re.IGNORECASE
)
try:
for item in directory.iterdir():
if item.name in keep_names:
continue
should_remove = False
if item.name.startswith("seq"):
if item.is_symlink():
should_remove = True
elif item.is_file() and seq_pattern.match(item.name):
should_remove = True
elif item.is_file() and temp_pattern.match(item.name):
should_remove = True
if should_remove:
item.unlink()
removed += 1
except OSError as e:
raise CleanupError(f"Failed to remove orphan files: {e}") from e
return removed
@staticmethod
def symlink_matches(link_path: Path, expected_source: Path) -> bool:
"""Check if existing symlink resolves to expected source."""
if not link_path.is_symlink():
return False
try:
return link_path.resolve() == expected_source.resolve()
except OSError:
return False
@staticmethod
def copy_matches(dest_path: Path, source_path: Path) -> bool:
"""Check if existing copy matches source.
Fast path: size + mtime comparison. If sizes match but mtimes
differ, falls back to comparing file contents so that a
re-export after touching (but not changing) the source is still
skipped, while a genuine content change is caught.
"""
if not dest_path.is_file() or dest_path.is_symlink():
return False
try:
src_stat = source_path.stat()
dst_stat = dest_path.stat()
if src_stat.st_size != dst_stat.st_size:
return False
# Fast path: identical mtime means the copy2 wrote this file
if abs(src_stat.st_mtime - dst_stat.st_mtime) < 2.0:
return True
# Size matches but mtime differs — compare contents
return SymlinkManager._files_equal(source_path, dest_path)
except OSError:
return False
@staticmethod
def _files_equal(a: Path, b: Path, chunk_size: int = 65536) -> bool:
"""Compare two files by reading in chunks."""
try:
with open(a, 'rb') as fa, open(b, 'rb') as fb:
while True:
ca = fa.read(chunk_size)
cb = fb.read(chunk_size)
if ca != cb:
return False
if not ca:
return True
except OSError:
return False
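The chunked byte comparison above matches the behavior of the stdlib's `filecmp.cmp` with `shallow=False`, which also reads both files and compares raw contents. A self-contained sketch of that equivalence:

```python
import filecmp
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as d:
    a, b = Path(d) / "a.bin", Path(d) / "b.bin"
    # Write more than one 65536-byte chunk so multiple reads are exercised.
    a.write_bytes(b"\x00" * 70000)
    b.write_bytes(b"\x00" * 70000)
    print(filecmp.cmp(a, b, shallow=False))  # True
```

Using a hand-rolled loop as above keeps the chunk size explicit and avoids `filecmp`'s internal stat-based cache, but either approach gives the same answer.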
def create_sequence_links(
self,
sources: list[Path],
dest: Path,
files: list[tuple],
trim_settings: Optional[dict[Path, tuple[int, int]]] = None,
copy_files: bool = False,
) -> tuple[list[LinkResult], Optional[int]]:
"""Create sequenced symlinks or copies from source files to destination.
"""Create sequenced symlinks from source files to destination.
Args:
sources: List of source directories (for validation).
@@ -233,12 +129,12 @@ class SymlinkManager:
- (source_dir, filename) for CLI mode (uses global sequence)
- (source_dir, filename, folder_idx, file_idx) for GUI mode
trim_settings: Optional dict mapping folder paths to (trim_start, trim_end).
copy_files: If True, copy files instead of creating symlinks.
Returns:
Tuple of (list of LinkResult objects, session_id or None).
"""
self.validate_paths(sources, dest)
self.cleanup_old_links(dest)
session_id = None
if self.db:
@@ -269,13 +165,6 @@ class SymlinkManager:
expanded_files.append((source_dir, filename, folder_idx, file_idx))
files = expanded_files
# Build planned names for orphan removal
planned_names: set[str] = set()
for file_data in files:
_, fn, fi, fli = file_data
ext = Path(fn).suffix
planned_names.add(f"seq{fi + 1:02d}_{fli:04d}{ext}")
for i, file_data in enumerate(files):
source_dir, filename, folder_idx, file_idx = file_data
source_path = source_dir / filename
@@ -283,25 +172,11 @@ class SymlinkManager:
link_name = f"seq{folder_idx + 1:02d}_{file_idx:04d}{ext}"
link_path = dest / link_name
# Calculate relative path from destination to source
rel_source = Path(os.path.relpath(source_path.resolve(), dest.resolve()))
try:
# Check if existing file already matches
already_correct = False
if link_path.exists() or link_path.is_symlink():
if copy_files:
already_correct = self.copy_matches(link_path, source_path)
else:
already_correct = self.symlink_matches(link_path, source_path)
if not already_correct:
if link_path.exists() or link_path.is_symlink():
link_path.unlink()
if copy_files:
import shutil
shutil.copy2(source_path, link_path)
else:
rel_source = Path(os.path.relpath(source_path.resolve(), dest.resolve()))
link_path.symlink_to(rel_source)
link_path.symlink_to(rel_source)
if self.db and session_id:
self.db.record_symlink(
@@ -327,10 +202,4 @@ class SymlinkManager:
error=str(e)
))
# Remove orphan seq*/film_temp_* files not in the planned set
try:
self.remove_orphan_files(dest, planned_names)
except CleanupError:
pass
return results, session_id


@@ -21,8 +21,7 @@ class BlendMethod(Enum):
"""Blend method types for transitions."""
ALPHA = 'alpha' # Simple cross-dissolve (PIL.Image.blend)
OPTICAL_FLOW = 'optical' # OpenCV Farneback optical flow
RIFE = 'rife' # AI frame interpolation (NCNN binary)
RIFE_PRACTICAL = 'rife_practical' # Practical-RIFE Python/PyTorch implementation
RIFE = 'rife' # AI frame interpolation (NCNN binary or PyTorch)
class FolderType(Enum):
@@ -32,12 +31,6 @@ class FolderType(Enum):
TRANSITION = 'transition'
class DirectInterpolationMethod(Enum):
"""Method for direct frame interpolation between sequences."""
RIFE = 'rife'
FILM = 'film'
# --- Data Classes ---
@dataclass
@@ -51,60 +44,14 @@ class TransitionSettings:
trans_destination: Optional[Path] = None # separate destination for transition output
blend_method: BlendMethod = BlendMethod.ALPHA # blending method
rife_binary_path: Optional[Path] = None # path to rife-ncnn-vulkan binary
rife_model: str = 'rife-v4.6' # RIFE model to use
rife_uhd: bool = False # Enable UHD mode for high resolution
rife_tta: bool = False # Enable TTA mode for better quality
# Practical-RIFE settings
practical_rife_model: str = 'v4.25' # v4.25, v4.26, v4.22, etc.
practical_rife_ensemble: bool = False # Ensemble mode for better quality (slower)
# Optical flow settings
of_preset: str = 'balanced' # fast, balanced, quality, max
of_levels: int = 3 # pyramid levels (1-7)
of_winsize: int = 15 # window size (5-51, odd)
of_iterations: int = 3 # iterations (1-10)
of_poly_n: int = 5 # polynomial neighborhood (5 or 7)
of_poly_sigma: float = 1.2 # gaussian sigma (0.5-2.0)
@dataclass
class PerTransitionSettings:
"""Per-transition overlap settings for cross-dissolves."""
"""Per-transition overlap settings for asymmetric cross-dissolves."""
trans_folder: Path
left_overlap: int = 16 # overlap count at left boundary (MAIN→TRANS)
right_overlap: int = 16 # overlap count at right boundary (TRANS→MAIN)
@dataclass
class DirectTransitionSettings:
"""Settings for direct AI interpolation between sequences (no transition folder)."""
after_folder: Path # The folder after which this transition occurs
frame_count: int = 16 # Number of interpolated frames to generate
method: DirectInterpolationMethod = DirectInterpolationMethod.FILM
enabled: bool = True
@dataclass
class VideoPreset:
"""Preset for video encoding via ffmpeg."""
label: str # Display name
container: str # 'mp4' or 'webm'
codec: str # ffmpeg codec: libx264, libx265, libvpx-vp9, libaom-av1
crf: int
pixel_format: str = 'yuv420p'
preset: str = 'medium' # x264/x265 speed preset
max_height: Optional[int] = None # Downscale filter
extra_args: list[str] = field(default_factory=list)
VIDEO_PRESETS: dict[str, VideoPreset] = {
'web_streaming': VideoPreset('Web Streaming', 'mp4', 'libx264', 23, preset='medium'),
'high_quality': VideoPreset('High Quality', 'mp4', 'libx264', 18, preset='slow'),
'archive': VideoPreset('Archive (H.265)', 'mp4', 'libx265', 18, preset='slow', extra_args=['-tag:v', 'hvc1']),
'social_media': VideoPreset('Social Media', 'mp4', 'libx264', 23, preset='fast', max_height=1080),
'fast_preview': VideoPreset('Fast Preview', 'mp4', 'libx264', 28, preset='ultrafast'),
'webm_vp9': VideoPreset('WebM VP9', 'webm', 'libvpx-vp9', 30, extra_args=['-b:v', '0']),
'webm_av1': VideoPreset('WebM AV1', 'webm', 'libaom-av1', 30, extra_args=['-b:v', '0', '-strict', 'experimental']),
'godot_theora': VideoPreset('Godot (Theora)', 'ogv', 'libtheora', 8, extra_args=['-g', '512']),
}
left_overlap: int = 16 # frames from main folder end
right_overlap: int = 16 # frames from trans folder start
@dataclass
@@ -130,9 +77,6 @@ class TransitionSpec:
# Indices into the overall file list
main_start_idx: int
trans_start_idx: int
# Position indices in the folders list (for duplicate folder support)
main_folder_idx: int = 0
trans_folder_idx: int = 0
@dataclass
@@ -164,8 +108,6 @@ class SessionRecord:
created_at: datetime
destination: str
link_count: int = 0
name: Optional[str] = None
locked: bool = False
# --- Exceptions ---


@@ -1,429 +0,0 @@
#!/usr/bin/env python
"""RIFE interpolation worker - runs in isolated venv with PyTorch.
This script is executed via subprocess from the main application.
It handles loading Practical-RIFE models and performing frame interpolation.
Note: The Practical-RIFE models require the IFNet architecture from the
Practical-RIFE repository. This script downloads and uses the model weights
with a simplified inference implementation.
"""
import argparse
import os
import shutil
import sys
import tempfile
import urllib.request
import zipfile
from pathlib import Path
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image
def conv(in_planes, out_planes, kernel_size=3, stride=1, padding=1, dilation=1):
return nn.Sequential(
nn.Conv2d(in_planes, out_planes, kernel_size=kernel_size, stride=stride,
padding=padding, dilation=dilation, bias=True),
nn.PReLU(out_planes)
)
def deconv(in_planes, out_planes, kernel_size=4, stride=2, padding=1):
return nn.Sequential(
nn.ConvTranspose2d(in_planes, out_planes, kernel_size, stride, padding, bias=True),
nn.PReLU(out_planes)
)
class IFBlock(nn.Module):
def __init__(self, in_planes, c=64):
super(IFBlock, self).__init__()
self.conv0 = nn.Sequential(
conv(in_planes, c//2, 3, 2, 1),
conv(c//2, c, 3, 2, 1),
)
self.convblock = nn.Sequential(
conv(c, c),
conv(c, c),
conv(c, c),
conv(c, c),
conv(c, c),
conv(c, c),
conv(c, c),
conv(c, c),
)
self.lastconv = nn.ConvTranspose2d(c, 5, 4, 2, 1)
def forward(self, x, flow=None, scale=1):
x = F.interpolate(x, scale_factor=1./scale, mode="bilinear", align_corners=False)
if flow is not None:
flow = F.interpolate(flow, scale_factor=1./scale, mode="bilinear", align_corners=False) / scale
x = torch.cat((x, flow), 1)
feat = self.conv0(x)
feat = self.convblock(feat) + feat
tmp = self.lastconv(feat)
tmp = F.interpolate(tmp, scale_factor=scale*2, mode="bilinear", align_corners=False)
flow = tmp[:, :4] * scale * 2
mask = tmp[:, 4:5]
return flow, mask
# Module-level cache: the base sampling grid depends only on device and
# tensor size, so build it once instead of on every warp call. (In the
# original, the dict was local, so the cache check never hit.)
_backwarp_grid_cache: dict = {}
def warp(tenInput, tenFlow):
k = (str(tenFlow.device), str(tenFlow.size()))
backwarp_tenGrid = _backwarp_grid_cache
if k not in backwarp_tenGrid:
tenHorizontal = torch.linspace(-1.0, 1.0, tenFlow.shape[3], device=tenFlow.device).view(
1, 1, 1, tenFlow.shape[3]).expand(tenFlow.shape[0], -1, tenFlow.shape[2], -1)
tenVertical = torch.linspace(-1.0, 1.0, tenFlow.shape[2], device=tenFlow.device).view(
1, 1, tenFlow.shape[2], 1).expand(tenFlow.shape[0], -1, -1, tenFlow.shape[3])
backwarp_tenGrid[k] = torch.cat([tenHorizontal, tenVertical], 1)
tenFlow = torch.cat([tenFlow[:, 0:1, :, :] / ((tenInput.shape[3] - 1.0) / 2.0),
tenFlow[:, 1:2, :, :] / ((tenInput.shape[2] - 1.0) / 2.0)], 1)
g = (backwarp_tenGrid[k] + tenFlow).permute(0, 2, 3, 1)
return F.grid_sample(input=tenInput, grid=g, mode='bilinear', padding_mode='border', align_corners=True)
class IFNet(nn.Module):
"""IFNet architecture for Practical-RIFE v4.25/v4.26 models."""
def __init__(self):
super(IFNet, self).__init__()
# v4.25/v4.26 architecture:
# block0 input: img0(3) + img1(3) + f0(4) + f1(4) + timestep(1) = 15
# block1+ input: img0(3) + img1(3) + wf0(4) + wf1(4) + f0(4) + f1(4) + timestep(1) + mask(1) + flow(4) = 28
self.block0 = IFBlock(3+3+4+4+1, c=192)
self.block1 = IFBlock(3+3+4+4+4+4+1+1+4, c=128)
self.block2 = IFBlock(3+3+4+4+4+4+1+1+4, c=96)
self.block3 = IFBlock(3+3+4+4+4+4+1+1+4, c=64)
# Encode produces 4-channel features
self.encode = nn.Sequential(
nn.Conv2d(3, 32, 3, 2, 1),
nn.LeakyReLU(0.2, True),
nn.Conv2d(32, 32, 3, 1, 1),
nn.LeakyReLU(0.2, True),
nn.Conv2d(32, 32, 3, 1, 1),
nn.LeakyReLU(0.2, True),
nn.ConvTranspose2d(32, 4, 4, 2, 1)
)
def forward(self, img0, img1, timestep=0.5, scale_list=[8, 4, 2, 1]):
f0 = self.encode(img0[:, :3])
f1 = self.encode(img1[:, :3])
warped_img0 = img0
warped_img1 = img1
flow = None
mask = None
block = [self.block0, self.block1, self.block2, self.block3]
for i in range(4):
if flow is None:
flow, mask = block[i](
torch.cat((img0[:, :3], img1[:, :3], f0, f1, timestep), 1),
None, scale=scale_list[i])
else:
wf0 = warp(f0, flow[:, :2])
wf1 = warp(f1, flow[:, 2:4])
fd, m0 = block[i](
torch.cat((warped_img0[:, :3], warped_img1[:, :3], wf0, wf1, f0, f1, timestep, mask), 1),
flow, scale=scale_list[i])
flow = flow + fd
mask = mask + m0
warped_img0 = warp(img0, flow[:, :2])
warped_img1 = warp(img1, flow[:, 2:4])
mask_final = torch.sigmoid(mask)
merged_final = warped_img0 * mask_final + warped_img1 * (1 - mask_final)
return merged_final
# Model URLs for downloading (Google Drive direct download links)
# File IDs extracted from official Practical-RIFE repository
MODEL_URLS = {
'v4.26': 'https://drive.google.com/uc?export=download&id=1gViYvvQrtETBgU1w8axZSsr7YUuw31uy',
'v4.25': 'https://drive.google.com/uc?export=download&id=1ZKjcbmt1hypiFprJPIKW0Tt0lr_2i7bg',
'v4.22': 'https://drive.google.com/uc?export=download&id=1qh2DSA9a1eZUTtZG9U9RQKO7N7OaUJ0_',
'v4.20': 'https://drive.google.com/uc?export=download&id=11n3YR7-qCRZm9RDdwtqOTsgCJUHPuexA',
'v4.18': 'https://drive.google.com/uc?export=download&id=1octn-UVuEjXa_HlsIUbNeLTTvYCKbC_s',
'v4.15': 'https://drive.google.com/uc?export=download&id=1xlem7cfKoMaiLzjoeum8KIQTYO-9iqG5',
}
def download_model(version: str, model_dir: Path) -> Path:
"""Download model if not already cached.
Google Drive links distribute zip files containing the model.
This function downloads and extracts the flownet.pkl file.
Args:
version: Model version (e.g., 'v4.25').
model_dir: Directory to store models.
Returns:
Path to the downloaded model file.
"""
model_dir.mkdir(parents=True, exist_ok=True)
model_path = model_dir / f'flownet_{version}.pkl'
if model_path.exists():
# Verify it's not a zip file (from previous failed attempt)
with open(model_path, 'rb') as f:
header = f.read(4)
if header == b'PK\x03\x04': # ZIP magic number
print(f"Removing corrupted zip file at {model_path}", file=sys.stderr)
model_path.unlink()
else:
return model_path
url = MODEL_URLS.get(version)
if not url:
raise ValueError(f"Unknown model version: {version}")
print(f"Downloading RIFE model {version}...", file=sys.stderr)
with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir) / 'download'
# Try using gdown for Google Drive (handles confirmations automatically)
downloaded = False
try:
import gdown
file_id = url.split('id=')[1] if 'id=' in url else None
if file_id:
gdown_url = f'https://drive.google.com/uc?id={file_id}'
gdown.download(gdown_url, str(tmp_path), quiet=False)
downloaded = tmp_path.exists()
except ImportError:
print("gdown not available, trying direct download...", file=sys.stderr)
except Exception as e:
print(f"gdown failed: {e}, trying direct download...", file=sys.stderr)
# Fallback: direct download
if not downloaded:
try:
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req, timeout=300) as response:
data = response.read()
if data[:100].startswith(b'<!') or b'<html' in data[:500].lower():
raise RuntimeError("Google Drive returned HTML - install gdown: pip install gdown")
with open(tmp_path, 'wb') as f:
f.write(data)
downloaded = True
except Exception as e:
raise RuntimeError(f"Failed to download model: {e}")
if not downloaded or not tmp_path.exists():
raise RuntimeError("Download failed - no file received")
# Check if downloaded file is a zip archive
with open(tmp_path, 'rb') as f:
header = f.read(4)
if header == b'PK\x03\x04': # ZIP magic number
print("Extracting model from zip archive...", file=sys.stderr)
with zipfile.ZipFile(tmp_path, 'r') as zf:
# Find flownet.pkl in the archive
pkl_files = [n for n in zf.namelist() if n.endswith('flownet.pkl')]
if not pkl_files:
raise RuntimeError(f"No flownet.pkl found in zip. Contents: {zf.namelist()}")
# Extract the pkl file
pkl_name = pkl_files[0]
with zf.open(pkl_name) as src, open(model_path, 'wb') as dst:
dst.write(src.read())
else:
# Already a pkl file, just move it
shutil.move(str(tmp_path), str(model_path))
print(f"Model saved to {model_path}", file=sys.stderr)
return model_path
def load_model(model_path: Path, device: torch.device) -> IFNet:
"""Load IFNet model from state dict.
Args:
model_path: Path to flownet.pkl file.
device: Device to load model to.
Returns:
Loaded IFNet model.
"""
model = IFNet()
state_dict = torch.load(model_path, map_location='cpu')
# Handle different state dict formats
if 'state_dict' in state_dict:
state_dict = state_dict['state_dict']
# Remove 'module.' prefix if present (from DataParallel)
new_state_dict = {}
for k, v in state_dict.items():
if k.startswith('module.'):
k = k[7:]
# Handle flownet. prefix
if k.startswith('flownet.'):
k = k[8:]
new_state_dict[k] = v
model.load_state_dict(new_state_dict, strict=False)
model.to(device)
model.eval()
return model
def pad_image(img: torch.Tensor, padding: int = 64) -> tuple:
"""Pad image to be divisible by padding.
Args:
img: Input tensor (B, C, H, W).
padding: Padding divisor.
Returns:
Tuple of (padded image, (original H, original W)).
"""
_, _, h, w = img.shape
ph = ((h - 1) // padding + 1) * padding
pw = ((w - 1) // padding + 1) * padding
pad_h = ph - h
pad_w = pw - w
padded = F.pad(img, (0, pad_w, 0, pad_h), mode='replicate')
return padded, (h, w)
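The padding arithmetic above rounds each dimension up to the next multiple of 64 (the stride the network's pyramid requires). The shape computation in isolation, as an editor's sketch:

```python
def padded_shape(h: int, w: int, padding: int = 64) -> tuple[int, int]:
    """Round H and W up to the next multiple of `padding`."""
    ph = ((h - 1) // padding + 1) * padding
    pw = ((w - 1) // padding + 1) * padding
    return ph, pw

print(padded_shape(1080, 1920))  # (1088, 1920)
```

A 1080p frame gains 8 rows of replicated padding; 1920 is already a multiple of 64, so the width is untouched, and `inference` crops the result back to (h, w) afterwards.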
@torch.no_grad()
def inference(model: IFNet, img0: torch.Tensor, img1: torch.Tensor,
timestep: float = 0.5, ensemble: bool = False) -> torch.Tensor:
"""Perform frame interpolation.
Args:
model: Loaded IFNet model.
img0: First frame tensor (B, C, H, W) normalized to [0, 1].
img1: Second frame tensor (B, C, H, W) normalized to [0, 1].
timestep: Interpolation timestep (0.0 to 1.0).
ensemble: Enable ensemble mode for better quality.
Returns:
Interpolated frame tensor.
"""
# Pad images
img0_padded, orig_size = pad_image(img0)
img1_padded, _ = pad_image(img1)
h, w = orig_size
# Create timestep tensor
timestep_tensor = torch.full((1, 1, img0_padded.shape[2], img0_padded.shape[3]),
timestep, device=img0.device)
if ensemble:
# Ensemble: average of forward and reverse
result1 = model(img0_padded, img1_padded, timestep_tensor)
result2 = model(img1_padded, img0_padded, 1 - timestep_tensor)
result = (result1 + result2) / 2
else:
result = model(img0_padded, img1_padded, timestep_tensor)
# Crop back to original size
result = result[:, :, :h, :w]
return result.clamp(0, 1)
def load_image(path: Path, device: torch.device) -> torch.Tensor:
"""Load image as tensor.
Args:
path: Path to image file.
device: Device to load tensor to.
Returns:
Image tensor (1, 3, H, W) normalized to [0, 1].
"""
img = Image.open(path).convert('RGB')
arr = np.array(img).astype(np.float32) / 255.0
tensor = torch.from_numpy(arr).permute(2, 0, 1).unsqueeze(0)
return tensor.to(device)
def save_image(tensor: torch.Tensor, path: Path) -> None:
"""Save tensor as image.
Args:
tensor: Image tensor (1, 3, H, W) normalized to [0, 1].
path: Output path.
"""
arr = tensor.squeeze(0).permute(1, 2, 0).cpu().numpy()
arr = (arr * 255).clip(0, 255).astype(np.uint8)
Image.fromarray(arr).save(path)
# Global model cache
_model_cache: dict = {}
def get_model(version: str, model_dir: Path, device: torch.device) -> IFNet:
"""Get or load model (cached).
Args:
version: Model version.
model_dir: Model cache directory.
device: Device to run on.
Returns:
IFNet model instance.
"""
cache_key = f"{version}_{device}"
if cache_key not in _model_cache:
model_path = download_model(version, model_dir)
_model_cache[cache_key] = load_model(model_path, device)
return _model_cache[cache_key]
def main():
parser = argparse.ArgumentParser(description='RIFE frame interpolation worker')
parser.add_argument('--input0', required=True, help='Path to first input image')
parser.add_argument('--input1', required=True, help='Path to second input image')
parser.add_argument('--output', required=True, help='Path to output image')
parser.add_argument('--timestep', type=float, default=0.5, help='Interpolation timestep (0-1)')
parser.add_argument('--model', default='v4.25', help='Model version')
parser.add_argument('--model-dir', required=True, help='Model cache directory')
parser.add_argument('--ensemble', action='store_true', help='Enable ensemble mode')
parser.add_argument('--device', default='cuda', choices=['cuda', 'cpu'], help='Device to use')
args = parser.parse_args()
try:
# Select device
if args.device == 'cuda' and torch.cuda.is_available():
device = torch.device('cuda')
else:
device = torch.device('cpu')
# Load model
model_dir = Path(args.model_dir)
model = get_model(args.model, model_dir, device)
# Load images
img0 = load_image(Path(args.input0), device)
img1 = load_image(Path(args.input1), device)
# Interpolate
result = inference(model, img0, img1, args.timestep, args.ensemble)
# Save result
save_image(result, Path(args.output))
print("Success", file=sys.stderr)
return 0
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
return 1
if __name__ == '__main__':
sys.exit(main())


@@ -1,259 +0,0 @@
"""Video encoding utilities wrapping ffmpeg."""
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Callable, Optional
from .models import VideoPreset
def find_ffmpeg() -> Optional[Path]:
"""Find the ffmpeg binary on the system PATH."""
result = shutil.which('ffmpeg')
return Path(result) if result else None
def encode_image_sequence(
input_dir: Path,
output_path: Path,
fps: int,
preset: VideoPreset,
input_pattern: Optional[str] = None,
progress_callback: Optional[Callable[[int, int], bool]] = None,
total_frames: Optional[int] = None,
) -> tuple[bool, str]:
"""Encode an image sequence directory to a video file using ffmpeg.
Args:
input_dir: Directory containing sequentially named image files.
output_path: Output video file path.
fps: Frames per second.
preset: VideoPreset with codec settings.
input_pattern: ffmpeg input pattern (e.g. 'seq_%06d.png').
Auto-detected from first seq_* file if not provided.
progress_callback: Called with (current_frame, total_frames).
Return False to cancel encoding.
total_frames: Total number of frames for progress reporting.
Auto-counted from input_dir if not provided.
Returns:
(success, message) — message is output_path on success or error text on failure.
"""
ffmpeg = find_ffmpeg()
if not ffmpeg:
return False, "ffmpeg not found. Install ffmpeg to encode video."
# Auto-detect input pattern from first seq_* file
if input_pattern is None:
input_pattern = _detect_input_pattern(input_dir)
if input_pattern is None:
return False, f"No seq_* image files found in {input_dir}"
# Auto-count frames
if total_frames is None:
ext = Path(input_pattern).suffix
total_frames = len(list(input_dir.glob(f"seq_*{ext}")))
if total_frames == 0:
return False, f"No matching frames found in {input_dir}"
# Build ffmpeg command
cmd = [
str(ffmpeg), '-y',
'-framerate', str(fps),
'-i', str(input_dir / input_pattern),
'-c:v', preset.codec,
'-q:v' if preset.codec == 'libtheora' else '-crf', str(preset.crf),
'-pix_fmt', preset.pixel_format,
]
# Add speed preset for x264/x265
if preset.codec in ('libx264', 'libx265'):
cmd += ['-preset', preset.preset]
# Add downscale filter if max_height is set
if preset.max_height is not None:
cmd += ['-vf', f'scale=-2:{preset.max_height}']
# Add any extra codec-specific args
if preset.extra_args:
cmd += preset.extra_args
# Progress parsing via -progress pipe:1
cmd += ['-progress', 'pipe:1']
cmd.append(str(output_path))
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
cancelled = False
if proc.stdout:
for line in proc.stdout:
line = line.strip()
m = re.match(r'^frame=(\d+)', line)
if m and progress_callback is not None:
current = int(m.group(1))
if not progress_callback(current, total_frames):
cancelled = True
proc.terminate()
proc.wait()
break
proc.wait()
if cancelled:
# Clean up partial file
if output_path.exists():
output_path.unlink()
return False, "Encoding cancelled by user."
if proc.returncode != 0:
stderr = proc.stderr.read() if proc.stderr else ""
return False, f"ffmpeg exited with code {proc.returncode}:\n{stderr}"
return True, str(output_path)
except FileNotFoundError:
return False, "ffmpeg binary not found."
except Exception as e:
return False, f"Encoding error: {e}"
def _detect_input_pattern(input_dir: Path) -> Optional[str]:
"""Detect the ffmpeg input pattern from seq_* files in a directory.
Looks for files like seq_000000.png and returns a pattern like seq_%06d.png.
"""
for f in sorted(input_dir.iterdir()):
m = re.match(r'^(seq_)(\d+)(\.\w+)$', f.name)
if m:
prefix = m.group(1)
digits = m.group(2)
ext = m.group(3)
width = len(digits)
return f"{prefix}%0{width}d{ext}"
return None
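The detection rule above turns a concrete filename into an ffmpeg printf-style pattern by preserving the digit width. A standalone sketch of just that transformation (`pattern_from_name` is an illustrative name, not part of the module):

```python
import re
from typing import Optional

def pattern_from_name(name: str) -> Optional[str]:
    """seq_000042.png -> seq_%06d.png; None if the name doesn't match."""
    m = re.match(r'^(seq_)(\d+)(\.\w+)$', name)
    if not m:
        return None
    prefix, digits, ext = m.groups()
    return f"{prefix}%0{len(digits)}d{ext}"

print(pattern_from_name("seq_000042.png"))  # seq_%06d.png
```

Keeping the zero-pad width from the first file means ffmpeg's `%0Nd` input pattern lines up with however the sequence was written out.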
def encode_from_file_list(
file_paths: list[Path],
output_path: Path,
fps: int,
preset: VideoPreset,
progress_callback: Optional[Callable[[int, int], bool]] = None,
) -> tuple[bool, str]:
"""Encode a video from an explicit list of image file paths.
Uses ffmpeg's concat demuxer so files can be scattered across directories.
Args:
file_paths: Ordered list of image file paths.
output_path: Output video file path.
fps: Frames per second.
preset: VideoPreset with codec settings.
progress_callback: Called with (current_frame, total_frames).
Return False to cancel encoding.
Returns:
(success, message) — message is output_path on success or error text on failure.
"""
ffmpeg = find_ffmpeg()
if not ffmpeg:
return False, "ffmpeg not found. Install ffmpeg to encode video."
if not file_paths:
        return False, "No files provided."
    total_frames = len(file_paths)
    frame_duration = f"{1.0 / fps:.10f}"
    # Write a concat-demuxer file listing each image with its duration
    try:
        concat_file = tempfile.NamedTemporaryFile(
            mode='w', suffix='.txt', delete=False, prefix='vml_concat_'
        )
        concat_path = Path(concat_file.name)
        for p in file_paths:
            # Escape single quotes for ffmpeg concat format
            escaped = str(p.resolve()).replace("'", "'\\''")
            concat_file.write(f"file '{escaped}'\n")
            concat_file.write(f"duration {frame_duration}\n")
        # Repeat last file so the last frame displays for its full duration
        escaped = str(file_paths[-1].resolve()).replace("'", "'\\''")
        concat_file.write(f"file '{escaped}'\n")
        concat_file.close()
    except OSError as e:
        return False, f"Failed to create concat file: {e}"
    cmd = [
        str(ffmpeg), '-y',
        '-f', 'concat', '-safe', '0',
        '-i', str(concat_path),
        '-c:v', preset.codec,
        # Theora takes a quality scale (-q:v); the other codecs use -crf
        '-q:v' if preset.codec == 'libtheora' else '-crf', str(preset.crf),
        '-pix_fmt', preset.pixel_format,
    ]
    if preset.codec in ('libx264', 'libx265'):
        cmd += ['-preset', preset.preset]
    if preset.max_height is not None:
        cmd += ['-vf', f'scale=-2:{preset.max_height}']
    if preset.extra_args:
        cmd += preset.extra_args
    cmd += ['-progress', 'pipe:1']
    cmd.append(str(output_path))
    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        cancelled = False
        if proc.stdout:
            for line in proc.stdout:
                line = line.strip()
                m = re.match(r'^frame=(\d+)', line)
                if m and progress_callback is not None:
                    current = int(m.group(1))
                    if not progress_callback(current, total_frames):
                        cancelled = True
                        proc.terminate()
                        proc.wait()
                        break
        proc.wait()
        if cancelled:
            if output_path.exists():
                output_path.unlink()
            return False, "Encoding cancelled by user."
        if proc.returncode != 0:
            stderr = proc.stderr.read() if proc.stderr else ""
            return False, f"ffmpeg exited with code {proc.returncode}:\n{stderr}"
        return True, str(output_path)
    except FileNotFoundError:
        return False, "ffmpeg binary not found."
    except Exception as e:
        return False, f"Encoding error: {e}"
    finally:
        try:
            concat_path.unlink(missing_ok=True)
        except OSError:
            pass
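Taken on its own, the concat list this code writes can be sketched as a small pure helper (the function name here is hypothetical, not part of the codebase). Each frame is listed with an explicit `duration`, which is how the concat demuxer turns a folder of stills into a timed stream; the last file is repeated so its frame holds on screen for a full frame duration.

```python
from pathlib import Path

def build_concat_lines(file_paths, fps):
    """Hypothetical helper mirroring the concat-demuxer listing above."""
    frame_duration = f"{1.0 / fps:.10f}"
    lines = []
    for p in file_paths:
        # same single-quote escaping ffmpeg's concat format expects
        escaped = str(p.resolve()).replace("'", "'\\''")
        lines.append(f"file '{escaped}'")
        lines.append(f"duration {frame_duration}")
    # repeat the last file so its frame displays for a full duration
    escaped = str(file_paths[-1].resolve()).replace("'", "'\\''")
    lines.append(f"file '{escaped}'")
    return lines
```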
@@ -0,0 +1,50 @@
# Gitea Push Mirror Setup Instructions
## Prerequisites
- Gitea API token
- GitHub Personal Access Token (PAT) with `repo` scope
## Tokens
| Service | Token |
|---------|-------|
| Gitea | `1b99442d046cdfa2f5d76322a6081131a0561c53` |
| GitHub | `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` |
## API Endpoint
```
POST http://192.168.1.1:3000/api/v1/repos/{owner}/{repo}/push_mirrors
```
## Command Template
```bash
curl -X POST "http://192.168.1.1:3000/api/v1/repos/Ethanfel/{REPO_NAME}/push_mirrors" \
-H "Authorization: token 1b99442d046cdfa2f5d76322a6081131a0561c53" \
-H "Content-Type: application/json" \
-d '{
"remote_address": "https://github.com/Ethanfel/{REPO_NAME}.git",
"remote_username": "Ethanfel",
"remote_password": "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",
"interval": "8h0m0s",
"sync_on_commit": true
}'
```
Replace `{REPO_NAME}` with the actual repository name.
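When mirroring several repositories, the same call can be scripted instead of editing the template by hand. A minimal sketch using only the standard library — the tokens and repo names are placeholders to fill in, and the GitHub repos must already exist:

```python
import json
import urllib.request

GITEA = "http://192.168.1.1:3000"

def mirror_payload(repo, github_pat):
    """Build the push-mirror request body for one repository."""
    return {
        "remote_address": f"https://github.com/Ethanfel/{repo}.git",
        "remote_username": "Ethanfel",
        "remote_password": github_pat,
        "interval": "8h0m0s",
        "sync_on_commit": True,
    }

def create_push_mirror(repo, gitea_token, github_pat):
    """POST the mirror config to the Gitea API."""
    req = urllib.request.Request(
        f"{GITEA}/api/v1/repos/Ethanfel/{repo}/push_mirrors",
        data=json.dumps(mirror_payload(repo, github_pat)).encode(),
        headers={
            "Authorization": f"token {gitea_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```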
## Verify Mirror
```bash
curl "http://192.168.1.1:3000/api/v1/repos/Ethanfel/{REPO_NAME}/push_mirrors" \
-H "Authorization: token 1b99442d046cdfa2f5d76322a6081131a0561c53"
```
## Notes
- GitHub repo must exist before creating the mirror
- Gitea runs on port 3000 (not 443)
- `sync_on_commit: true` pushes to GitHub on every commit
- `interval: 8h0m0s` syncs every 8 hours regardless of commits

File diff suppressed because it is too large
@@ -15,7 +15,6 @@ class TrimSlider(QWidget):
"""
trimChanged = pyqtSignal(int, int, str) # Emits (trim_start, trim_end, 'left' or 'right')
trimDragFinished = pyqtSignal(int, int, str) # Emits final values on mouse release
def __init__(self, parent: Optional[QWidget] = None) -> None:
"""Initialize the trim slider.
@@ -288,11 +287,5 @@ class TrimSlider(QWidget):
    def mouseReleaseEvent(self, event: QMouseEvent) -> None:
        """Handle mouse release to stop dragging."""
        if self._dragging:
            handle = self._dragging
            self._dragging = None
            self.setCursor(Qt.CursorShape.ArrowCursor)
            self.trimDragFinished.emit(self._trim_start, self._trim_end, handle)
        else:
            self._dragging = None
            self.setCursor(Qt.CursorShape.ArrowCursor)