fix: server bug fixes from review

- DB: add threading.Lock on all write methods and multi-step reads
- export.py: check audio extraction return code, raise on failure
- routes/export: counter race condition fix with _counter_lock
- routes/export: delete validation accepts EXPORT_DIR_suffix siblings
- routes/export: evict old finished jobs to prevent unbounded growth
- client plan: fix 10 bugs (mpv IPC, encodePath, input_path sep, etc.)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-16 19:53:38 +02:00
parent 409eb82e5c
commit 39f873bec2
4 changed files with 166 additions and 108 deletions
+48 -41
View File
@@ -1,4 +1,5 @@
import sqlite3
import threading
from datetime import datetime, timezone
from pathlib import Path
@@ -12,6 +13,7 @@ class ProcessedDB:
if db_path is None:
db_path = str(Path.home() / ".8cut.db")
self._path = db_path
self._lock = threading.Lock()
try:
self._con = sqlite3.connect(db_path, check_same_thread=False)
self._migrate()
@@ -86,18 +88,19 @@ class ProcessedDB:
profile: str = "default") -> None:
if not self._enabled:
return
self._con.execute(
"INSERT INTO processed"
" (filename, start_time, output_path, label, category,"
" short_side, portrait_ratio, crop_center, format,"
" clip_count, spread, profile, processed_at)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(filename, start_time, output_path, label, category,
short_side, portrait_ratio, crop_center, fmt,
clip_count, spread, profile,
datetime.now(timezone.utc).isoformat()),
)
self._con.commit()
with self._lock:
self._con.execute(
"INSERT INTO processed"
" (filename, start_time, output_path, label, category,"
" short_side, portrait_ratio, crop_center, format,"
" clip_count, spread, profile, processed_at)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(filename, start_time, output_path, label, category,
short_side, portrait_ratio, crop_center, fmt,
clip_count, spread, profile,
datetime.now(timezone.utc).isoformat()),
)
self._con.commit()
def get_labels(self) -> list[str]:
"""Return distinct non-empty labels ordered by most recently used."""
@@ -134,8 +137,9 @@ class ProcessedDB:
def delete_by_output_path(self, output_path: str) -> None:
if not self._enabled:
return
self._con.execute("DELETE FROM processed WHERE output_path = ?", (output_path,))
self._con.commit()
with self._lock:
self._con.execute("DELETE FROM processed WHERE output_path = ?", (output_path,))
self._con.commit()
def get_group(self, output_path: str) -> list[str]:
"""Return all output_paths sharing the same (filename, start_time) as *output_path*."""
@@ -159,23 +163,24 @@ class ProcessedDB:
Returns list of deleted output_paths."""
if not self._enabled:
return []
row = self._con.execute(
"SELECT filename, start_time FROM processed WHERE output_path = ?",
(output_path,),
).fetchone()
if not row:
return []
filename, start_time = row
paths = [r[0] for r in self._con.execute(
"SELECT output_path FROM processed WHERE filename = ? AND start_time = ?",
(filename, start_time),
).fetchall()]
self._con.execute(
"DELETE FROM processed WHERE filename = ? AND start_time = ?",
(filename, start_time),
)
self._con.commit()
return paths
with self._lock:
row = self._con.execute(
"SELECT filename, start_time FROM processed WHERE output_path = ?",
(output_path,),
).fetchone()
if not row:
return []
filename, start_time = row
paths = [r[0] for r in self._con.execute(
"SELECT output_path FROM processed WHERE filename = ? AND start_time = ?",
(filename, start_time),
).fetchall()]
self._con.execute(
"DELETE FROM processed WHERE filename = ? AND start_time = ?",
(filename, start_time),
)
self._con.commit()
return paths
def _get_markers_for(self, match: str, profile: str = "default") -> list[tuple[float, int, str]]:
rows = self._con.execute(
@@ -211,20 +216,22 @@ class ProcessedDB:
def hide_file(self, filename: str, profile: str = "default") -> None:
if not self._enabled:
return
self._con.execute(
"INSERT OR IGNORE INTO hidden_files (filename, profile) VALUES (?, ?)",
(filename, profile),
)
self._con.commit()
with self._lock:
self._con.execute(
"INSERT OR IGNORE INTO hidden_files (filename, profile) VALUES (?, ?)",
(filename, profile),
)
self._con.commit()
def unhide_file(self, filename: str, profile: str = "default") -> None:
if not self._enabled:
return
self._con.execute(
"DELETE FROM hidden_files WHERE filename = ? AND profile = ?",
(filename, profile),
)
self._con.commit()
with self._lock:
self._con.execute(
"DELETE FROM hidden_files WHERE filename = ? AND profile = ?",
(filename, profile),
)
self._con.commit()
def get_hidden_files(self, profile: str = "default") -> set[str]:
if not self._enabled:
+4 -1
View File
@@ -91,7 +91,10 @@ class ExportRunner:
raise RuntimeError(msg)
if self._image_sequence:
audio_cmd = build_audio_extract_command(self._input, start, output)
subprocess.run(audio_cmd, capture_output=True, text=True, timeout=60)
audio_result = subprocess.run(audio_cmd, capture_output=True, text=True, timeout=60)
if audio_result.returncode != 0:
msg = (audio_result.stderr or "audio extraction failed")[-500:]
raise RuntimeError(msg)
return output
def _run(self):
+77 -53
View File
@@ -152,16 +152,21 @@ export function getFiles(root?: string): Promise<VideoFile[]> {
return get(`/api/files${q}`);
}
// For {path:path} routes, encode each segment individually to preserve slashes
function encodePath(p: string): string {
return p.split("/").map(encodeURIComponent).join("/");
}
export function streamUrl(path: string, root: string, quality: string): string {
return `${serverUrl}/api/stream/${encodeURIComponent(path)}?root=${encodeURIComponent(root)}&quality=${quality}`;
return `${serverUrl}/api/stream/${encodePath(path)}?root=${encodeURIComponent(root)}&quality=${quality}`;
}
export function audioUrl(path: string, root: string): string {
return `${serverUrl}/api/audio/${encodeURIComponent(path)}?root=${encodeURIComponent(root)}`;
return `${serverUrl}/api/audio/${encodePath(path)}?root=${encodeURIComponent(root)}`;
}
export function cacheStatus(path: string, root: string): Promise<Record<string, string>> {
return get(`/api/cache/status/${encodeURIComponent(path)}?root=${encodeURIComponent(root)}`);
return get(`/api/cache/status/${encodePath(path)}?root=${encodeURIComponent(root)}`);
}
// --- Markers & Profiles ---
@@ -311,12 +316,10 @@ export const clipSpan = derived(
);
export const visibleFiles = derived(
[files, hiddenFiles, hideExported, showHidden, markers],
([$files, $hidden, $hideExported, $showHidden, $markers]) => {
const exportedNames = new Set($markers.map(m => m.output_path));
[files, hiddenFiles, showHidden],
([$files, $hidden, $showHidden]) => {
return $files.filter(f => {
if (!$showHidden && $hidden.has(f.name)) return false;
// hideExported filtering would need per-file marker lookup
return true;
});
}
@@ -398,19 +401,21 @@ git commit -m "feat: add WebSocket client for export progress"
**Step 1: Create mpv.rs**
This module spawns mpv with `--input-ipc-server`, then sends JSON IPC commands over the Unix socket.
This module spawns mpv with `--input-ipc-server`, then sends JSON IPC commands over the Unix socket. Uses a persistent BufReader and request_id to correctly handle mpv's interleaved events and responses.
```rust
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::process::{Child, Command};
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use serde_json::{json, Value};
pub struct Mpv {
process: Option<Child>,
socket: Option<UnixStream>,
writer: Option<UnixStream>,
reader: Option<BufReader<UnixStream>>,
socket_path: String,
next_id: AtomicU64,
}
impl Mpv {
@@ -418,13 +423,14 @@ impl Mpv {
let socket_path = format!("/tmp/8cut-mpv-{}", std::process::id());
Mpv {
process: None,
socket: None,
writer: None,
reader: None,
socket_path,
next_id: AtomicU64::new(1),
}
}
pub fn start(&mut self) -> Result<(), String> {
// Kill existing
self.stop();
let child = Command::new("mpv")
@@ -445,7 +451,9 @@ impl Mpv {
std::thread::sleep(std::time::Duration::from_millis(100));
if let Ok(stream) = UnixStream::connect(&self.socket_path) {
stream.set_nonblocking(false).ok();
self.socket = Some(stream);
let reader_stream = stream.try_clone().map_err(|e| e.to_string())?;
self.writer = Some(stream);
self.reader = Some(BufReader::new(reader_stream));
return Ok(());
}
}
@@ -458,55 +466,62 @@ impl Mpv {
child.wait().ok();
}
self.process = None;
self.socket = None;
self.writer = None;
self.reader = None;
std::fs::remove_file(&self.socket_path).ok();
}
pub fn command(&mut self, args: &[&str]) -> Result<(), String> {
let socket = self.socket.as_mut().ok_or("mpv not running")?;
let cmd = json!({ "command": args });
let mut msg = serde_json::to_string(&cmd).unwrap();
msg.push('\n');
socket.write_all(msg.as_bytes()).map_err(|e| e.to_string())?;
/// Send a command and wait for the matching response (by request_id).
/// Skips over asynchronous mpv events while waiting.
fn send_and_recv(&mut self, cmd: Value) -> Result<Value, String> {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let writer = self.writer.as_mut().ok_or("mpv not running")?;
let reader = self.reader.as_mut().ok_or("mpv not running")?;
// Read response
let mut reader = BufReader::new(socket.try_clone().map_err(|e| e.to_string())?);
let mut msg_val = cmd;
msg_val["request_id"] = json!(id);
let mut msg = serde_json::to_string(&msg_val).unwrap();
msg.push('\n');
writer.write_all(msg.as_bytes()).map_err(|e| e.to_string())?;
// Read lines until we find the response matching our request_id
let mut line = String::new();
reader.read_line(&mut line).map_err(|e| e.to_string())?;
loop {
line.clear();
reader.read_line(&mut line).map_err(|e| e.to_string())?;
let parsed: Value = serde_json::from_str(&line).map_err(|e| e.to_string())?;
// mpv events have "event" key, responses have "request_id"
if parsed.get("request_id").and_then(|v| v.as_u64()) == Some(id) {
return Ok(parsed);
}
// Otherwise it's an async event — skip it
}
}
pub fn command(&mut self, args: &[&str]) -> Result<(), String> {
let resp = self.send_and_recv(json!({ "command": args }))?;
if resp.get("error").and_then(|e| e.as_str()) != Some("success") {
return Err(format!("mpv error: {}", resp.get("error").unwrap_or(&Value::Null)));
}
Ok(())
}
pub fn set_property(&mut self, name: &str, value: Value) -> Result<(), String> {
let socket = self.socket.as_mut().ok_or("mpv not running")?;
let cmd = json!({ "command": ["set_property", name, value] });
let mut msg = serde_json::to_string(&cmd).unwrap();
msg.push('\n');
socket.write_all(msg.as_bytes()).map_err(|e| e.to_string())?;
let mut reader = BufReader::new(socket.try_clone().map_err(|e| e.to_string())?);
let mut line = String::new();
reader.read_line(&mut line).map_err(|e| e.to_string())?;
Ok(())
self.command(&["set_property", name, &value.to_string()])
}
pub fn get_property(&mut self, name: &str) -> Result<Value, String> {
let socket = self.socket.as_mut().ok_or("mpv not running")?;
let cmd = json!({ "command": ["get_property", name] });
let mut msg = serde_json::to_string(&cmd).unwrap();
msg.push('\n');
socket.write_all(msg.as_bytes()).map_err(|e| e.to_string())?;
let mut reader = BufReader::new(socket.try_clone().map_err(|e| e.to_string())?);
let mut line = String::new();
reader.read_line(&mut line).map_err(|e| e.to_string())?;
let resp: Value = serde_json::from_str(&line).map_err(|e| e.to_string())?;
let resp = self.send_and_recv(json!({ "command": ["get_property", name] }))?;
if resp.get("error").and_then(|e| e.as_str()) != Some("success") {
return Err(format!("mpv error: {}", resp.get("error").unwrap_or(&Value::Null)));
}
Ok(resp.get("data").cloned().unwrap_or(Value::Null))
}
pub fn load_file(&mut self, video_url: &str, audio_url: &str) -> Result<(), String> {
self.command(&["loadfile", video_url])?;
self.set_property("audio-files", json!(audio_url))?;
Ok(())
// Pass audio-file option during load so both streams sync from the start
let options = format!("audio-file={}", audio_url);
self.command(&["loadfile", video_url, "replace", &options])
}
pub fn seek(&mut self, time: f64) -> Result<(), String> {
@@ -651,7 +666,9 @@ pub fn run() {
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
fn main() {
client_lib::run();
// Crate name matches the `name` field in Cargo.toml (with hyphens → underscores).
// The Tauri scaffold sets this — adjust if the package is named differently.
app_lib::run();
}
```
@@ -662,6 +679,8 @@ Add `serde_json` to `client/src-tauri/Cargo.toml` dependencies:
serde_json = "1"
serde = { version = "1", features = ["derive"] }
tauri = { version = "2", features = [] }
[build-dependencies]
tauri-build = { version = "2", features = [] }
```
@@ -1071,14 +1090,14 @@ git commit -m "feat: add canvas-based timeline component"
const CATEGORIES = ["", "Human", "Animal", "Vehicle", "Tool", "Music", "Nature", "Sport", "Other"];
const RATIOS = ["Off", "9:16", "4:5", "1:1"];
async function doExport(folderSuffix: string = "") {
export async function doExport(folderSuffix: string = "") {
if (!$currentFile) return;
$exportStatus = "running";
$exportCompleted = 0;
$exportTotal = $clips;
const req = {
input_path: `${$currentFile.root}${$currentFile.path}`,
input_path: `${$currentFile.root}/${$currentFile.path}`,
cursor: $cursor,
name: $clipName || $currentFile.name.replace(/\.[^.]+$/, ""),
clips: $clips,
@@ -1302,8 +1321,9 @@ git commit -m "feat: add profile bar component"
clearInterval(pollInterval);
});
// Load file into mpv when currentFile changes
// Load file into mpv when currentFile OR quality changes
$: if ($currentFile) {
void $quality; // trigger reactivity on quality change too
const vUrl = streamUrl($currentFile.path, $currentFile.root, $quality);
const aUrl = audioUrl($currentFile.path, $currentFile.root);
mpvLoad(vUrl, aUrl).then(async () => {
@@ -1370,6 +1390,7 @@ git commit -m "feat: add profile bar component"
</div>
<Timeline
onCursorChange={handleCursorChange}
onSeek={handleCursorChange}
onMarkerClick={handleMarkerClick}
onMarkerDelete={handleMarkerDelete}
/>
@@ -1392,7 +1413,7 @@ git commit -m "feat: add profile bar component"
<option value="high">Original</option>
</select>
</div>
<ExportPanel />
<ExportPanel bind:this={exportPanelRef} />
</div>
</div>
</main>
@@ -1476,6 +1497,9 @@ git commit -m "feat: wire up main app layout with all components"
Add to the `<script>` in App.svelte:
```typescript
// Export trigger — called from keyboard shortcuts and forwarded to ExportPanel
let exportPanelRef: ExportPanel;
function handleKeydown(e: KeyboardEvent) {
// Ignore when typing in inputs
const tag = (e.target as HTMLElement).tagName;
@@ -1488,7 +1512,7 @@ function handleKeydown(e: KeyboardEvent) {
break;
case "e":
case "E":
doMainExport();
exportPanelRef?.doExport();
break;
case "ArrowLeft":
$cursor = Math.max(0, $cursor - 1);
@@ -1505,7 +1529,7 @@ function handleKeydown(e: KeyboardEvent) {
if (num >= 1 && num <= 9) {
const idx = num - 1;
if (idx < $subprofiles.length) {
doSubprofileExport($subprofiles[idx]);
exportPanelRef?.doExport($subprofiles[idx]);
}
}
}
+37 -13
View File
@@ -1,6 +1,8 @@
import os
import re
import shutil
import threading
import time
import uuid
from fastapi import APIRouter, HTTPException, Query
@@ -15,9 +17,12 @@ from ..config import EXPORT_DIR, MEDIA_DIRS
router = APIRouter()
_jobs: dict[str, dict] = {}
_counter_lock = threading.Lock()
_VALID_ENCODERS = {"libx264", "h264_nvenc", "h264_vaapi", "h264_qsv", "h264_amf", "h264_videotoolbox"}
_MAX_FINISHED_JOBS = 200
class CropKeyframe(BaseModel):
time: float
@@ -94,18 +99,19 @@ def start_export(req: ExportRequest):
folder = folder.rstrip(os.sep) + "_" + req.folder_suffix
image_sequence = req.format in ("WebP", "WebP sequence")
counter = _next_counter(folder, req.name)
# Build job list: (start, output_path, portrait_ratio, crop_center)
jobs = []
for i in range(req.clips):
start = req.cursor + i * req.spread
if image_sequence:
out = build_sequence_dir(folder, req.name, counter, sub=i if req.clips > 1 else None)
else:
out = build_export_path(folder, req.name, counter, sub=i if req.clips > 1 else None)
os.makedirs(os.path.dirname(out), exist_ok=True)
jobs.append((start, out, req.portrait_ratio, req.crop_center))
# Lock counter + directory creation to prevent race between concurrent exports
with _counter_lock:
counter = _next_counter(folder, req.name)
jobs = []
for i in range(req.clips):
start = req.cursor + i * req.spread
if image_sequence:
out = build_sequence_dir(folder, req.name, counter, sub=i if req.clips > 1 else None)
else:
out = build_export_path(folder, req.name, counter, sub=i if req.clips > 1 else None)
os.makedirs(os.path.dirname(out), exist_ok=True)
jobs.append((start, out, req.portrait_ratio, req.crop_center))
# Apply keyframes if provided — returns 6-tuples, strip back to 4
if req.crop_keyframes:
@@ -163,11 +169,18 @@ def start_export(req: ExportRequest):
on_error=on_error,
)
# Evict old finished jobs to prevent unbounded growth
finished = [k for k, v in _jobs.items() if v["status"] in ("done", "error")]
if len(finished) > _MAX_FINISHED_JOBS:
for k in finished[:len(finished) - _MAX_FINISHED_JOBS]:
del _jobs[k]
_jobs[job_id] = {
"status": "running",
"total": len(jobs),
"completed": completed,
"runner": runner,
"created_at": time.monotonic(),
}
runner.start()
@@ -188,12 +201,23 @@ def get_export_status(job_id: str):
}
def _is_under_export_dir(real_path: str) -> bool:
"""Check if path is under EXPORT_DIR or any EXPORT_DIR_suffix sibling."""
export_real = os.path.realpath(EXPORT_DIR).rstrip(os.sep)
# Walk up ancestors — must find EXPORT_DIR or EXPORT_DIR_suffix
d = os.path.dirname(real_path)
while d != os.path.dirname(d):
if d == export_real or d.startswith(export_real + "_"):
return True
d = os.path.dirname(d)
return False
@router.delete("/export")
def delete_export(output_path: str = Query(...)):
from ..app import db
# Validate path is under EXPORT_DIR
real = os.path.realpath(output_path)
if not real.startswith(os.path.realpath(EXPORT_DIR) + os.sep):
if not _is_under_export_dir(real):
raise HTTPException(status_code=403, detail="path outside export directory")
db.delete_by_output_path(real)
if os.path.isfile(real):