Files
8-cut/core/ffmpeg.py
T
Ethanfel ed63d04abf feat: Extract audio area — exact-length audio slice from the playhead, save-as
A dedicated "♪ Extract audio" button on the transport row grabs an exact
length of audio (set via the adjacent length box, from the playhead) and opens
a Save As dialog. Output format follows the chosen extension — WAV (pcm_s16le),
MP3 (libmp3lame), FLAC, m4a/aac, ogg/opus — re-encoding as needed; unknown
extensions let ffmpeg pick from the container.

- core.ffmpeg.build_audio_clip_command(input, start, duration, out_path):
  fast-seek + exact -t duration + -vn, codec by extension. Verified end-to-end
  (wav/mp3/flac all land at exactly the requested duration).
- Timeline shows the audio area as a distinct teal dashed band spanning
  [cursor, cursor+length], updated live as the playhead or length changes, so
  you see exactly what will be extracted.
- Length + last save dir persist in QSettings; button enabled once a file loads.

Tests: 3 core (codec-by-extension, exact length, case-insensitive) + 2 GUI
(controls exist, band tracks cursor/length).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-01 23:48:24 +02:00

236 lines
7.6 KiB
Python

import os
import re
import subprocess
import sys
from .paths import _bin, _log
_RATIOS: dict[str, tuple[int, int]] = {
"9:16": (9, 16),
"4:5": (4, 5),
"1:1": (1, 1),
}
def _portrait_crop_filter(ratio: str, crop_center: float) -> str:
"""Return an ffmpeg crop= filter expression for the given portrait ratio.
Uses ffmpeg expression syntax so source dimensions are resolved at runtime.
Commas inside min()/max() are escaped with \\, to prevent ffmpeg's
filtergraph parser from treating them as filter-chain separators.
"""
num, den = _RATIOS[ratio]
cw = f"ih*{num}/{den}"
x = f"max(0\\,min((iw-{cw})*{crop_center}\\,iw-{cw}))"
return f"crop={cw}:ih:{x}:0"
def resolve_keyframe(
keyframes: list[tuple[float, float, str | None, bool, bool]],
t: float,
tolerance: float = 0.05,
) -> tuple[float, float, str | None, bool, bool] | None:
"""Return the latest keyframe at or before *t*, or None."""
result = None
for kf in keyframes:
if kf[0] <= t + tolerance:
result = kf
else:
break
return result
def apply_keyframes_to_jobs(
jobs: list[tuple[float, str, str | None, float]],
keyframes: list[tuple[float, float, str | None, bool, bool]],
base_center: float,
base_ratio: str | None,
base_rand_p: bool,
base_rand_s: bool,
) -> list[tuple[float, str, str | None, float, bool, bool]]:
"""Resolve each job's crop state from keyframes, returning widened tuples.
Returns list of (start, path, ratio, center, rand_portrait, rand_square).
"""
result = []
for s, o, _r, _c in jobs:
kf = resolve_keyframe(keyframes, s)
if kf is not None:
_, center, ratio, rp, rs = kf
else:
center, ratio, rp, rs = base_center, base_ratio, base_rand_p, base_rand_s
result.append((s, o, ratio, center, rp, rs))
return result
def _find_vaapi_device() -> str:
"""Return the first available VAAPI render device path (Linux)."""
import glob
devices = sorted(glob.glob("/dev/dri/renderD*"))
return devices[0] if devices else "/dev/dri/renderD128"
def build_ffmpeg_command(
input_path: str, start: float, output_path: str,
short_side: int | None = None,
portrait_ratio: str | None = None,
crop_center: float = 0.5,
image_sequence: bool = False,
encoder: str = "libx264",
duration: float = 8.0,
target_fps: float | None = None,
snap32: bool = False,
frames: int | None = None,
) -> list[str]:
# -ss before -i: fast input-seeking. Safe here because we always re-encode,
# so there is no keyframe-alignment issue from pre-input seek.
# Image sequences always use libwebp, so skip HW encoder setup.
use_hw_vaapi = (encoder == "h264_vaapi" and not image_sequence
and sys.platform == "linux")
cmd = [_bin("ffmpeg"), "-y"]
# VAAPI needs a render device for hardware context (Linux only).
if use_hw_vaapi:
vaapi_dev = _find_vaapi_device()
cmd += ["-hwaccel", "vaapi", "-hwaccel_output_format", "vaapi",
"-vaapi_device", vaapi_dev]
cmd += [
"-threads", "0",
"-ss", str(start),
"-i", input_path,
"-t", str(duration),
]
filters: list[str] = []
if portrait_ratio is not None:
filters.append(_portrait_crop_filter(portrait_ratio, crop_center))
if short_side is not None:
# Scale so the shorter dimension equals short_side.
filters.append(
f"scale='if(lt(iw,ih),{short_side},-2)':'if(lt(iw,ih),-2,{short_side})':flags=lanczos"
)
# LTX-2: centered crop to ÷32 (no rescale → no aspect distortion) then fps.
# Placed among CPU filters, after scale and before the VAAPI hwupload block.
if snap32:
filters.append("crop=trunc(iw/32)*32:trunc(ih/32)*32")
if target_fps is not None:
filters.append(f"fps={target_fps:g}")
# VAAPI: decoded frames are GPU surfaces. CPU filters need hwdownload first.
if use_hw_vaapi:
if filters:
filters.insert(0, "hwdownload")
filters.insert(1, "format=nv12")
filters.append("format=nv12")
filters.append("hwupload")
if filters:
cmd += ["-vf", ",".join(filters)]
# LTX-2 output rate + exact frame cap (apply to both clip and webp-seq paths).
if target_fps is not None:
cmd += ["-r", f"{target_fps:g}"]
if frames is not None:
cmd += ["-frames:v", str(frames)]
if image_sequence:
cmd += [
"-an",
"-c:v", "libwebp",
"-quality", "92",
"-compression_level", "1",
os.path.join(output_path, "frame_%04d.webp"),
]
else:
cmd += ["-c:v", encoder]
if "nvenc" in encoder:
cmd += ["-preset", "p4", "-cq", "28"]
elif "vaapi" in encoder:
cmd += ["-qp", "28"]
elif "qsv" in encoder:
cmd += ["-global_quality", "28"]
elif "amf" in encoder:
cmd += ["-qp_i", "28", "-qp_p", "28"]
cmd += ["-c:a", "pcm_s16le", output_path]
return cmd
def build_audio_extract_command(input_path: str, start: float, sequence_dir: str,
duration: float = 8.0) -> list[str]:
"""Return an ffmpeg command that extracts audio to <sequence_dir>.wav."""
audio_path = sequence_dir + ".wav"
return [
_bin("ffmpeg"), "-y",
"-ss", str(start),
"-i", input_path,
"-t", str(duration),
"-vn",
"-c:a", "pcm_s16le",
audio_path,
]
# Audio codec chosen per output extension for the manual "Extract audio area"
# tool. Empty list -> let ffmpeg pick a default encoder from the extension.
_AUDIO_CODEC_BY_EXT: dict[str, list[str]] = {
".wav": ["-c:a", "pcm_s16le"],
".flac": ["-c:a", "flac"],
".mp3": ["-c:a", "libmp3lame", "-q:a", "2"],
".m4a": ["-c:a", "aac", "-b:a", "256k"],
".aac": ["-c:a", "aac", "-b:a", "256k"],
".ogg": ["-c:a", "libvorbis", "-q:a", "5"],
".opus": ["-c:a", "libopus", "-b:a", "192k"],
}
def build_audio_clip_command(input_path: str, start: float, duration: float,
out_path: str) -> list[str]:
"""ffmpeg command to extract exactly *duration* seconds of audio starting
at *start*, re-encoded per *out_path*'s extension (wav/mp3/flac/…)."""
ext = os.path.splitext(out_path)[1].lower()
codec = _AUDIO_CODEC_BY_EXT.get(ext, [])
return [
_bin("ffmpeg"), "-y",
"-ss", str(start),
"-i", input_path,
"-t", str(duration),
"-vn",
*codec,
out_path,
]
def detect_hw_encoders() -> list[str]:
"""Probe ffmpeg for available H.264 hardware encoders.
Returns only encoders relevant to the current platform:
- Windows: h264_nvenc, h264_qsv, h264_amf
- Linux: h264_nvenc, h264_vaapi, h264_qsv
- macOS: h264_videotoolbox
"""
if sys.platform == "win32":
candidates = ["h264_nvenc", "h264_qsv", "h264_amf"]
elif sys.platform == "darwin":
candidates = ["h264_videotoolbox"]
else:
candidates = ["h264_nvenc", "h264_vaapi", "h264_qsv"]
try:
result = subprocess.run(
[_bin("ffmpeg"), "-hide_banner", "-encoders"],
capture_output=True, text=True, timeout=5,
)
if result.returncode != 0:
return []
output = result.stdout
except Exception:
return []
available = [enc for enc in candidates if re.search(rf'\b{enc}\b', output)]
if available:
_log(f"HW encoders detected: {', '.join(available)}")
else:
_log("No HW encoders detected — GPU export unavailable")
return available