Files
video-montage-linker/symlink.py
2026-02-03 16:21:56 +01:00

2126 lines
76 KiB
Python
Executable File

#!/usr/bin/env python3
"""Video Montage Linker - Create sequenced symlinks for image files.
Supports both GUI and CLI modes for creating numbered symlinks from one or more
source directories into a single destination directory.
"""
# --- Imports ---
import argparse
import os
import re
import sqlite3
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional
from PyQt6.QtCore import Qt, QUrl, QEvent, QPoint, pyqtSignal, QRect
from PyQt6.QtGui import QDragEnterEvent, QDropEvent, QPainter, QColor, QBrush, QPen, QMouseEvent
from PyQt6.QtMultimedia import QMediaPlayer, QAudioOutput
from PyQt6.QtMultimediaWidgets import QVideoWidget
from PyQt6.QtWidgets import (
QApplication,
QWidget,
QVBoxLayout,
QPushButton,
QLabel,
QFileDialog,
QLineEdit,
QHBoxLayout,
QMessageBox,
QListWidget,
QTreeWidget,
QTreeWidgetItem,
QAbstractItemView,
QGroupBox,
QHeaderView,
QComboBox,
QSlider,
QSplitter,
QTabWidget,
QScrollArea,
QSizePolicy,
)
from PyQt6.QtGui import QPixmap, QKeyEvent
# --- Configuration ---
SUPPORTED_EXTENSIONS = ('.png', '.webp', '.jpg', '.jpeg')
VIDEO_EXTENSIONS = ('.mp4', '.webm', '.mkv', '.avi', '.mov', '.wmv', '.flv', '.m4v')
DB_PATH = Path.home() / '.config' / 'video-montage-linker' / 'symlinks.db'
# --- Custom Widgets ---
class TrimSlider(QWidget):
"""A slider widget with two draggable handles for trimming sequences.
Allows setting in/out points for a sequence by dragging left and right handles.
Gray areas indicate trimmed regions, colored area indicates included images.
"""
trimChanged = pyqtSignal(int, int, str) # Emits (trim_start, trim_end, 'left' or 'right')
def __init__(self, parent: Optional[QWidget] = None) -> None:
"""Initialize the trim slider.
Args:
parent: Parent widget.
"""
super().__init__(parent)
self._total = 0
self._trim_start = 0
self._trim_end = 0
self._current_pos = 0
self._dragging: Optional[str] = None # 'left', 'right', or None
self._handle_width = 10
self._track_height = 20
self._enabled = True
self.setMinimumHeight(40)
self.setMinimumWidth(100)
self.setCursor(Qt.CursorShape.ArrowCursor)
self.setMouseTracking(True)
def setRange(self, total: int) -> None:
"""Set the total number of items in the sequence.
Args:
total: Total number of items.
"""
self._total = max(0, total)
# Clamp trim values to valid range
self._trim_start = min(self._trim_start, max(0, self._total - 1))
self._trim_end = min(self._trim_end, max(0, self._total - 1 - self._trim_start))
self.update()
def setTrimStart(self, value: int) -> None:
"""Set the trim start value.
Args:
value: Number of items to trim from start.
"""
max_start = max(0, self._total - 1 - self._trim_end)
self._trim_start = max(0, min(value, max_start))
self.update()
def setTrimEnd(self, value: int) -> None:
"""Set the trim end value.
Args:
value: Number of items to trim from end.
"""
max_end = max(0, self._total - 1 - self._trim_start)
self._trim_end = max(0, min(value, max_end))
self.update()
def setCurrentPosition(self, pos: int) -> None:
"""Set the current position indicator.
Args:
pos: Current position index.
"""
self._current_pos = max(0, min(pos, self._total - 1)) if self._total > 0 else 0
self.update()
def trimStart(self) -> int:
"""Get the trim start value."""
return self._trim_start
def trimEnd(self) -> int:
"""Get the trim end value."""
return self._trim_end
def total(self) -> int:
"""Get the total number of items."""
return self._total
def includedRange(self) -> tuple[int, int]:
"""Get the range of included items (after trimming).
Returns:
Tuple of (first_included_index, last_included_index).
Returns (-1, -1) if no items are included.
"""
if self._total == 0:
return (-1, -1)
first = self._trim_start
last = self._total - 1 - self._trim_end
if first > last:
return (-1, -1)
return (first, last)
def setEnabled(self, enabled: bool) -> None:
"""Enable or disable the widget."""
self._enabled = enabled
self.update()
def _track_rect(self) -> QRect:
"""Get the rectangle for the slider track."""
margin = self._handle_width
return QRect(
margin,
(self.height() - self._track_height) // 2,
self.width() - 2 * margin,
self._track_height
)
def _value_to_x(self, value: int) -> int:
"""Convert a value to an x coordinate."""
track = self._track_rect()
if self._total <= 1:
return track.left()
ratio = value / (self._total - 1)
return int(track.left() + ratio * track.width())
def _x_to_value(self, x: int) -> int:
"""Convert an x coordinate to a value."""
track = self._track_rect()
if track.width() == 0 or self._total <= 1:
return 0
ratio = (x - track.left()) / track.width()
ratio = max(0.0, min(1.0, ratio))
return int(round(ratio * (self._total - 1)))
def _left_handle_rect(self) -> QRect:
"""Get the rectangle for the left (trim start) handle."""
x = self._value_to_x(self._trim_start)
return QRect(
x - self._handle_width // 2,
(self.height() - self._track_height - 10) // 2,
self._handle_width,
self._track_height + 10
)
def _right_handle_rect(self) -> QRect:
"""Get the rectangle for the right (trim end) handle."""
x = self._value_to_x(self._total - 1 - self._trim_end) if self._total > 0 else 0
return QRect(
x - self._handle_width // 2,
(self.height() - self._track_height - 10) // 2,
self._handle_width,
self._track_height + 10
)
def paintEvent(self, event) -> None:
"""Paint the trim slider."""
painter = QPainter(self)
painter.setRenderHint(QPainter.RenderHint.Antialiasing)
track = self._track_rect()
# Colors
bg_color = QColor(60, 60, 60)
trimmed_color = QColor(80, 80, 80)
included_color = QColor(52, 152, 219) if self._enabled else QColor(100, 100, 100)
handle_color = QColor(200, 200, 200) if self._enabled else QColor(120, 120, 120)
position_color = QColor(255, 255, 255)
# Draw background track
painter.fillRect(track, bg_color)
if self._total > 0:
# Draw trimmed regions (darker)
left_trim_x = self._value_to_x(self._trim_start)
right_trim_x = self._value_to_x(self._total - 1 - self._trim_end)
# Left trimmed region
if self._trim_start > 0:
left_rect = QRect(track.left(), track.top(),
left_trim_x - track.left(), track.height())
painter.fillRect(left_rect, trimmed_color)
# Right trimmed region
if self._trim_end > 0:
right_rect = QRect(right_trim_x, track.top(),
track.right() - right_trim_x, track.height())
painter.fillRect(right_rect, trimmed_color)
# Draw included region
if left_trim_x < right_trim_x:
included_rect = QRect(left_trim_x, track.top(),
right_trim_x - left_trim_x, track.height())
painter.fillRect(included_rect, included_color)
# Draw current position indicator
if self._trim_start <= self._current_pos <= (self._total - 1 - self._trim_end):
pos_x = self._value_to_x(self._current_pos)
painter.setPen(QPen(position_color, 2))
painter.drawLine(pos_x, track.top() - 2, pos_x, track.bottom() + 2)
# Draw handles
painter.setBrush(QBrush(handle_color))
painter.setPen(QPen(Qt.GlobalColor.black, 1))
# Left handle
left_handle = self._left_handle_rect()
painter.drawRect(left_handle)
# Right handle
right_handle = self._right_handle_rect()
painter.drawRect(right_handle)
painter.end()
def mousePressEvent(self, event: QMouseEvent) -> None:
"""Handle mouse press to start dragging handles."""
if not self._enabled or self._total == 0:
return
pos = event.pos()
# Check if clicking on handles (check right first since it may overlap)
right_rect = self._right_handle_rect()
left_rect = self._left_handle_rect()
# Expand hit area slightly for easier grabbing
expand = 5
left_expanded = left_rect.adjusted(-expand, -expand, expand, expand)
right_expanded = right_rect.adjusted(-expand, -expand, expand, expand)
if right_expanded.contains(pos):
self._dragging = 'right'
elif left_expanded.contains(pos):
self._dragging = 'left'
else:
self._dragging = None
def mouseMoveEvent(self, event: QMouseEvent) -> None:
"""Handle mouse move to drag handles."""
if not self._enabled:
return
pos = event.pos()
# Update cursor based on position
if self._dragging:
self.setCursor(Qt.CursorShape.SizeHorCursor)
else:
left_rect = self._left_handle_rect()
right_rect = self._right_handle_rect()
expand = 5
left_expanded = left_rect.adjusted(-expand, -expand, expand, expand)
right_expanded = right_rect.adjusted(-expand, -expand, expand, expand)
if left_expanded.contains(pos) or right_expanded.contains(pos):
self.setCursor(Qt.CursorShape.SizeHorCursor)
else:
self.setCursor(Qt.CursorShape.ArrowCursor)
if self._dragging and self._total > 0:
value = self._x_to_value(pos.x())
if self._dragging == 'left':
# Left handle: set trim_start, clamped to not exceed right
max_start = self._total - 1 - self._trim_end
new_start = max(0, min(value, max_start))
if new_start != self._trim_start:
self._trim_start = new_start
self.update()
self.trimChanged.emit(self._trim_start, self._trim_end, 'left')
elif self._dragging == 'right':
# Right handle: set trim_end based on position
# value is the index position, trim_end is count from end
max_val = self._total - 1 - self._trim_start
clamped_value = max(self._trim_start, min(value, self._total - 1))
new_end = self._total - 1 - clamped_value
if new_end != self._trim_end:
self._trim_end = max(0, new_end)
self.update()
self.trimChanged.emit(self._trim_start, self._trim_end, 'right')
def mouseReleaseEvent(self, event: QMouseEvent) -> None:
"""Handle mouse release to stop dragging."""
self._dragging = None
self.setCursor(Qt.CursorShape.ArrowCursor)
# --- Exceptions ---
class SymlinkError(Exception):
"""Base exception for symlink operations."""
class PathValidationError(SymlinkError):
"""Error validating file paths."""
class SourceNotFoundError(PathValidationError):
"""Source directory does not exist."""
class DestinationError(PathValidationError):
"""Error with destination directory."""
class CleanupError(SymlinkError):
"""Error during cleanup of existing symlinks."""
class DatabaseError(SymlinkError):
"""Error with database operations."""
# --- Data Classes ---
@dataclass
class LinkResult:
"""Result of a symlink creation operation."""
source_path: Path
link_path: Path
sequence_number: int
success: bool
error: Optional[str] = None
@dataclass
class SymlinkRecord:
"""Database record of a created symlink."""
id: int
session_id: int
source_path: str
link_path: str
original_filename: str
sequence_number: int
created_at: datetime
@dataclass
class SessionRecord:
"""Database record of a symlink session."""
id: int
created_at: datetime
destination: str
link_count: int = 0
# --- Database ---
class DatabaseManager:
"""Manages SQLite database for tracking symlink sessions and links."""
def __init__(self, db_path: Path = DB_PATH) -> None:
"""Initialize database manager.
Args:
db_path: Path to the SQLite database file.
"""
self.db_path = db_path
self._ensure_db_exists()
def _ensure_db_exists(self) -> None:
"""Create database and tables if they don't exist."""
self.db_path.parent.mkdir(parents=True, exist_ok=True)
with self._connect() as conn:
conn.executescript("""
CREATE TABLE IF NOT EXISTS symlink_sessions (
id INTEGER PRIMARY KEY,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
destination TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS symlinks (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
source_path TEXT NOT NULL,
link_path TEXT NOT NULL,
original_filename TEXT NOT NULL,
sequence_number INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS sequence_trim_settings (
id INTEGER PRIMARY KEY,
session_id INTEGER REFERENCES symlink_sessions(id) ON DELETE CASCADE,
source_folder TEXT NOT NULL,
trim_start INTEGER DEFAULT 0,
trim_end INTEGER DEFAULT 0,
UNIQUE(session_id, source_folder)
);
""")
def _connect(self) -> sqlite3.Connection:
"""Create a database connection with foreign keys enabled."""
conn = sqlite3.connect(self.db_path)
conn.execute("PRAGMA foreign_keys = ON")
return conn
def create_session(self, destination: str) -> int:
"""Create a new linking session.
Args:
destination: The destination directory path.
Returns:
The ID of the created session.
Raises:
DatabaseError: If session creation fails.
"""
try:
with self._connect() as conn:
cursor = conn.execute(
"INSERT INTO symlink_sessions (destination) VALUES (?)",
(destination,)
)
return cursor.lastrowid
except sqlite3.Error as e:
raise DatabaseError(f"Failed to create session: {e}") from e
def record_symlink(
self,
session_id: int,
source: str,
link: str,
filename: str,
seq: int
) -> int:
"""Record a created symlink.
Args:
session_id: The session this symlink belongs to.
source: Full path to the source file.
link: Full path to the created symlink.
filename: Original filename.
seq: Sequence number in the destination.
Returns:
The ID of the created record.
Raises:
DatabaseError: If recording fails.
"""
try:
with self._connect() as conn:
cursor = conn.execute(
"""INSERT INTO symlinks
(session_id, source_path, link_path, original_filename, sequence_number)
VALUES (?, ?, ?, ?, ?)""",
(session_id, source, link, filename, seq)
)
return cursor.lastrowid
except sqlite3.Error as e:
raise DatabaseError(f"Failed to record symlink: {e}") from e
def get_sessions(self) -> list[SessionRecord]:
"""List all sessions with link counts.
Returns:
List of session records.
"""
with self._connect() as conn:
rows = conn.execute("""
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count
FROM symlink_sessions s
LEFT JOIN symlinks l ON s.id = l.session_id
GROUP BY s.id
ORDER BY s.created_at DESC
""").fetchall()
return [
SessionRecord(
id=row[0],
created_at=datetime.fromisoformat(row[1]),
destination=row[2],
link_count=row[3]
)
for row in rows
]
def get_symlinks_by_session(self, session_id: int) -> list[SymlinkRecord]:
"""Get all symlinks for a session.
Args:
session_id: The session ID to query.
Returns:
List of symlink records.
"""
with self._connect() as conn:
rows = conn.execute(
"""SELECT id, session_id, source_path, link_path,
original_filename, sequence_number, created_at
FROM symlinks WHERE session_id = ?
ORDER BY sequence_number""",
(session_id,)
).fetchall()
return [
SymlinkRecord(
id=row[0],
session_id=row[1],
source_path=row[2],
link_path=row[3],
original_filename=row[4],
sequence_number=row[5],
created_at=datetime.fromisoformat(row[6])
)
for row in rows
]
def get_symlinks_by_destination(self, dest: str) -> list[SymlinkRecord]:
"""Get all symlinks for a destination directory.
Args:
dest: The destination directory path.
Returns:
List of symlink records.
"""
with self._connect() as conn:
rows = conn.execute(
"""SELECT l.id, l.session_id, l.source_path, l.link_path,
l.original_filename, l.sequence_number, l.created_at
FROM symlinks l
JOIN symlink_sessions s ON l.session_id = s.id
WHERE s.destination = ?
ORDER BY l.sequence_number""",
(dest,)
).fetchall()
return [
SymlinkRecord(
id=row[0],
session_id=row[1],
source_path=row[2],
link_path=row[3],
original_filename=row[4],
sequence_number=row[5],
created_at=datetime.fromisoformat(row[6])
)
for row in rows
]
def delete_session(self, session_id: int) -> None:
"""Delete a session and all its symlink records.
Args:
session_id: The session ID to delete.
Raises:
DatabaseError: If deletion fails.
"""
try:
with self._connect() as conn:
conn.execute("DELETE FROM symlinks WHERE session_id = ?", (session_id,))
conn.execute("DELETE FROM symlink_sessions WHERE id = ?", (session_id,))
except sqlite3.Error as e:
raise DatabaseError(f"Failed to delete session: {e}") from e
def get_sessions_by_destination(self, dest: str) -> list[SessionRecord]:
"""Get all sessions for a destination directory.
Args:
dest: The destination directory path.
Returns:
List of session records.
"""
with self._connect() as conn:
rows = conn.execute("""
SELECT s.id, s.created_at, s.destination, COUNT(l.id) as link_count
FROM symlink_sessions s
LEFT JOIN symlinks l ON s.id = l.session_id
WHERE s.destination = ?
GROUP BY s.id
ORDER BY s.created_at DESC
""", (dest,)).fetchall()
return [
SessionRecord(
id=row[0],
created_at=datetime.fromisoformat(row[1]),
destination=row[2],
link_count=row[3]
)
for row in rows
]
def save_trim_settings(
self,
session_id: int,
source_folder: str,
trim_start: int,
trim_end: int
) -> None:
"""Save trim settings for a folder in a session.
Args:
session_id: The session ID.
source_folder: Path to the source folder.
trim_start: Number of images to trim from start.
trim_end: Number of images to trim from end.
Raises:
DatabaseError: If saving fails.
"""
try:
with self._connect() as conn:
conn.execute(
"""INSERT INTO sequence_trim_settings
(session_id, source_folder, trim_start, trim_end)
VALUES (?, ?, ?, ?)
ON CONFLICT(session_id, source_folder)
DO UPDATE SET trim_start=excluded.trim_start,
trim_end=excluded.trim_end""",
(session_id, source_folder, trim_start, trim_end)
)
except sqlite3.Error as e:
raise DatabaseError(f"Failed to save trim settings: {e}") from e
def get_trim_settings(
self,
session_id: int,
source_folder: str
) -> tuple[int, int]:
"""Get trim settings for a folder in a session.
Args:
session_id: The session ID.
source_folder: Path to the source folder.
Returns:
Tuple of (trim_start, trim_end). Returns (0, 0) if not found.
"""
with self._connect() as conn:
row = conn.execute(
"""SELECT trim_start, trim_end FROM sequence_trim_settings
WHERE session_id = ? AND source_folder = ?""",
(session_id, source_folder)
).fetchone()
if row:
return (row[0], row[1])
return (0, 0)
def get_all_trim_settings(self, session_id: int) -> dict[str, tuple[int, int]]:
"""Get all trim settings for a session.
Args:
session_id: The session ID.
Returns:
Dict mapping source folder paths to (trim_start, trim_end) tuples.
"""
with self._connect() as conn:
rows = conn.execute(
"""SELECT source_folder, trim_start, trim_end
FROM sequence_trim_settings WHERE session_id = ?""",
(session_id,)
).fetchall()
return {row[0]: (row[1], row[2]) for row in rows}
# --- Business Logic ---
class SymlinkManager:
"""Manages symlink creation and cleanup operations."""
def __init__(self, db: Optional[DatabaseManager] = None) -> None:
"""Initialize the symlink manager.
Args:
db: Optional database manager for tracking operations.
"""
self.db = db
@staticmethod
def get_supported_files(directories: list[Path]) -> list[tuple[Path, str]]:
"""Get all supported image files from multiple directories.
Files are returned sorted by directory order (as provided), then
alphabetically by filename within each directory.
Args:
directories: List of source directories to scan.
Returns:
List of (directory, filename) tuples.
"""
files: list[tuple[Path, str]] = []
for directory in directories:
if not directory.is_dir():
continue
dir_files = []
for item in directory.iterdir():
if item.is_file() and item.suffix.lower() in SUPPORTED_EXTENSIONS:
dir_files.append((directory, item.name))
# Sort files within this directory alphabetically
dir_files.sort(key=lambda x: x[1].lower())
files.extend(dir_files)
return files
@staticmethod
def validate_paths(sources: list[Path], dest: Path) -> None:
"""Validate source and destination paths.
Args:
sources: List of source directories.
dest: Destination directory.
Raises:
SourceNotFoundError: If any source directory doesn't exist.
DestinationError: If destination cannot be created or accessed.
"""
if not sources:
raise SourceNotFoundError("No source directories specified")
for source in sources:
if not source.exists():
raise SourceNotFoundError(f"Source directory not found: {source}")
if not source.is_dir():
raise SourceNotFoundError(f"Source is not a directory: {source}")
try:
dest.mkdir(parents=True, exist_ok=True)
except OSError as e:
raise DestinationError(f"Cannot create destination directory: {e}") from e
if not dest.is_dir():
raise DestinationError(f"Destination is not a directory: {dest}")
@staticmethod
def cleanup_old_links(directory: Path) -> int:
"""Remove existing seq* symlinks from a directory.
Handles both old format (seq_0000) and new format (seq01_0000).
Args:
directory: Directory to clean up.
Returns:
Number of files removed.
Raises:
CleanupError: If cleanup fails.
"""
removed = 0
try:
for item in directory.iterdir():
# Match both old (seq_NNNN) and new (seqNN_NNNN) formats
if item.name.startswith("seq") and item.is_symlink():
item.unlink()
removed += 1
except OSError as e:
raise CleanupError(f"Failed to clean up old links: {e}") from e
return removed
def create_sequence_links(
self,
sources: list[Path],
dest: Path,
files: list[tuple],
trim_settings: Optional[dict[Path, tuple[int, int]]] = None,
) -> tuple[list[LinkResult], Optional[int]]:
"""Create sequenced symlinks from source files to destination.
Args:
sources: List of source directories (for validation).
dest: Destination directory.
files: List of tuples. Can be:
- (source_dir, filename) for CLI mode (uses global sequence)
- (source_dir, filename, folder_idx, file_idx) for GUI mode
trim_settings: Optional dict mapping folder paths to (trim_start, trim_end).
Returns:
Tuple of (list of LinkResult objects, session_id or None).
"""
self.validate_paths(sources, dest)
self.cleanup_old_links(dest)
session_id = None
if self.db:
session_id = self.db.create_session(str(dest))
# Save trim settings if provided
if trim_settings and session_id:
for folder, (trim_start, trim_end) in trim_settings.items():
if trim_start > 0 or trim_end > 0:
self.db.save_trim_settings(
session_id, str(folder), trim_start, trim_end
)
results: list[LinkResult] = []
# Check if we have folder indices (GUI mode) or not (CLI mode)
use_folder_sequences = len(files) > 0 and len(files[0]) >= 4
# For CLI mode without folder indices, calculate them
if not use_folder_sequences:
folder_to_index = {folder: i for i, folder in enumerate(sources)}
folder_file_counts: dict[Path, int] = {}
expanded_files = []
for source_dir, filename in files:
folder_idx = folder_to_index.get(source_dir, 0)
file_idx = folder_file_counts.get(source_dir, 0)
folder_file_counts[source_dir] = file_idx + 1
expanded_files.append((source_dir, filename, folder_idx, file_idx))
files = expanded_files
for i, file_data in enumerate(files):
source_dir, filename, folder_idx, file_idx = file_data
source_path = source_dir / filename
ext = source_path.suffix
link_name = f"seq{folder_idx + 1:02d}_{file_idx:04d}{ext}"
link_path = dest / link_name
# Calculate relative path from destination to source
rel_source = Path(os.path.relpath(source_path.resolve(), dest.resolve()))
try:
link_path.symlink_to(rel_source)
if self.db and session_id:
self.db.record_symlink(
session_id=session_id,
source=str(source_path.resolve()),
link=str(link_path),
filename=filename,
seq=i
)
results.append(LinkResult(
source_path=source_path,
link_path=link_path,
sequence_number=i,
success=True
))
except OSError as e:
results.append(LinkResult(
source_path=source_path,
link_path=link_path,
sequence_number=i,
success=False,
error=str(e)
))
return results, session_id
# --- GUI ---
class SequenceLinkerUI(QWidget):
"""PyQt6 GUI for the Video Montage Linker."""
def __init__(self) -> None:
"""Initialize the UI."""
super().__init__()
self.source_folders: list[Path] = []
self.last_directory: Optional[str] = None
self._last_resumed_dest: Optional[str] = None # Track to avoid double resume
self._folder_trim_settings: dict[Path, tuple[int, int]] = {} # In-memory trim cache
self._folder_file_counts: dict[Path, int] = {} # Total files per folder (before trim)
self._current_session_id: Optional[int] = None # Track session for saving trim
self.db = DatabaseManager()
self.manager = SymlinkManager(self.db)
self._setup_window()
self._create_widgets()
self._create_layout()
self._connect_signals()
self.setAcceptDrops(True)
def _setup_window(self) -> None:
"""Configure the main window properties."""
self.setWindowTitle('Video Montage Linker')
self.setMinimumSize(1000, 700)
def _create_widgets(self) -> None:
"""Create all UI widgets."""
# Source folders group
self.source_group = QGroupBox("Source Folders (drag to reorder, drop folders here)")
self.source_list = QListWidget()
self.source_list.setMaximumHeight(100)
self.source_list.setSelectionMode(QAbstractItemView.SelectionMode.ExtendedSelection)
self.source_list.setDragDropMode(QAbstractItemView.DragDropMode.InternalMove)
self.add_source_btn = QPushButton("Add Folder")
self.remove_source_btn = QPushButton("Remove Folder")
# Destination
self.dst_label = QLabel("Destination Folder:")
self.dst_path = QLineEdit(placeholderText="Select destination folder")
self.dst_btn = QPushButton("Browse")
# File list
self.files_label = QLabel("Sequence Order (Drag to reorder within folder, Del to remove):")
self.file_list = QTreeWidget()
self.file_list.setHeaderLabels(["Sequence Name", "Original Filename", "Source Folder"])
self.file_list.setDragDropMode(QAbstractItemView.DragDropMode.InternalMove)
self.file_list.setSelectionMode(QAbstractItemView.SelectionMode.ExtendedSelection)
self.file_list.setRootIsDecorated(False)
self.file_list.header().setStretchLastSection(True)
self.file_list.header().setSectionResizeMode(0, QHeaderView.ResizeMode.Interactive)
# Action buttons
self.remove_files_btn = QPushButton("Remove Files")
self.refresh_btn = QPushButton("Refresh Files")
self.run_btn = QPushButton("Generate Virtual Sequence")
self.run_btn.setStyleSheet(
"background-color: #3498db; color: white; "
"height: 40px; font-weight: bold;"
)
# Preview tabs
self.preview_tabs = QTabWidget()
# Video preview tab
self.video_tab = QWidget()
self.video_widget = QVideoWidget()
self.video_widget.setMinimumSize(320, 180)
self.media_player = QMediaPlayer()
self.audio_output = QAudioOutput()
self.media_player.setAudioOutput(self.audio_output)
self.media_player.setVideoOutput(self.video_widget)
self.video_combo = QComboBox()
self.video_combo.setPlaceholderText("Select a video to preview")
self.play_btn = QPushButton("Play")
self.stop_btn = QPushButton("Stop")
self.video_slider = QSlider(Qt.Orientation.Horizontal)
self.video_slider.setRange(0, 0)
self.video_time_label = QLabel("00:00 / 00:00")
# Image sequence preview tab
self.image_tab = QWidget()
self.image_scroll = QScrollArea()
self.image_scroll.setWidgetResizable(True)
self.image_scroll.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.image_scroll.viewport().installEventFilter(self)
self.image_scroll.viewport().setCursor(Qt.CursorShape.OpenHandCursor)
self.image_label = QLabel()
self.image_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.image_label.setSizePolicy(QSizePolicy.Policy.Ignored, QSizePolicy.Policy.Ignored)
self.image_label.setScaledContents(False)
self.image_scroll.setWidget(self.image_label)
self.prev_image_btn = QPushButton("◀ Previous")
self.next_image_btn = QPushButton("Next ▶")
self.image_slider = QSlider(Qt.Orientation.Horizontal)
self.image_slider.setRange(0, 0)
self.image_index_label = QLabel("0 / 0")
self.image_name_label = QLabel("")
self.image_name_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.zoom_in_btn = QPushButton("+")
self.zoom_in_btn.setFixedWidth(30)
self.zoom_out_btn = QPushButton("-")
self.zoom_out_btn.setFixedWidth(30)
self.zoom_reset_btn = QPushButton("Fit")
self.zoom_reset_btn.setFixedWidth(40)
self.zoom_label = QLabel("100%")
self.zoom_label.setFixedWidth(45)
self._zoom_level = 1.0
self._current_pixmap: Optional[QPixmap] = None
self._pan_start = None
self._pan_scrollbar_start = None
# Trim slider for sequence trimming
self.trim_slider = TrimSlider()
self.trim_label = QLabel("Frames: All included")
self.trim_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
def _create_layout(self) -> None:
"""Arrange widgets in layouts."""
main_layout = QVBoxLayout()
# Source folders group layout
source_group_layout = QVBoxLayout()
source_btn_layout = QHBoxLayout()
source_btn_layout.addWidget(self.add_source_btn)
source_btn_layout.addWidget(self.remove_source_btn)
source_btn_layout.addStretch()
source_group_layout.addWidget(self.source_list)
source_group_layout.addLayout(source_btn_layout)
self.source_group.setLayout(source_group_layout)
# Destination layout
dst_layout = QHBoxLayout()
dst_layout.addWidget(self.dst_path)
dst_layout.addWidget(self.dst_btn)
# Button layout
btn_layout = QHBoxLayout()
btn_layout.addWidget(self.remove_files_btn)
btn_layout.addWidget(self.refresh_btn)
btn_layout.addStretch()
# Video preview tab layout
video_tab_layout = QVBoxLayout(self.video_tab)
video_tab_layout.addWidget(self.video_combo)
video_tab_layout.addWidget(self.video_widget, 1)
video_controls = QHBoxLayout()
video_controls.addWidget(self.play_btn)
video_controls.addWidget(self.stop_btn)
video_controls.addWidget(self.video_slider, 1)
video_controls.addWidget(self.video_time_label)
video_tab_layout.addLayout(video_controls)
# Image sequence preview tab layout
image_tab_layout = QVBoxLayout(self.image_tab)
# Top bar with name and zoom controls
image_top_bar = QHBoxLayout()
image_top_bar.addWidget(self.image_name_label, 1)
image_top_bar.addWidget(self.zoom_out_btn)
image_top_bar.addWidget(self.zoom_label)
image_top_bar.addWidget(self.zoom_in_btn)
image_top_bar.addWidget(self.zoom_reset_btn)
image_tab_layout.addLayout(image_top_bar)
image_tab_layout.addWidget(self.image_scroll, 1)
image_controls = QHBoxLayout()
image_controls.addWidget(self.prev_image_btn)
image_controls.addWidget(self.image_slider, 1)
image_controls.addWidget(self.next_image_btn)
image_controls.addWidget(self.image_index_label)
image_tab_layout.addLayout(image_controls)
# Trim slider for selected folder
image_tab_layout.addWidget(self.trim_label)
image_tab_layout.addWidget(self.trim_slider)
# Add tabs to tab widget
self.preview_tabs.addTab(self.video_tab, "Video Preview")
self.preview_tabs.addTab(self.image_tab, "Image Sequence")
# Left panel (file list)
left_panel = QWidget()
left_layout = QVBoxLayout(left_panel)
left_layout.setContentsMargins(0, 0, 0, 0)
left_layout.addWidget(self.files_label)
left_layout.addWidget(self.file_list)
left_layout.addLayout(btn_layout)
# Splitter for file list and preview tabs
self.splitter = QSplitter(Qt.Orientation.Horizontal)
self.splitter.addWidget(left_panel)
self.splitter.addWidget(self.preview_tabs)
self.splitter.setSizes([400, 400])
# Assemble main layout
main_layout.addWidget(self.source_group)
main_layout.addWidget(self.dst_label)
main_layout.addLayout(dst_layout)
main_layout.addWidget(self.splitter, 1)
main_layout.addWidget(self.run_btn)
self.setLayout(main_layout)
def _connect_signals(self) -> None:
"""Connect widget signals to slots."""
self.add_source_btn.clicked.connect(self._add_source_folder)
self.remove_source_btn.clicked.connect(self._remove_source_folder)
self.dst_btn.clicked.connect(self._browse_destination)
self.dst_path.editingFinished.connect(self._on_destination_changed)
self.remove_files_btn.clicked.connect(self._remove_selected_files)
self.refresh_btn.clicked.connect(self._refresh_files)
self.run_btn.clicked.connect(self._process_links)
# Connect reorder signals
self.source_list.model().rowsMoved.connect(self._on_folders_reordered)
self.file_list.model().rowsMoved.connect(self._recalculate_sequence_names)
# Connect folder selection to update video list
self.source_list.currentItemChanged.connect(self._on_folder_selected)
# Video player signals
self.video_combo.currentIndexChanged.connect(self._on_video_selected)
self.play_btn.clicked.connect(self._toggle_play)
self.stop_btn.clicked.connect(self._stop_video)
self.media_player.positionChanged.connect(self._on_position_changed)
self.media_player.durationChanged.connect(self._on_duration_changed)
self.video_slider.sliderMoved.connect(self._seek_video)
# Image sequence signals
self.file_list.currentItemChanged.connect(self._on_file_selected)
self.prev_image_btn.clicked.connect(self._prev_image)
self.next_image_btn.clicked.connect(self._next_image)
self.image_slider.valueChanged.connect(self._on_image_slider_changed)
self.zoom_in_btn.clicked.connect(self._zoom_in)
self.zoom_out_btn.clicked.connect(self._zoom_out)
self.zoom_reset_btn.clicked.connect(self._zoom_reset)
# Trim slider signals
self.trim_slider.trimChanged.connect(self._on_trim_changed)
def _add_source_folder(self, folder_path: Optional[str] = None) -> None:
"""Add a source folder via file dialog or direct path.
Args:
folder_path: Optional path to add directly (for drag-drop).
"""
if folder_path:
path = folder_path
else:
start_dir = self.last_directory or ""
path = QFileDialog.getExistingDirectory(
self, "Select Source Folder", start_dir
)
if path:
folder = Path(path)
if folder.is_dir() and folder not in self.source_folders:
self.source_folders.append(folder)
self.source_list.addItem(str(folder))
self.last_directory = str(folder.parent)
self._refresh_files()
# Auto-select the newly added folder to show its videos
self.source_list.setCurrentRow(self.source_list.count() - 1)
def _remove_source_folder(self) -> None:
"""Remove selected source folder(s)."""
selected = self.source_list.selectedItems()
if not selected:
return
# Remove in reverse order to maintain correct indices
rows = sorted([self.source_list.row(item) for item in selected], reverse=True)
for row in rows:
self.source_list.takeItem(row)
del self.source_folders[row]
self._refresh_files()
def _remove_selected_files(self) -> None:
"""Remove selected files from the file list."""
selected = self.file_list.selectedItems()
if not selected:
return
# Remove in reverse order to maintain correct indices
rows = sorted([self.file_list.indexOfTopLevelItem(item) for item in selected], reverse=True)
for row in rows:
self.file_list.takeTopLevelItem(row)
def _browse_destination(self) -> None:
"""Select destination folder via file dialog."""
start_dir = self.last_directory or ""
path = QFileDialog.getExistingDirectory(
self, "Select Destination Folder", start_dir
)
if path:
self.dst_path.setText(path)
self.last_directory = str(Path(path).parent)
self._try_resume_session(path)
def _on_destination_changed(self) -> None:
"""Handle destination path text field changes."""
path = self.dst_path.text().strip()
if path and Path(path).is_dir():
resolved = str(Path(path).resolve())
# Only try resume if this is a new destination
if resolved != self._last_resumed_dest:
self._try_resume_session(path)
def _try_resume_session(self, dest_path: str) -> bool:
"""Try to resume a previous session for the given destination.
Checks if a session exists for this destination, extracts source folders
from recorded symlinks, and populates the UI with files that still exist.
Also restores trim settings.
Args:
dest_path: Path to the destination folder.
Returns:
True if a session was resumed, False otherwise.
"""
dest = Path(dest_path).resolve()
dest_str = str(dest)
# Track that we've checked this destination
self._last_resumed_dest = dest_str
sessions = self.db.get_sessions_by_destination(dest_str)
if not sessions:
return False
# Get the most recent session
latest_session = sessions[0]
symlinks = self.db.get_symlinks_by_session(latest_session.id)
if not symlinks:
return False
# Load trim settings from database
db_trim_settings = self.db.get_all_trim_settings(latest_session.id)
# Parse folder and file indices from link names
# New format: seqNN_NNNN.ext, Old format: seq_NNNN.ext
new_pattern = re.compile(r'seq(\d+)_(\d+)')
old_pattern = re.compile(r'seq_(\d+)')
# Collect folder info: {folder_path: (folder_idx, [(file_idx, filename)])}
folder_data: dict[str, tuple[int, list[tuple[int, str]]]] = {}
missing_count = 0
for link in symlinks:
source_path = Path(link.source_path)
if not source_path.exists():
missing_count += 1
continue
folder = str(source_path.parent)
link_name = Path(link.link_path).stem
# Try new format first
match = new_pattern.match(link_name)
if match:
folder_idx = int(match.group(1)) - 1 # Convert to 0-based
file_idx = int(match.group(2))
else:
# Try old format (single sequence)
match = old_pattern.match(link_name)
if match:
folder_idx = 0
file_idx = int(match.group(1))
else:
# Unknown format, use sequence_number from db
folder_idx = 0
file_idx = link.sequence_number
if folder not in folder_data:
folder_data[folder] = (folder_idx, [])
folder_data[folder][1].append((file_idx, link.original_filename))
if not folder_data:
return False
# Sort folders by their index, then sort files within each folder
sorted_folders = sorted(folder_data.items(), key=lambda x: x[1][0])
# Clear and populate source folders
self.source_folders.clear()
self.source_list.clear()
self._folder_trim_settings.clear()
for folder, (folder_idx, file_list) in sorted_folders:
folder_path = Path(folder)
if folder_path.exists():
self.source_folders.append(folder_path)
self.source_list.addItem(folder)
# Restore trim settings for this folder
if folder in db_trim_settings:
self._folder_trim_settings[folder_path] = db_trim_settings[folder]
# Store session ID
self._current_session_id = latest_session.id
# Call _refresh_files to properly populate file list with trim settings applied
self._refresh_files()
# Notify user
total_files = self.file_list.topLevelItemCount()
trim_count = sum(1 for ts in self._folder_trim_settings.values() if ts[0] > 0 or ts[1] > 0)
msg = f"Resumed session from {latest_session.created_at.strftime('%Y-%m-%d %H:%M')}.\n"
msg += f"Loaded {total_files} files from {len(self.source_folders)} folder(s)."
if trim_count > 0:
msg += f"\nRestored trim settings for {trim_count} folder(s)."
if missing_count > 0:
msg += f"\n{missing_count} file(s) no longer exist and were skipped."
QMessageBox.information(self, "Session Resumed", msg)
return True
def keyPressEvent(self, event) -> None:
"""Handle key press events."""
in_image_tab = self.preview_tabs.currentWidget() == self.image_tab
if event.key() == Qt.Key.Key_Delete:
if self.file_list.hasFocus():
self._remove_selected_files()
elif self.source_list.hasFocus():
self._remove_source_folder()
elif in_image_tab:
# Delete current image from sequence
self._delete_current_image()
elif event.key() == Qt.Key.Key_Left:
if in_image_tab:
self._prev_image()
elif event.key() == Qt.Key.Key_Right:
if in_image_tab:
self._next_image()
elif event.key() == Qt.Key.Key_Plus or event.key() == Qt.Key.Key_Equal:
if in_image_tab:
self._zoom_in()
elif event.key() == Qt.Key.Key_Minus:
if in_image_tab:
self._zoom_out()
elif event.key() == Qt.Key.Key_0:
if in_image_tab:
self._zoom_reset()
else:
super().keyPressEvent(event)
def closeEvent(self, event) -> None:
"""Clean up media player when window closes."""
self.media_player.stop()
super().closeEvent(event)
def wheelEvent(self, event) -> None:
"""Handle mouse wheel for zoom in image tab."""
if self.preview_tabs.currentWidget() == self.image_tab:
# Check if mouse is over the image scroll area
if self.image_scroll.underMouse():
delta = event.angleDelta().y()
if delta > 0:
self._zoom_in()
elif delta < 0:
self._zoom_out()
event.accept()
return
super().wheelEvent(event)
def eventFilter(self, obj, event) -> bool:
"""Handle mouse events for panning the image."""
if obj == self.image_scroll.viewport():
if event.type() == QEvent.Type.MouseButtonPress:
if event.button() == Qt.MouseButton.LeftButton:
self._pan_start = event.pos()
self._pan_scrollbar_start = QPoint(
self.image_scroll.horizontalScrollBar().value(),
self.image_scroll.verticalScrollBar().value()
)
self.image_scroll.viewport().setCursor(Qt.CursorShape.ClosedHandCursor)
return True
elif event.type() == QEvent.Type.MouseMove:
if self._pan_start is not None:
delta = event.pos() - self._pan_start
self.image_scroll.horizontalScrollBar().setValue(
self._pan_scrollbar_start.x() - delta.x()
)
self.image_scroll.verticalScrollBar().setValue(
self._pan_scrollbar_start.y() - delta.y()
)
return True
elif event.type() == QEvent.Type.MouseButtonRelease:
if event.button() == Qt.MouseButton.LeftButton and self._pan_start is not None:
self._pan_start = None
self._pan_scrollbar_start = None
self.image_scroll.viewport().setCursor(Qt.CursorShape.OpenHandCursor)
return True
return super().eventFilter(obj, event)
def dragEnterEvent(self, event: QDragEnterEvent) -> None:
"""Accept drag events with URLs (folders)."""
if event.mimeData().hasUrls():
event.acceptProposedAction()
def dropEvent(self, event: QDropEvent) -> None:
"""Handle dropped folders."""
for url in event.mimeData().urls():
path = url.toLocalFile()
if path and Path(path).is_dir():
self._add_source_folder(path)
def _on_folders_reordered(self) -> None:
"""Handle folder list reordering."""
# Rebuild source_folders from current list order
self.source_folders.clear()
for i in range(self.source_list.count()):
item = self.source_list.item(i)
self.source_folders.append(Path(item.text()))
self._refresh_files()
def _refresh_files(self, select_position: str = 'first') -> None:
"""Refresh the file list from all source folders, applying trim settings.
Args:
select_position: Which item to select after refresh.
'first' - select first item (default)
'last' - select last item
'none' - don't change selection
"""
self.file_list.clear()
if not self.source_folders:
self._folder_file_counts.clear()
return
# Build folder index map
folder_to_index = {folder: i for i, folder in enumerate(self.source_folders)}
# Get all files from all folders
all_files = self.manager.get_supported_files(self.source_folders)
# Group files by folder first to get total counts
files_by_folder: dict[Path, list[str]] = {}
for source_dir, filename in all_files:
if source_dir not in files_by_folder:
files_by_folder[source_dir] = []
files_by_folder[source_dir].append(filename)
# Store total file counts per folder (before trimming)
self._folder_file_counts = {folder: len(files) for folder, files in files_by_folder.items()}
# Apply trim settings and build file list
folder_file_counts: dict[Path, int] = {} # For sequence numbering after trim
for folder in self.source_folders:
if folder not in files_by_folder:
continue
folder_files = files_by_folder[folder]
total_in_folder = len(folder_files)
# Get trim settings for this folder
trim_start, trim_end = self._folder_trim_settings.get(folder, (0, 0))
# Clamp trim values to valid range
trim_start = min(trim_start, max(0, total_in_folder - 1))
trim_end = min(trim_end, max(0, total_in_folder - 1 - trim_start))
# Apply trim - slice the file list
end_idx = total_in_folder - trim_end
trimmed_files = folder_files[trim_start:end_idx]
folder_idx = folder_to_index.get(folder, 0)
for filename in trimmed_files:
file_idx = folder_file_counts.get(folder, 0)
folder_file_counts[folder] = file_idx + 1
# Generate sequence name preview
ext = Path(filename).suffix
seq_name = f"seq{folder_idx + 1:02d}_{file_idx:04d}{ext}"
item = QTreeWidgetItem([seq_name, filename, str(folder)])
# Store (source_dir, filename, folder_idx, file_idx) for symlink creation
item.setData(0, Qt.ItemDataRole.UserRole, (folder, filename, folder_idx, file_idx))
self.file_list.addTopLevelItem(item)
# Update image slider and select appropriate item
total = self.file_list.topLevelItemCount()
self.image_slider.setRange(0, max(0, total - 1))
if total > 0 and select_position != 'none':
if select_position == 'last':
self.file_list.setCurrentItem(self.file_list.topLevelItem(total - 1))
else: # 'first' or default
self.file_list.setCurrentItem(self.file_list.topLevelItem(0))
# Update trim slider for currently selected folder
self._update_trim_slider_for_selected_folder()
def _get_files_in_order(self) -> list[tuple[Path, str, int, int]]:
"""Get files in the current list order with sequence info.
Returns:
List of (source_dir, filename, folder_idx, file_idx) tuples.
"""
files = []
for i in range(self.file_list.topLevelItemCount()):
item = self.file_list.topLevelItem(i)
data = item.data(0, Qt.ItemDataRole.UserRole)
if data:
files.append(data)
return files
def _recalculate_sequence_names(self) -> None:
"""Recalculate sequence names after file reordering."""
if not self.source_folders:
return
folder_to_index = {folder: i for i, folder in enumerate(self.source_folders)}
folder_file_counts: dict[Path, int] = {}
for i in range(self.file_list.topLevelItemCount()):
item = self.file_list.topLevelItem(i)
data = item.data(0, Qt.ItemDataRole.UserRole)
if data:
source_dir = data[0]
filename = data[1]
folder_idx = folder_to_index.get(source_dir, 0)
file_idx = folder_file_counts.get(source_dir, 0)
folder_file_counts[source_dir] = file_idx + 1
# Update sequence name
ext = Path(filename).suffix
seq_name = f"seq{folder_idx + 1:02d}_{file_idx:04d}{ext}"
item.setText(0, seq_name)
# Update stored data
item.setData(0, Qt.ItemDataRole.UserRole, (source_dir, filename, folder_idx, file_idx))
# --- Video Preview Methods ---
def _get_videos_in_folder(self, folder: Path) -> list[Path]:
"""Get all video files in the parent folder of the source.
The video representing a sequence is typically one level above
the folder containing the images.
Args:
folder: Source folder path (videos are in its parent).
Returns:
List of video file paths, sorted alphabetically.
"""
videos = []
parent = folder.parent
if parent.is_dir():
for item in parent.iterdir():
if item.is_file() and item.suffix.lower() in VIDEO_EXTENSIONS:
videos.append(item)
return sorted(videos, key=lambda p: p.name.lower())
def _on_folder_selected(self, current, previous) -> None:
"""Handle folder selection change - update video list and trim slider."""
self._stop_video()
self.video_combo.clear()
if current is None:
self.trim_slider.setRange(0)
self.trim_slider.setEnabled(False)
self.trim_label.setText("Frames: No folder selected")
return
folder = Path(current.text())
# Update trim slider for selected folder
self._update_trim_slider_for_selected_folder()
# Update video list
videos = self._get_videos_in_folder(folder)
if not videos:
self.video_combo.addItem("No videos found")
self.video_combo.setEnabled(False)
return
self.video_combo.setEnabled(True)
for video in videos:
self.video_combo.addItem(video.name, video)
# Auto-select first video
self.video_combo.setCurrentIndex(0)
def _update_trim_slider_for_selected_folder(self) -> None:
"""Update the trim slider to reflect the currently selected folder."""
current_item = self.source_list.currentItem()
if current_item is None:
self.trim_slider.setRange(0)
self.trim_slider.setEnabled(False)
self.trim_label.setText("Frames: No folder selected")
return
folder = Path(current_item.text())
total = self._folder_file_counts.get(folder, 0)
if total == 0:
self.trim_slider.setRange(0)
self.trim_slider.setEnabled(False)
self.trim_label.setText("Frames: No images in folder")
return
# Get current trim settings
trim_start, trim_end = self._folder_trim_settings.get(folder, (0, 0))
# Update trim slider
self.trim_slider.setEnabled(True)
self.trim_slider.setRange(total)
self.trim_slider.setTrimStart(trim_start)
self.trim_slider.setTrimEnd(trim_end)
# Update label
self._update_trim_label(folder, total, trim_start, trim_end)
def _update_trim_label(self, folder: Path, total: int, trim_start: int, trim_end: int) -> None:
"""Update the trim label to show current trim range."""
included_start = trim_start + 1 # 1-based for display
included_end = total - trim_end
included_count = included_end - trim_start
if trim_start == 0 and trim_end == 0:
self.trim_label.setText(f"Frames: All {total} included")
elif included_count <= 0:
self.trim_label.setText(f"Frames: None included (all {total} trimmed)")
else:
self.trim_label.setText(f"Frames {included_start}-{included_end} of {total} ({included_count} included)")
def _on_trim_changed(self, trim_start: int, trim_end: int, handle: str) -> None:
"""Handle trim slider value changes.
Args:
trim_start: Number of frames trimmed from start.
trim_end: Number of frames trimmed from end.
handle: Which handle was dragged ('left' or 'right').
"""
current_item = self.source_list.currentItem()
if current_item is None:
return
folder = Path(current_item.text())
total = self._folder_file_counts.get(folder, 0)
# Store trim settings
self._folder_trim_settings[folder] = (trim_start, trim_end)
# Update label
self._update_trim_label(folder, total, trim_start, trim_end)
# Refresh file list to apply new trim settings (don't auto-select)
self._refresh_files(select_position='none')
# Select first or last image OF THE CURRENT FOLDER based on which handle was dragged
# Left handle (trim start) -> show first visible frame of this folder
# Right handle (trim end) -> show last visible frame of this folder
self._select_folder_boundary(folder, 'first' if handle == 'left' else 'last')
def _select_folder_boundary(self, folder: Path, position: str) -> None:
"""Select the first or last file of a specific folder in the file list.
Args:
folder: The folder whose files to search.
position: 'first' or 'last'.
"""
folder_str = str(folder)
matching_indices = []
for i in range(self.file_list.topLevelItemCount()):
item = self.file_list.topLevelItem(i)
data = item.data(0, Qt.ItemDataRole.UserRole)
if data and str(data[0]) == folder_str:
matching_indices.append(i)
if not matching_indices:
return
if position == 'last':
select_idx = matching_indices[-1]
else:
select_idx = matching_indices[0]
item = self.file_list.topLevelItem(select_idx)
self.file_list.setCurrentItem(item)
self.image_slider.setValue(select_idx)
self._show_image_at_index(select_idx)
def _on_video_selected(self, index: int) -> None:
"""Handle video selection from combo box."""
self._stop_video()
if index < 0:
return
video_path = self.video_combo.currentData()
if video_path and isinstance(video_path, Path) and video_path.exists():
self.media_player.setSource(QUrl.fromLocalFile(str(video_path)))
def _toggle_play(self) -> None:
"""Toggle play/pause state."""
if self.media_player.playbackState() == QMediaPlayer.PlaybackState.PlayingState:
self.media_player.pause()
self.play_btn.setText("Play")
else:
self.media_player.play()
self.play_btn.setText("Pause")
def _stop_video(self) -> None:
"""Stop video playback."""
self.media_player.stop()
self.play_btn.setText("Play")
self.video_slider.setValue(0)
self.video_time_label.setText("00:00 / 00:00")
def _on_position_changed(self, position: int) -> None:
"""Update slider and time label when playback position changes."""
self.video_slider.setValue(position)
self._update_time_label(position, self.media_player.duration())
def _on_duration_changed(self, duration: int) -> None:
"""Update slider range when video duration is known."""
self.video_slider.setRange(0, duration)
self._update_time_label(self.media_player.position(), duration)
def _seek_video(self, position: int) -> None:
"""Seek to a position in the video."""
self.media_player.setPosition(position)
def _update_time_label(self, position: int, duration: int) -> None:
"""Update the time label with current position and duration."""
def format_time(ms: int) -> str:
seconds = ms // 1000
minutes = seconds // 60
seconds = seconds % 60
return f"{minutes:02d}:{seconds:02d}"
self.video_time_label.setText(f"{format_time(position)} / {format_time(duration)}")
# --- Image Sequence Preview Methods ---
def _on_file_selected(self, current, previous) -> None:
"""Handle file selection in the list - update image preview."""
if current is None:
return
# Update slider range based on total files
total = self.file_list.topLevelItemCount()
current_index = self.file_list.indexOfTopLevelItem(current)
self.image_slider.setRange(0, max(0, total - 1))
self.image_slider.setValue(current_index)
self._show_image_at_index(current_index)
def _show_image_at_index(self, index: int) -> None:
"""Display the image at the given index in the file list."""
if index < 0 or index >= self.file_list.topLevelItemCount():
self._current_pixmap = None
return
item = self.file_list.topLevelItem(index)
if item is None:
self._current_pixmap = None
return
data = item.data(0, Qt.ItemDataRole.UserRole)
if not data:
self._current_pixmap = None
return
source_dir, filename = data[0], data[1]
image_path = source_dir / filename
if not image_path.exists():
self.image_label.setText(f"Image not found:\n{image_path}")
self.image_name_label.setText("")
self._current_pixmap = None
return
# Load and display image
pixmap = QPixmap(str(image_path))
if pixmap.isNull():
self.image_label.setText(f"Cannot load image:\n{image_path}")
self.image_name_label.setText("")
self._current_pixmap = None
return
# Store pixmap for zooming
self._current_pixmap = pixmap
self._apply_zoom()
# Update labels
total = self.file_list.topLevelItemCount()
self.image_index_label.setText(f"{index + 1} / {total}")
seq_name = item.text(0)
self.image_name_label.setText(f"{seq_name} ({filename})")
# Select the item in the file list
self.file_list.setCurrentItem(item)
def _apply_zoom(self) -> None:
"""Apply current zoom level to the image."""
if self._current_pixmap is None:
return
if self._zoom_level == 1.0:
# Fit to scroll area
scaled = self._current_pixmap.scaled(
self.image_scroll.size() * 0.95,
Qt.AspectRatioMode.KeepAspectRatio,
Qt.TransformationMode.SmoothTransformation
)
else:
# Apply zoom level
new_size = self._current_pixmap.size() * self._zoom_level
scaled = self._current_pixmap.scaled(
new_size,
Qt.AspectRatioMode.KeepAspectRatio,
Qt.TransformationMode.SmoothTransformation
)
self.image_label.setPixmap(scaled)
self.zoom_label.setText(f"{int(self._zoom_level * 100)}%")
def _zoom_in(self) -> None:
"""Zoom in on the image."""
if self._zoom_level < 5.0:
self._zoom_level = min(5.0, self._zoom_level * 1.25)
self._apply_zoom()
def _zoom_out(self) -> None:
"""Zoom out on the image."""
if self._zoom_level > 0.1:
self._zoom_level = max(0.1, self._zoom_level / 1.25)
self._apply_zoom()
def _zoom_reset(self) -> None:
"""Reset zoom to fit the scroll area."""
self._zoom_level = 1.0
self._apply_zoom()
def _delete_current_image(self) -> None:
"""Delete the currently displayed image from the sequence."""
current_index = self.image_slider.value()
total = self.file_list.topLevelItemCount()
if total == 0 or current_index < 0 or current_index >= total:
return
# Remove from file list
self.file_list.takeTopLevelItem(current_index)
self._recalculate_sequence_names()
# Update slider range
new_total = self.file_list.topLevelItemCount()
self.image_slider.setRange(0, max(0, new_total - 1))
if new_total == 0:
self.image_label.clear()
self.image_name_label.setText("")
self.image_index_label.setText("0 / 0")
self._current_pixmap = None
else:
# Show next image (or previous if we deleted the last one)
new_index = min(current_index, new_total - 1)
self.image_slider.setValue(new_index)
self._show_image_at_index(new_index)
def _prev_image(self) -> None:
"""Show the previous image in the sequence."""
current = self.image_slider.value()
if current > 0:
self.image_slider.setValue(current - 1)
def _next_image(self) -> None:
"""Show the next image in the sequence."""
current = self.image_slider.value()
if current < self.image_slider.maximum():
self.image_slider.setValue(current + 1)
def _on_image_slider_changed(self, value: int) -> None:
"""Handle image slider movement."""
self._show_image_at_index(value)
def _process_links(self) -> None:
"""Create symlinks based on current configuration."""
dst = self.dst_path.text()
if not self.source_folders:
QMessageBox.warning(self, "Error", "Add at least one source folder!")
return
if not dst:
QMessageBox.warning(self, "Error", "Select a destination folder!")
return
files = self._get_files_in_order()
if not files:
QMessageBox.warning(self, "Error", "No files to process!")
return
try:
results, session_id = self.manager.create_sequence_links(
sources=self.source_folders,
dest=Path(dst),
files=files,
trim_settings=self._folder_trim_settings
)
# Store session ID for potential future use
self._current_session_id = session_id
successful = sum(1 for r in results if r.success)
failed = sum(1 for r in results if not r.success)
if failed > 0:
QMessageBox.warning(
self, "Partial Success",
f"Linked {successful} files, {failed} failed.\n"
f"Destination: {dst}"
)
else:
QMessageBox.information(
self, "Success",
f"Linked {successful} files to {dst}"
)
except SymlinkError as e:
QMessageBox.critical(self, "Error", str(e))
except Exception as e:
QMessageBox.critical(self, "Unexpected Error", str(e))
# --- CLI ---
def create_parser() -> argparse.ArgumentParser:
"""Create the argument parser for CLI mode.
Returns:
Configured ArgumentParser instance.
"""
parser = argparse.ArgumentParser(
prog='symlink',
description='Video Montage Linker - Create sequenced symlinks for image files.',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s Launch GUI
%(prog)s --gui Launch GUI
%(prog)s --src /path/to/images --dst /path/to/dest
Create symlinks from CLI
%(prog)s --src /folder1 --src /folder2 --dst /path/to/dest
Merge multiple source folders
%(prog)s --list List tracked symlink sessions
%(prog)s --clean /path/to/dest Remove symlinks and session for destination
"""
)
parser.add_argument(
'--gui',
action='store_true',
help='Launch the graphical interface'
)
parser.add_argument(
'--src',
action='append',
metavar='PATH',
help='Source folder(s) containing images (can be used multiple times)'
)
parser.add_argument(
'--dst',
metavar='PATH',
help='Destination folder for symlinks'
)
parser.add_argument(
'--list',
action='store_true',
help='List all tracked symlink sessions'
)
parser.add_argument(
'--clean',
metavar='PATH',
help='Clean up symlinks and remove session for the specified destination'
)
return parser
def run_cli(args: argparse.Namespace) -> int:
"""Execute CLI commands.
Args:
args: Parsed command line arguments.
Returns:
Exit code (0 for success, non-zero for errors).
"""
db = DatabaseManager()
# List sessions
if args.list:
sessions = db.get_sessions()
if not sessions:
print("No symlink sessions found.")
return 0
print(f"{'ID':<6} {'Created':<20} {'Links':<8} Destination")
print("-" * 80)
for session in sessions:
created = session.created_at.strftime("%Y-%m-%d %H:%M:%S")
print(f"{session.id:<6} {created:<20} {session.link_count:<8} {session.destination}")
return 0
# Clean up destination
if args.clean:
dest = Path(args.clean).resolve()
# Remove symlinks from filesystem
if dest.exists():
try:
removed = SymlinkManager.cleanup_old_links(dest)
print(f"Removed {removed} symlinks from {dest}")
except CleanupError as e:
print(f"Error cleaning up files: {e}", file=sys.stderr)
return 1
# Remove from database
sessions = db.get_sessions_by_destination(str(dest))
for session in sessions:
db.delete_session(session.id)
print(f"Removed session {session.id} from database")
if not sessions:
print("No sessions found for this destination in database.")
return 0
# Create symlinks
if args.src and args.dst:
sources = [Path(s).resolve() for s in args.src]
dest = Path(args.dst).resolve()
manager = SymlinkManager(db)
try:
files = manager.get_supported_files(sources)
if not files:
print("No supported image files found in source folders.")
return 1
print(f"Found {len(files)} files in {len(sources)} source folder(s)")
results, _ = manager.create_sequence_links(
sources=sources,
dest=dest,
files=files
)
successful = sum(1 for r in results if r.success)
failed = sum(1 for r in results if not r.success)
print(f"Created {successful} symlinks in {dest}")
if failed > 0:
print(f"Warning: {failed} operations failed", file=sys.stderr)
for r in results:
if not r.success:
print(f" - {r.source_path.name}: {r.error}", file=sys.stderr)
return 1
return 0
except SymlinkError as e:
print(f"Error: {e}", file=sys.stderr)
return 1
# If src without dst or dst without src
if args.src or args.dst:
print("Error: Both --src and --dst are required for creating symlinks.",
file=sys.stderr)
return 1
# No CLI args, show help
create_parser().print_help()
return 0
# --- Entry Point ---
def main() -> int:
"""Main entry point for the application.
Returns:
Exit code (0 for success, non-zero for errors).
"""
parser = create_parser()
args = parser.parse_args()
# Determine if we should launch GUI
# GUI is launched if: --gui flag, OR no arguments at all
launch_gui = args.gui or (
not args.src and
not args.dst and
not args.list and
not args.clean
)
if launch_gui:
app = QApplication(sys.argv)
window = SequenceLinkerUI()
window.show()
return app.exec()
return run_cli(args)
if __name__ == '__main__':
sys.exit(main())