Files
Ethanfel 39f873bec2 fix: server bug fixes from review
- DB: add threading.Lock on all write methods and multi-step reads
- export.py: check audio extraction return code, raise on failure
- routes/export: counter race condition fix with _counter_lock
- routes/export: delete validation accepts EXPORT_DIR_suffix siblings
- routes/export: evict old finished jobs to prevent unbounded growth
- client plan: fix 10 bugs (mpv IPC, encodePath, input_path sep, etc.)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-16 19:53:38 +02:00

128 lines
4.4 KiB
Python

import os
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable
from .ffmpeg import build_ffmpeg_command, build_audio_extract_command
from .paths import _log
class ExportRunner:
"""Run ffmpeg export jobs in a background thread pool.
Callbacks:
on_clip_done(path: str)
on_all_done()
on_error(msg: str)
"""
def __init__(
self,
input_path: str,
jobs: list[tuple[float, str, str | None, float]],
short_side: int | None = None,
image_sequence: bool = False,
max_workers: int | None = None,
encoder: str = "libx264",
on_clip_done: Callable[[str], None] | None = None,
on_all_done: Callable[[], None] | None = None,
on_error: Callable[[str], None] | None = None,
):
self._input = input_path
self._jobs = jobs
self._short_side = short_side
self._image_sequence = image_sequence
self._max_workers = max_workers
self._encoder = encoder
self._on_clip_done = on_clip_done
self._on_all_done = on_all_done
self._on_error = on_error
self._cancel = False
self._procs: list[subprocess.Popen] = []
self._procs_lock = threading.Lock()
self._thread: threading.Thread | None = None
def start(self):
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def cancel(self):
self._cancel = True
with self._procs_lock:
for proc in self._procs:
try:
proc.kill()
except OSError:
pass
def is_running(self) -> bool:
return self._thread is not None and self._thread.is_alive()
def _run_one(self, start: float, output: str,
portrait_ratio: str | None, crop_center: float) -> str:
if self._cancel:
raise RuntimeError("cancelled")
if self._image_sequence:
os.makedirs(output, exist_ok=True)
cmd = build_ffmpeg_command(
self._input, start, output,
short_side=self._short_side,
portrait_ratio=portrait_ratio,
crop_center=crop_center,
image_sequence=self._image_sequence,
encoder=self._encoder,
)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
with self._procs_lock:
self._procs.append(proc)
try:
_, stderr = proc.communicate(timeout=120)
except subprocess.TimeoutExpired:
proc.kill()
raise RuntimeError("ffmpeg timed out")
finally:
with self._procs_lock:
self._procs.remove(proc)
if self._cancel:
raise RuntimeError("cancelled")
if proc.returncode != 0:
msg = stderr.decode(errors='replace')[-500:] if stderr else "ffmpeg failed"
raise RuntimeError(msg)
if self._image_sequence:
audio_cmd = build_audio_extract_command(self._input, start, output)
audio_result = subprocess.run(audio_cmd, capture_output=True, text=True, timeout=60)
if audio_result.returncode != 0:
msg = (audio_result.stderr or "audio extraction failed")[-500:]
raise RuntimeError(msg)
return output
def _run(self):
cap = self._max_workers or (os.cpu_count() or 2)
workers = min(len(self._jobs), cap)
try:
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = {
pool.submit(self._run_one, s, o, pr, cc): o
for s, o, pr, cc in self._jobs
}
for fut in as_completed(futures):
if self._cancel:
break
try:
path = fut.result()
if self._on_clip_done:
self._on_clip_done(path)
except Exception as e:
if "cancelled" not in str(e) and self._on_error:
self._on_error(str(e))
return
except Exception as e:
if self._on_error:
self._on_error(str(e))
return
if self._cancel:
return
if self._on_all_done:
self._on_all_done()