# Commit 12ed183f1b
#
# Commit message:
# - Remove legacy distance-mode scanning (build_profile, _similarity, etc.) and
#   hand-crafted intensity features — pipeline is now embedding-only
# - Integrate Microsoft BEATs as embedding option alongside wav2vec2/HuBERT
# - Add TrainDialog with positive class selector, model picker, video dir
#   fallback, and live training stats
# - Add TrainWorker QThread with cancel support and proper lifecycle cleanup
# - Add source_path column to DB for robust source video tracking
# - Add get_export_folders/get_training_data/get_training_stats to DB
# - Wire source_path in all export DB writes (_on_clip_done, _on_auto_clip_done)
# - Cancel scan/train workers in closeEvent to prevent use-after-free crashes
# - Add setup_env.sh supporting both conda and python venv (CUDA 12.8)
# - Update requirements.txt with all actual dependencies
# - Update 8cut_train.py with --positive flag for new DB-driven training
#
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
#
# File: 256 lines, 10 KiB, Python
#!/usr/bin/env python3
"""Calibration — per-video normalized features + classifier."""

import os
import sys
import time
import warnings

# Put this file's directory on the import path so the ``core`` package
# imported below resolves when the file is run directly as a script.
sys.path.insert(0, os.path.dirname(__file__))

# Silence every warning. Installed before the third-party imports so that it
# also covers anything they emit at import time.
warnings.filterwarnings("ignore")

import numpy as np
import librosa
from sklearn.ensemble import GradientBoostingClassifier

from core.audio_scan import _SR, _WINDOW
from core.db import ProcessedDB

# STFT framing shared by feature extraction and window statistics.
_HOP_LENGTH = 1024
_N_FFT = 2048

# Directory that is joined with DB filenames to locate the source videos.
PLEX_DIR = "/media/unraid/appdata/plex/download/porn_jav/"
# Value matched against the ``profile`` column of the ``processed`` table.
PROFILE_NAME = "JAV_missionary"
# A window closer than this to an "intense" mark is labelled positive
# (same unit as the window timestamps, presumably seconds).
TOLERANCE = 12.0
# A window farther than this from every mark is labelled negative.
NEG_MARGIN = 120.0


def extract_rich_features(y, sr=_SR):
    """Compute a stack of per-frame audio descriptors for one signal.

    Row order of the result: RMS, spectral centroid, spectral bandwidth,
    spectral roll-off, spectral flatness, zero-crossing rate, onset strength,
    eight mel-band levels (dB), then the spectral-contrast rows (22 rows in
    total when spectral contrast contributes seven rows).

    Args:
        y: Audio samples handed straight to librosa (presumably mono, 1-D).
        sr: Sample rate passed to the librosa feature functions.

    Returns:
        2-D array with one row per feature and one column per frame.
    """
    hop = _HOP_LENGTH
    # One power spectrogram feeds every spectral feature below.
    power = np.abs(librosa.stft(y, n_fft=_N_FFT, hop_length=hop)) ** 2

    energy = librosa.feature.rms(S=power, hop_length=hop)
    centroid = librosa.feature.spectral_centroid(S=power, sr=sr)
    bandwidth = librosa.feature.spectral_bandwidth(S=power, sr=sr)
    rolloff = librosa.feature.spectral_rolloff(S=power, sr=sr)
    flatness = librosa.feature.spectral_flatness(S=power)
    zcr = librosa.feature.zero_crossing_rate(y, hop_length=hop)
    onset_env = librosa.onset.onset_strength(
        S=librosa.power_to_db(power), sr=sr, hop_length=hop
    )
    onset = onset_env.reshape(1, -1)  # make it a single-row matrix like the rest

    mel_power = librosa.feature.melspectrogram(S=power, sr=sr, hop_length=hop, n_mels=128)
    mel_hz = librosa.mel_frequencies(n_mels=128, fmin=0, fmax=sr/2)

    def _band_level(lo_hz, hi_hz):
        # Mean mel power inside [lo_hz, hi_hz) in dB; a zero row if no mel bin
        # falls in the band.
        in_band = (mel_hz >= lo_hz) & (mel_hz < hi_hz)
        if not in_band.any():
            return np.zeros((1, mel_power.shape[1]))
        return librosa.power_to_db(
            mel_power[in_band].mean(axis=0, keepdims=True) + 1e-10
        )

    # Consecutive edges define the eight bands 0-100, 100-300, ..., 5500-8000.
    band_edges_hz = (0, 100, 300, 600, 1200, 2000, 3500, 5500, 8000)
    band_rows = [
        _band_level(lo_hz, hi_hz)
        for lo_hz, hi_hz in zip(band_edges_hz, band_edges_hz[1:])
    ]

    contrast = librosa.feature.spectral_contrast(S=power, sr=sr, hop_length=hop)

    # Trim everything to the shortest of these frame counts before stacking.
    n_frames = min(
        energy.shape[1],
        centroid.shape[1],
        onset.shape[1],
        contrast.shape[1],
        band_rows[0].shape[1],
    )
    rows = [energy, centroid, bandwidth, rolloff, flatness, zcr, onset,
            *band_rows, contrast]
    return np.vstack([row[:, :n_frames] for row in rows])


def compute_window_stats(feat, hop=1.0):
    """Summarise per-frame features over sliding windows.

    Each window spans ``_WINDOW`` (converted to frames via
    ``_SR / _HOP_LENGTH``) and is described by the per-feature mean followed
    by the per-feature standard deviation.

    Args:
        feat: 2-D array, one row per feature and one column per frame.
        hop: Step between consecutive window starts, in the same unit as
            ``_WINDOW``. Must be positive; a step shorter than one frame is
            rounded up to one frame.

    Returns:
        ``(timestamps, vectors)``. ``timestamps`` holds the window start
        times; ``vectors`` has shape ``(n_windows, 2 * n_features)``. When the
        signal is shorter than one window both are empty (``vectors`` keeps
        its ``2 * n_features`` columns so it still stacks with other results).

    Raises:
        ValueError: If ``hop`` is not positive.
    """
    if hop <= 0:
        raise ValueError(f"hop must be positive, got {hop!r}")

    n_feats, n_frames = feat.shape
    fps = _SR / _HOP_LENGTH
    # int() truncates toward zero; clamp so a tiny window cannot divide by
    # zero below and a sub-frame hop cannot hand np.arange a zero step.
    win_frames = max(1, int(_WINDOW * fps))
    hop_frames = max(1, int(hop * fps))
    if win_frames > n_frames:
        return np.empty(0), np.empty((0, 2 * n_feats))

    # Prefix sums of x and x^2 give every window's sum in O(1).
    cumsum = np.zeros((n_feats, n_frames + 1))
    cumsum[:, 1:] = np.cumsum(feat, axis=1)
    cumsq = np.zeros((n_feats, n_frames + 1))
    cumsq[:, 1:] = np.cumsum(feat ** 2, axis=1)

    starts = np.arange(0, n_frames - win_frames + 1, hop_frames)
    ends = starts + win_frames
    means = (cumsum[:, ends] - cumsum[:, starts]) / win_frames
    mean_sq = (cumsq[:, ends] - cumsq[:, starts]) / win_frames
    # E[x^2] - E[x]^2 can dip slightly below zero through rounding, so clip
    # before the square root; the epsilon keeps the result strictly positive.
    stds = np.sqrt(np.maximum(mean_sq - means ** 2, 0) + 1e-10)

    return starts / fps, np.vstack([means, stds]).T


def label_windows(timestamps, gt_intense, gt_soft):
    """Assign a training label to every window timestamp.

    Args:
        timestamps: Window start times (same unit as the ground-truth marks).
        gt_intense: Iterable of ground-truth times for the positive class.
        gt_soft: Iterable of ground-truth times for the other annotated class.

    Returns:
        Integer array aligned with ``timestamps``:
        ``1``  - strictly closer than ``TOLERANCE`` to an intense mark,
        ``-1`` - strictly farther than ``NEG_MARGIN`` from every mark,
        ``0``  - anything in between (neither positive nor negative).
    """
    ts = np.asarray(timestamps, dtype=float)
    labels = np.zeros(len(ts), dtype=int)
    if len(ts) == 0:
        return labels

    def _nearest(marks):
        # Distance from each timestamp to its closest mark; infinite when
        # there are no marks at all.
        marks = np.asarray(list(marks), dtype=float)
        if marks.size == 0:
            return np.full(ts.shape, np.inf)
        return np.abs(ts[:, None] - marks[None, :]).min(axis=1)

    dist_intense = _nearest(gt_intense)
    dist_any = np.minimum(dist_intense, _nearest(gt_soft))

    # Negatives first, positives second: a positive match always wins.
    labels[dist_any > NEG_MARGIN] = -1
    labels[dist_intense < TOLERANCE] = 1
    return labels


def _collect_ground_truth(rows):
    """Group clip start times per video into (intense, soft) dicts of sets."""
    intense, soft = {}, {}
    for filename, start_time, output_path in rows:
        if not output_path:
            # A row without an output path cannot be assigned to a class;
            # skipping it avoids a TypeError on the substring tests below.
            continue
        if '/mp4_Intense/' in output_path:
            intense.setdefault(filename, set()).add(start_time)
        elif '/mp4_Soft/' in output_path:
            soft.setdefault(filename, set()).add(start_time)
    return intense, soft


def _parse_video_limit(argv, default):
    """Return the optional video-count limit given as the first CLI argument."""
    if len(argv) <= 1:
        return default
    try:
        limit = int(argv[1])
    except ValueError:
        raise SystemExit(
            f"usage: {argv[0]} [N_VIDEOS] (got {argv[1]!r})"
        ) from None
    if limit < 0:
        # A negative value would silently slice videos off the END of the list.
        raise SystemExit(f"N_VIDEOS must be >= 0 (got {limit})")
    return limit


def _fit_balanced_classifier(X, y):
    """Fit the classifier on all positives plus at most 3x as many negatives.

    Returns None when either class is missing from ``y``.
    """
    pos_idx = np.where(y == 1)[0]
    neg_idx = np.where(y == 0)[0]
    if len(pos_idx) == 0 or len(neg_idx) == 0:
        return None
    # Fixed seed: the negative subsample is identical on every call.
    rng = np.random.RandomState(42)
    n_neg = min(len(neg_idx), len(pos_idx) * 3)
    neg_sample = rng.choice(neg_idx, n_neg, replace=False)
    train_idx = np.concatenate([pos_idx, neg_sample])
    clf = GradientBoostingClassifier(
        n_estimators=200, max_depth=5, learning_rate=0.1, random_state=42
    )
    clf.fit(X[train_idx], y[train_idx])
    return clf


def _leave_one_video_out(data):
    """Train on all other videos, score the held-out one; print one line each.

    Returns the pooled (y_true, y_prob) lists over all evaluated videos.
    """
    all_y_true, all_y_prob = [], []
    for test_idx, (vname, test_X, test_labels) in enumerate(data):
        test_mask = test_labels != 0
        # Only videos with at least one labelled positive are evaluated.
        if not (test_labels[test_mask] == 1).any():
            continue
        X_test = test_X[test_mask]
        y_test = (test_labels[test_mask] == 1).astype(int)

        X_parts, y_parts = [], []
        for i, (_, feats, labs) in enumerate(data):
            if i == test_idx:
                continue
            keep = labs != 0
            if not keep.any():
                continue
            X_parts.append(feats[keep])
            y_parts.append((labs[keep] == 1).astype(int))
        if not X_parts:
            continue

        clf = _fit_balanced_classifier(np.vstack(X_parts), np.concatenate(y_parts))
        if clf is None:
            continue
        probs = clf.predict_proba(X_test)[:, 1]

        tp = ((probs >= 0.5) & (y_test == 1)).sum()
        fp = ((probs >= 0.5) & (y_test == 0)).sum()
        fn_count = ((probs < 0.5) & (y_test == 1)).sum()
        pos_s = probs[y_test == 1].mean() if (y_test == 1).sum() > 0 else 0
        neg_s = probs[y_test == 0].mean() if (y_test == 0).sum() > 0 else 0
        print(f" {vname[:50]:50s} TP={tp:3d} FP={fp:4d} FN={fn_count:3d} pos_p={pos_s:.3f} neg_p={neg_s:.3f}")

        all_y_true.extend(y_test)
        all_y_prob.extend(probs)
    return all_y_true, all_y_prob


def _print_threshold_sweep(y_true, y_prob):
    """Print precision/recall/F1 for thresholds 0.10..0.90 and the best F1."""
    best_f1, best_thr = 0, 0
    print(f"\n {'thr':>5} {'prec':>6} {'recall':>6} {'TP':>5} {'FP':>5} {'FN':>4} {'F1':>6}")
    for thr in np.arange(0.10, 0.91, 0.05):
        tp = ((y_prob >= thr) & (y_true == 1)).sum()
        fp = ((y_prob >= thr) & (y_true == 0)).sum()
        fn_count = ((y_prob < thr) & (y_true == 1)).sum()
        prec = tp / (tp + fp) if (tp + fp) > 0 else 0
        rec = tp / (tp + fn_count) if (tp + fn_count) > 0 else 0
        f1 = 2 * prec * rec / (prec + rec) if (prec + rec) > 0 else 0
        if f1 > best_f1:
            best_f1, best_thr = f1, thr
        print(f" {thr:.2f} {prec:.4f} {rec:.4f} {tp:5d} {fp:5d} {fn_count:4d} {f1:.4f}")
    print(f"\n Best F1={best_f1:.4f} at thr={best_thr:.2f}")


def _print_top_features(data):
    """Fit on every labelled window and print the ten most important features."""
    X_all = np.vstack([feats[labs != 0] for _, feats, labs in data])
    y_all = np.concatenate(
        [(labs[labs != 0] == 1).astype(int) for _, _, labs in data]
    )
    clf = _fit_balanced_classifier(X_all, y_all)
    if clf is None:
        return

    # Names follow the row order produced by extract_rich_features; the
    # window vectors hold all means first, then all standard deviations.
    feat_names = (
        ["rms", "centroid", "bw", "rolloff", "flat", "zcr", "onset"]
        + [f"mel{i}" for i in range(8)]
        + [f"sc{i}" for i in range(7)]
    )
    stat_names = [f"{f}_m" for f in feat_names] + [f"{f}_s" for f in feat_names]
    imp = clf.feature_importances_
    top = sorted(zip(stat_names, imp), key=lambda x: -x[1])[:10]
    print(f" Top features: {', '.join(f'{n}={v:.3f}' for n, v in top)}")


def main():
    """Run the calibration experiment and print its report.

    Steps: read the clips recorded for ``PROFILE_NAME`` from the database,
    extract windowed features for every annotated video found under
    ``PLEX_DIR``, then run leave-one-video-out cross-validation twice (raw
    features and per-video z-scored features), printing per-video results, a
    threshold sweep and the top feature importances for each.

    An optional first command-line argument limits the number of videos.
    """
    db = ProcessedDB()
    rows = db._con.execute(
        "SELECT filename, start_time, output_path FROM processed WHERE profile = ?",
        (PROFILE_NAME,),
    ).fetchall()
    intense_by_video, soft_by_video = _collect_ground_truth(rows)

    # Only videos with at least one intense clip that still exist on disk.
    videos = [fn for fn in intense_by_video
              if os.path.exists(os.path.join(PLEX_DIR, fn))]
    videos = videos[:_parse_video_limit(sys.argv, len(videos))]
    print(f"Processing {len(videos)} videos...")

    all_data_raw = []   # (name, raw window vectors, labels)
    all_data_norm = []  # (name, per-video z-scored window vectors, labels)

    for vi, vname in enumerate(videos):
        vpath = os.path.join(PLEX_DIR, vname)
        gt_intense = sorted(intense_by_video.get(vname, set()))
        gt_soft = sorted(soft_by_video.get(vname, set()))

        t0 = time.time()
        y, _ = librosa.load(vpath, sr=_SR, mono=True)
        feat = extract_rich_features(y)
        timestamps, window_vectors = compute_window_stats(feat, hop=1.0)
        dt = time.time() - t0

        if len(timestamps) == 0:
            # Audio shorter than one analysis window: nothing to label.
            continue

        labels = label_windows(timestamps, gt_intense, gt_soft)

        # Per-video z-score normalization; the floor avoids dividing by ~0
        # for features that are constant within a video.
        vid_mean = window_vectors.mean(axis=0)
        vid_std = np.maximum(window_vectors.std(axis=0), 1e-6)
        normed = (window_vectors - vid_mean) / vid_std

        n_pos = (labels == 1).sum()
        n_neg = (labels == -1).sum()
        print(f" [{vi+1}/{len(videos)}] {vname[:55]} pos={n_pos} neg={n_neg} ({dt:.1f}s)")

        all_data_raw.append((vname, window_vectors, labels))
        all_data_norm.append((vname, normed, labels))

    # Run CV for both raw and normalized features.
    for title, data in [("RAW features", all_data_raw),
                        ("PER-VIDEO NORMALIZED features", all_data_norm)]:
        print(f"\n{'='*70}")
        print(f" {title}")
        print(f"{'='*70}")

        all_y_true, all_y_prob = _leave_one_video_out(data)
        if not all_y_true:
            print(" No test results.")
            continue

        y_true = np.array(all_y_true)
        y_prob = np.array(all_y_prob)
        pos_probs = y_prob[y_true == 1]
        neg_probs = y_prob[y_true == 0]

        if len(pos_probs) > 0 and len(neg_probs) > 0:
            print(f"\n POS: 25%={np.percentile(pos_probs,25):.3f} 50%={np.percentile(pos_probs,50):.3f}"
                  f" 75%={np.percentile(pos_probs,75):.3f} max={pos_probs.max():.3f}")
            print(f" NEG: 25%={np.percentile(neg_probs,25):.3f} 50%={np.percentile(neg_probs,50):.3f}"
                  f" 75%={np.percentile(neg_probs,75):.3f} max={neg_probs.max():.3f}")

        _print_threshold_sweep(y_true, y_prob)

        # Feature importance
        _print_top_features(data)


if __name__ == "__main__":
    # Run the calibration only when executed as a script, not on import.
    main()