From 39f873bec2e3eeca9e89c95fdbf5114e9e479627 Mon Sep 17 00:00:00 2001 From: Ethanfel Date: Thu, 16 Apr 2026 19:53:38 +0200 Subject: [PATCH] fix: server bug fixes from review - DB: add threading.Lock on all write methods and multi-step reads - export.py: check audio extraction return code, raise on failure - routes/export: counter race condition fix with _counter_lock - routes/export: delete validation accepts EXPORT_DIR_suffix siblings - routes/export: evict old finished jobs to prevent unbounded growth - client plan: fix 10 bugs (mpv IPC, encodePath, input_path sep, etc.) Co-Authored-By: Claude Opus 4.6 --- core/db.py | 89 ++++++------ core/export.py | 5 +- .../plans/2026-04-16-client-implementation.md | 130 +++++++++++------- server/routes/export.py | 50 +++++-- 4 files changed, 166 insertions(+), 108 deletions(-) diff --git a/core/db.py b/core/db.py index f741934..e0ba4d0 100644 --- a/core/db.py +++ b/core/db.py @@ -1,4 +1,5 @@ import sqlite3 +import threading from datetime import datetime, timezone from pathlib import Path @@ -12,6 +13,7 @@ class ProcessedDB: if db_path is None: db_path = str(Path.home() / ".8cut.db") self._path = db_path + self._lock = threading.Lock() try: self._con = sqlite3.connect(db_path, check_same_thread=False) self._migrate() @@ -86,18 +88,19 @@ class ProcessedDB: profile: str = "default") -> None: if not self._enabled: return - self._con.execute( - "INSERT INTO processed" - " (filename, start_time, output_path, label, category," - " short_side, portrait_ratio, crop_center, format," - " clip_count, spread, profile, processed_at)" - " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - (filename, start_time, output_path, label, category, - short_side, portrait_ratio, crop_center, fmt, - clip_count, spread, profile, - datetime.now(timezone.utc).isoformat()), - ) - self._con.commit() + with self._lock: + self._con.execute( + "INSERT INTO processed" + " (filename, start_time, output_path, label, category," + " short_side, portrait_ratio, crop_center, format," + " clip_count, spread, profile, processed_at)" + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (filename, start_time, output_path, label, category, + short_side, portrait_ratio, crop_center, fmt, + clip_count, spread, profile, + datetime.now(timezone.utc).isoformat()), + ) + self._con.commit() def get_labels(self) -> list[str]: """Return distinct non-empty labels ordered by most recently used.""" @@ -134,8 +137,9 @@ class ProcessedDB: def delete_by_output_path(self, output_path: str) -> None: if not self._enabled: return - self._con.execute("DELETE FROM processed WHERE output_path = ?", (output_path,)) - self._con.commit() + with self._lock: + self._con.execute("DELETE FROM processed WHERE output_path = ?", (output_path,)) + self._con.commit() def get_group(self, output_path: str) -> list[str]: """Return all output_paths sharing the same (filename, start_time) as *output_path*.""" @@ -159,23 +163,24 @@ class ProcessedDB: Returns list of deleted output_paths.""" if not self._enabled: return [] - row = self._con.execute( - "SELECT filename, start_time FROM processed WHERE output_path = ?", - (output_path,), - ).fetchone() - if not row: - return [] - filename, start_time = row - paths = [r[0] for r in self._con.execute( - "SELECT output_path FROM processed WHERE filename = ? AND start_time = ?", - (filename, start_time), - ).fetchall()] - self._con.execute( - "DELETE FROM processed WHERE filename = ? AND start_time = ?", - (filename, start_time), - ) - self._con.commit() - return paths + with self._lock: + row = self._con.execute( + "SELECT filename, start_time FROM processed WHERE output_path = ?", + (output_path,), + ).fetchone() + if not row: + return [] + filename, start_time = row + paths = [r[0] for r in self._con.execute( + "SELECT output_path FROM processed WHERE filename = ? AND start_time = ?", + (filename, start_time), + ).fetchall()] + self._con.execute( + "DELETE FROM processed WHERE filename = ? AND start_time = ?", + (filename, start_time), + ) + self._con.commit() + return paths def _get_markers_for(self, match: str, profile: str = "default") -> list[tuple[float, int, str]]: rows = self._con.execute( @@ -211,20 +216,22 @@ class ProcessedDB: def hide_file(self, filename: str, profile: str = "default") -> None: if not self._enabled: return - self._con.execute( - "INSERT OR IGNORE INTO hidden_files (filename, profile) VALUES (?, ?)", - (filename, profile), - ) - self._con.commit() + with self._lock: + self._con.execute( + "INSERT OR IGNORE INTO hidden_files (filename, profile) VALUES (?, ?)", + (filename, profile), + ) + self._con.commit() def unhide_file(self, filename: str, profile: str = "default") -> None: if not self._enabled: return - self._con.execute( - "DELETE FROM hidden_files WHERE filename = ? AND profile = ?", - (filename, profile), - ) - self._con.commit() + with self._lock: + self._con.execute( + "DELETE FROM hidden_files WHERE filename = ? AND profile = ?", + (filename, profile), + ) + self._con.commit() def get_hidden_files(self, profile: str = "default") -> set[str]: if not self._enabled: diff --git a/core/export.py b/core/export.py index 4f04070..9c3eb89 100644 --- a/core/export.py +++ b/core/export.py @@ -91,7 +91,10 @@ class ExportRunner: raise RuntimeError(msg) if self._image_sequence: audio_cmd = build_audio_extract_command(self._input, start, output) - subprocess.run(audio_cmd, capture_output=True, text=True, timeout=60) + audio_result = subprocess.run(audio_cmd, capture_output=True, text=True, timeout=60) + if audio_result.returncode != 0: + msg = (audio_result.stderr or "audio extraction failed")[-500:] + raise RuntimeError(msg) return output def _run(self): diff --git a/docs/plans/2026-04-16-client-implementation.md b/docs/plans/2026-04-16-client-implementation.md index 43ac247..8fff7b5 100644 --- a/docs/plans/2026-04-16-client-implementation.md +++ b/docs/plans/2026-04-16-client-implementation.md @@ -152,16 +152,21 @@ export function getFiles(root?: string): Promise { return get(`/api/files${q}`); } +// For {path:path} routes, encode each segment individually to preserve slashes +function encodePath(p: string): string { + return p.split("/").map(encodeURIComponent).join("/"); +} + export function streamUrl(path: string, root: string, quality: string): string { - return `${serverUrl}/api/stream/${encodeURIComponent(path)}?root=${encodeURIComponent(root)}&quality=${quality}`; + return `${serverUrl}/api/stream/${encodePath(path)}?root=${encodeURIComponent(root)}&quality=${quality}`; } export function audioUrl(path: string, root: string): string { - return `${serverUrl}/api/audio/${encodeURIComponent(path)}?root=${encodeURIComponent(root)}`; + return `${serverUrl}/api/audio/${encodePath(path)}?root=${encodeURIComponent(root)}`; } export function cacheStatus(path: string, root: string): Promise> { - return get(`/api/cache/status/${encodeURIComponent(path)}?root=${encodeURIComponent(root)}`); + return get(`/api/cache/status/${encodePath(path)}?root=${encodeURIComponent(root)}`); } // --- Markers & Profiles --- @@ -311,12 +316,10 @@ export const clipSpan = derived( ); export const visibleFiles = derived( - [files, hiddenFiles, hideExported, showHidden, markers], - ([$files, $hidden, $hideExported, $showHidden, $markers]) => { - const exportedNames = new Set($markers.map(m => m.output_path)); + [files, hiddenFiles, showHidden], + ([$files, $hidden, $showHidden]) => { return $files.filter(f => { if (!$showHidden && $hidden.has(f.name)) return false; - // hideExported filtering would need per-file marker lookup return true; }); } @@ -398,19 +401,21 @@ git commit -m "feat: add WebSocket client for export progress" **Step 1: Create mpv.rs** -This module spawns mpv with `--input-ipc-server`, then sends JSON IPC commands over the Unix socket. +This module spawns mpv with `--input-ipc-server`, then sends JSON IPC commands over the Unix socket. Uses a persistent BufReader and request_id to correctly handle mpv's interleaved events and responses. ```rust use std::io::{BufRead, BufReader, Write}; use std::os::unix::net::UnixStream; use std::process::{Child, Command}; -use std::sync::Mutex; +use std::sync::atomic::{AtomicU64, Ordering}; use serde_json::{json, Value}; pub struct Mpv { process: Option, - socket: Option, + writer: Option, + reader: Option>, socket_path: String, + next_id: AtomicU64, } impl Mpv { @@ -418,13 +423,14 @@ impl Mpv { let socket_path = format!("/tmp/8cut-mpv-{}", std::process::id()); Mpv { process: None, - socket: None, + writer: None, + reader: None, socket_path, + next_id: AtomicU64::new(1), } } pub fn start(&mut self) -> Result<(), String> { - // Kill existing self.stop(); let child = Command::new("mpv") @@ -445,7 +451,9 @@ impl Mpv { std::thread::sleep(std::time::Duration::from_millis(100)); if let Ok(stream) = UnixStream::connect(&self.socket_path) { stream.set_nonblocking(false).ok(); - self.socket = Some(stream); + let reader_stream = stream.try_clone().map_err(|e| e.to_string())?; + self.writer = Some(stream); + self.reader = Some(BufReader::new(reader_stream)); return Ok(()); } } @@ -458,55 +466,62 @@ impl Mpv { child.wait().ok(); } self.process = None; - self.socket = None; + self.writer = None; + self.reader = None; std::fs::remove_file(&self.socket_path).ok(); } - pub fn command(&mut self, args: &[&str]) -> Result<(), String> { - let socket = self.socket.as_mut().ok_or("mpv not running")?; - let cmd = json!({ "command": args }); - let mut msg = serde_json::to_string(&cmd).unwrap(); - msg.push('\n'); - socket.write_all(msg.as_bytes()).map_err(|e| e.to_string())?; + /// Send a command and wait for the matching response (by request_id). + /// Skips over asynchronous mpv events while waiting. + fn send_and_recv(&mut self, cmd: Value) -> Result { + let id = self.next_id.fetch_add(1, Ordering::Relaxed); + let writer = self.writer.as_mut().ok_or("mpv not running")?; + let reader = self.reader.as_mut().ok_or("mpv not running")?; - // Read response - let mut reader = BufReader::new(socket.try_clone().map_err(|e| e.to_string())?); + let mut msg_val = cmd; + msg_val["request_id"] = json!(id); + let mut msg = serde_json::to_string(&msg_val).unwrap(); + msg.push('\n'); + writer.write_all(msg.as_bytes()).map_err(|e| e.to_string())?; + + // Read lines until we find the response matching our request_id let mut line = String::new(); - reader.read_line(&mut line).map_err(|e| e.to_string())?; + loop { + line.clear(); + reader.read_line(&mut line).map_err(|e| e.to_string())?; + let parsed: Value = serde_json::from_str(&line).map_err(|e| e.to_string())?; + // mpv events have "event" key, responses have "request_id" + if parsed.get("request_id").and_then(|v| v.as_u64()) == Some(id) { + return Ok(parsed); + } + // Otherwise it's an async event — skip it + } + } + + pub fn command(&mut self, args: &[&str]) -> Result<(), String> { + let resp = self.send_and_recv(json!({ "command": args }))?; + if resp.get("error").and_then(|e| e.as_str()) != Some("success") { + return Err(format!("mpv error: {}", resp.get("error").unwrap_or(&Value::Null))); + } Ok(()) } pub fn set_property(&mut self, name: &str, value: Value) -> Result<(), String> { - let socket = self.socket.as_mut().ok_or("mpv not running")?; - let cmd = json!({ "command": ["set_property", name, value] }); - let mut msg = serde_json::to_string(&cmd).unwrap(); - msg.push('\n'); - socket.write_all(msg.as_bytes()).map_err(|e| e.to_string())?; - - let mut reader = BufReader::new(socket.try_clone().map_err(|e| e.to_string())?); - let mut line = String::new(); - reader.read_line(&mut line).map_err(|e| e.to_string())?; - Ok(()) + self.command(&["set_property", name, &value.to_string()]) } pub fn get_property(&mut self, name: &str) -> Result { - let socket = self.socket.as_mut().ok_or("mpv not running")?; - let cmd = json!({ "command": ["get_property", name] }); - let mut msg = serde_json::to_string(&cmd).unwrap(); - msg.push('\n'); - socket.write_all(msg.as_bytes()).map_err(|e| e.to_string())?; - - let mut reader = BufReader::new(socket.try_clone().map_err(|e| e.to_string())?); - let mut line = String::new(); - reader.read_line(&mut line).map_err(|e| e.to_string())?; - let resp: Value = serde_json::from_str(&line).map_err(|e| e.to_string())?; + let resp = self.send_and_recv(json!({ "command": ["get_property", name] }))?; + if resp.get("error").and_then(|e| e.as_str()) != Some("success") { + return Err(format!("mpv error: {}", resp.get("error").unwrap_or(&Value::Null))); + } Ok(resp.get("data").cloned().unwrap_or(Value::Null)) } pub fn load_file(&mut self, video_url: &str, audio_url: &str) -> Result<(), String> { - self.command(&["loadfile", video_url])?; - self.set_property("audio-files", json!(audio_url))?; - Ok(()) + // Pass audio-file option during load so both streams sync from the start + let options = format!("audio-file={}", audio_url); + self.command(&["loadfile", video_url, "replace", &options]) } pub fn seek(&mut self, time: f64) -> Result<(), String> { @@ -651,7 +666,9 @@ pub fn run() { #![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] fn main() { - client_lib::run(); + // Crate name matches the `name` field in Cargo.toml (with hyphens → underscores). + // The Tauri scaffold sets this — adjust if the package is named differently. + app_lib::run(); } ``` @@ -662,6 +679,8 @@ Add `serde_json` to `client/src-tauri/Cargo.toml` dependencies: serde_json = "1" serde = { version = "1", features = ["derive"] } tauri = { version = "2", features = [] } + +[build-dependencies] tauri-build = { version = "2", features = [] } ``` @@ -1071,14 +1090,14 @@ git commit -m "feat: add canvas-based timeline component" const CATEGORIES = ["", "Human", "Animal", "Vehicle", "Tool", "Music", "Nature", "Sport", "Other"]; const RATIOS = ["Off", "9:16", "4:5", "1:1"]; - async function doExport(folderSuffix: string = "") { + export async function doExport(folderSuffix: string = "") { if (!$currentFile) return; $exportStatus = "running"; $exportCompleted = 0; $exportTotal = $clips; const req = { - input_path: `${$currentFile.root}${$currentFile.path}`, + input_path: `${$currentFile.root}/${$currentFile.path}`, cursor: $cursor, name: $clipName || $currentFile.name.replace(/\.[^.]+$/, ""), clips: $clips, @@ -1302,8 +1321,9 @@ git commit -m "feat: add profile bar component" clearInterval(pollInterval); }); - // Load file into mpv when currentFile changes + // Load file into mpv when currentFile OR quality changes $: if ($currentFile) { + void $quality; // trigger reactivity on quality change too const vUrl = streamUrl($currentFile.path, $currentFile.root, $quality); const aUrl = audioUrl($currentFile.path, $currentFile.root); mpvLoad(vUrl, aUrl).then(async () => { @@ -1370,6 +1390,7 @@ git commit -m "feat: add profile bar component" @@ -1392,7 +1413,7 @@ git commit -m "feat: add profile bar component" - + @@ -1476,6 +1497,9 @@ git commit -m "feat: wire up main app layout with all components" Add to the `