Files
8-cut/core/ffmpeg.py
T
Ethanfel 4445f0e7f4 fix: audio extract honored a silent length clamp — 30s near the end became 3s
_on_extract_audio clamped the duration to (timeline_duration - cursor), so with
the playhead within the requested length of the end (or any under-reported
duration) a 30s request was silently truncated to whatever remained — the user
asked for 30s and got 3s with no indication why.

Drop the clamp: pass the requested length straight to ffmpeg, which stops
cleanly at end-of-file if the source is shorter. Then ffprobe the result and,
when it comes up short, say so ("Saved 3.0s — source ended before 30.0s
requested") instead of silently shrinking. When there's room, 30s now yields
exactly 30s.

Adds core.ffmpeg.probe_duration(). Verified end-to-end: a fitting request
returns the exact length; a genuine near-end request returns the available
audio (rc=0) and is reported as truncated.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-02 00:07:35 +02:00

251 lines
8.1 KiB
Python

import os
import re
import subprocess
import sys
from .paths import _bin, _log
# Portrait crop aspect ratios, keyed by their "W:H" label. Each value is the
# (numerator, denominator) pair used to derive the crop width from the input
# height (width = ih * numerator / denominator).
_RATIOS: dict[str, tuple[int, int]] = {
    "9:16": (9, 16),
    "4:5": (4, 5),
    "1:1": (1, 1),
}
def _portrait_crop_filter(ratio: str, crop_center: float) -> str:
    """Build the ffmpeg ``crop=`` filter string for a portrait aspect ratio.

    The crop keeps the full input height and a width of ``ih*num/den``. The
    horizontal offset is ``(iw - width) * crop_center``, wrapped in
    ``max(0, min(..., iw - width))`` so it stays inside the frame. Everything
    is emitted as ffmpeg expression syntax, so the source dimensions are
    resolved by ffmpeg at runtime rather than here.

    Commas inside min()/max() are written as ``\\,`` so that ffmpeg's
    filtergraph parser does not treat them as filter-chain separators.

    Raises:
        KeyError: if *ratio* is not a key of ``_RATIOS``.
    """
    numerator, denominator = _RATIOS[ratio]
    crop_width = f"ih*{numerator}/{denominator}"
    # Horizontal room left over once the crop width is taken out of the frame.
    slack = f"iw-{crop_width}"
    x_offset = f"max(0\\,min(({slack})*{crop_center}\\,{slack}))"
    return f"crop={crop_width}:ih:{x_offset}:0"
def resolve_keyframe(
keyframes: list[tuple[float, float, str | None, bool, bool]],
t: float,
tolerance: float = 0.05,
) -> tuple[float, float, str | None, bool, bool] | None:
"""Return the latest keyframe at or before *t*, or None."""
result = None
for kf in keyframes:
if kf[0] <= t + tolerance:
result = kf
else:
break
return result
def apply_keyframes_to_jobs(
    jobs: list[tuple[float, str, str | None, float]],
    keyframes: list[tuple[float, float, str | None, bool, bool]],
    base_center: float,
    base_ratio: str | None,
    base_rand_p: bool,
    base_rand_s: bool,
) -> list[tuple[float, str, str | None, float, bool, bool]]:
    """Attach the crop state in effect at each job's start time.

    For every job the keyframe governing its start time is looked up with
    ``resolve_keyframe``. When one is found, its center, ratio and random
    flags are used; otherwise the ``base_*`` values are used. The ratio and
    center already stored in the incoming job tuple are discarded.

    Returns a new list of widened tuples
    ``(start, path, ratio, center, rand_portrait, rand_square)`` in the same
    order as *jobs*.
    """
    resolved = []
    for start, out_path, _old_ratio, _old_center in jobs:
        keyframe = resolve_keyframe(keyframes, start)
        if keyframe is None:
            # No keyframe applies yet: fall back to the base crop state.
            state = (base_ratio, base_center, base_rand_p, base_rand_s)
        else:
            _kf_time, kf_center, kf_ratio, kf_rand_p, kf_rand_s = keyframe
            state = (kf_ratio, kf_center, kf_rand_p, kf_rand_s)
        resolved.append((start, out_path, *state))
    return resolved
def _find_vaapi_device() -> str:
    """Return a VAAPI render device path (Linux).

    Picks the lexicographically first ``/dev/dri/renderD*`` node. When no
    such node exists, ``/dev/dri/renderD128`` is returned as a fallback
    without checking that it is present.
    """
    from glob import glob

    return min(glob("/dev/dri/renderD*"), default="/dev/dri/renderD128")
def build_ffmpeg_command(
    input_path: str, start: float, output_path: str,
    short_side: int | None = None,
    portrait_ratio: str | None = None,
    crop_center: float = 0.5,
    image_sequence: bool = False,
    encoder: str = "libx264",
    duration: float = 8.0,
    target_fps: float | None = None,
    snap32: bool = False,
    frames: int | None = None,
) -> list[str]:
    """Assemble the ffmpeg argument list for exporting one clip or frame set.

    Args:
        input_path: Source media file.
        start: Seek position in seconds, passed as ``-ss`` before ``-i``.
        output_path: Output file, or the output directory when
            *image_sequence* is true (frames are written inside it as
            ``frame_%04d.webp``).
        short_side: If given, scale so the shorter dimension equals it.
        portrait_ratio: If given, a ``_RATIOS`` key selecting a portrait crop.
        crop_center: Horizontal crop position handed to the crop filter.
        image_sequence: Write a libwebp frame sequence without audio instead
            of a video clip.
        encoder: Video encoder name for clip output.
        duration: Output length in seconds (``-t``).
        target_fps: If given, adds an ``fps`` filter and an ``-r`` output rate.
        snap32: Centered crop so both dimensions are multiples of 32.
        frames: If given, caps the video frame count (``-frames:v``).

    Returns:
        The argv list, starting with the ffmpeg binary path.
    """
    # Image sequences always use libwebp, so the hardware path only applies
    # to clip output with the VAAPI encoder on Linux.
    vaapi_active = (
        sys.platform == "linux"
        and encoder == "h264_vaapi"
        and not image_sequence
    )

    argv: list[str] = [_bin("ffmpeg"), "-y"]

    # VAAPI needs a render device for its hardware context.
    if vaapi_active:
        argv.extend([
            "-hwaccel", "vaapi",
            "-hwaccel_output_format", "vaapi",
            "-vaapi_device", _find_vaapi_device(),
        ])

    # -ss before -i: fast input seeking. Safe here because the output is
    # always re-encoded, so pre-input seek causes no keyframe-alignment issue.
    argv.extend([
        "-threads", "0",
        "-ss", str(start),
        "-i", input_path,
        "-t", str(duration),
    ])

    chain: list[str] = []
    if portrait_ratio is not None:
        chain.append(_portrait_crop_filter(portrait_ratio, crop_center))
    if short_side is not None:
        # Scale so the shorter dimension equals short_side; -2 lets ffmpeg
        # derive the other dimension.
        scale_w = f"if(lt(iw,ih),{short_side},-2)"
        scale_h = f"if(lt(iw,ih),-2,{short_side})"
        chain.append(f"scale='{scale_w}':'{scale_h}':flags=lanczos")
    # LTX-2: centered crop to a multiple of 32 (no rescale, so no aspect
    # distortion), then fps. Both stay among the CPU filters, after scale.
    if snap32:
        chain.append("crop=trunc(iw/32)*32:trunc(ih/32)*32")
    if target_fps is not None:
        chain.append(f"fps={target_fps:g}")

    # VAAPI: decoded frames are GPU surfaces, so any CPU filters are wrapped
    # in a download before them and an upload after them. With no CPU
    # filters the frames are left untouched.
    if vaapi_active and chain:
        chain = ["hwdownload", "format=nv12", *chain, "format=nv12", "hwupload"]

    if chain:
        argv += ["-vf", ",".join(chain)]

    # LTX-2 output rate and exact frame cap (clip and webp-sequence paths).
    if target_fps is not None:
        argv += ["-r", f"{target_fps:g}"]
    if frames is not None:
        argv += ["-frames:v", str(frames)]

    if image_sequence:
        argv += [
            "-an",
            "-c:v", "libwebp",
            "-quality", "92",
            "-compression_level", "1",
            os.path.join(output_path, "frame_%04d.webp"),
        ]
        return argv

    argv += ["-c:v", encoder]
    # Quality flags per hardware encoder family; the first substring match
    # wins, and encoders matching none get no extra flags.
    quality_by_family = (
        ("nvenc", ["-preset", "p4", "-cq", "28"]),
        ("vaapi", ["-qp", "28"]),
        ("qsv", ["-global_quality", "28"]),
        ("amf", ["-qp_i", "28", "-qp_p", "28"]),
    )
    for family, quality_args in quality_by_family:
        if family in encoder:
            argv += quality_args
            break
    argv += ["-c:a", "pcm_s16le", output_path]
    return argv
def build_audio_extract_command(input_path: str, start: float, sequence_dir: str,
                                duration: float = 8.0) -> list[str]:
    """Build the ffmpeg argv that writes audio to ``<sequence_dir>.wav``.

    The command seeks to *start* (``-ss`` before ``-i``), limits the output
    to *duration* seconds, drops video (``-vn``) and encodes the audio as
    ``pcm_s16le``. ``-y`` makes ffmpeg overwrite an existing output file.
    """
    wav_path = sequence_dir + ".wav"
    argv = [_bin("ffmpeg"), "-y"]
    argv += ["-ss", str(start), "-i", input_path]
    argv += ["-t", str(duration), "-vn"]
    argv += ["-c:a", "pcm_s16le", wav_path]
    return argv
# ffmpeg audio-encoder arguments for the manual "Extract audio area" tool,
# keyed by lower-case output extension (including the dot). Callers that find
# no entry pass no codec arguments, which lets ffmpeg pick a default encoder
# from the extension.
_AUDIO_CODEC_BY_EXT: dict[str, list[str]] = {
    ".wav": ["-c:a", "pcm_s16le"],
    ".flac": ["-c:a", "flac"],
    ".mp3": ["-c:a", "libmp3lame", "-q:a", "2"],
    ".m4a": ["-c:a", "aac", "-b:a", "256k"],
    ".aac": ["-c:a", "aac", "-b:a", "256k"],
    ".ogg": ["-c:a", "libvorbis", "-q:a", "5"],
    ".opus": ["-c:a", "libopus", "-b:a", "192k"],
}
def probe_duration(path: str) -> float | None:
"""Return the media duration in seconds via ffprobe, or None on failure."""
try:
r = subprocess.run(
[_bin("ffprobe"), "-v", "error", "-show_entries", "format=duration",
"-of", "default=nw=1:nk=1", path],
capture_output=True, text=True, timeout=30,
)
if r.returncode == 0 and r.stdout.strip():
return float(r.stdout.strip())
except Exception:
pass
return None
def build_audio_clip_command(input_path: str, start: float, duration: float,
                             out_path: str) -> list[str]:
    """Build the ffmpeg argv that extracts an audio clip to *out_path*.

    The command seeks to *start*, requests *duration* seconds (``-t``) and
    drops video (``-vn``). Per the change description for this module,
    ffmpeg stops at end-of-file when the source is shorter, so the result
    may hold less than *duration* seconds.

    The encoder arguments are looked up in ``_AUDIO_CODEC_BY_EXT`` by the
    lower-cased extension of *out_path* (wav/mp3/flac/...). An extension
    with no entry adds no codec arguments, leaving the choice to ffmpeg.
    """
    _stem, suffix = os.path.splitext(out_path)
    codec_args = _AUDIO_CODEC_BY_EXT.get(suffix.lower(), [])
    argv = [
        _bin("ffmpeg"), "-y",
        "-ss", str(start),
        "-i", input_path,
        "-t", str(duration),
        "-vn",
    ]
    argv.extend(codec_args)
    argv.append(out_path)
    return argv
def detect_hw_encoders() -> list[str]:
    """Probe ffmpeg for the H.264 hardware encoders usable on this platform.

    Only encoders relevant to the current platform are considered:
      - Windows: h264_nvenc, h264_qsv, h264_amf
      - macOS:   h264_videotoolbox
      - other (Linux): h264_nvenc, h264_vaapi, h264_qsv

    Runs ``ffmpeg -hide_banner -encoders`` with a 5-second timeout and keeps
    the candidates that appear as whole words in its output, in candidate
    order. The outcome is logged. An empty list is returned, without
    logging, when ffmpeg cannot be run or exits non-zero.
    """
    platform_candidates = {
        "win32": ["h264_nvenc", "h264_qsv", "h264_amf"],
        "darwin": ["h264_videotoolbox"],
    }
    candidates = platform_candidates.get(
        sys.platform, ["h264_nvenc", "h264_vaapi", "h264_qsv"]
    )

    try:
        probe = subprocess.run(
            [_bin("ffmpeg"), "-hide_banner", "-encoders"],
            capture_output=True, text=True, timeout=5,
        )
    except Exception:
        # Best-effort probe: any failure to run ffmpeg means "none available".
        return []
    if probe.returncode != 0:
        return []

    listing = probe.stdout
    found = [name for name in candidates if re.search(rf'\b{name}\b', listing)]
    _log(
        f"HW encoders detected: {', '.join(found)}"
        if found
        else "No HW encoders detected — GPU export unavailable"
    )
    return found