feat: integrate training UI, BEATs model, and clean up legacy code

- Remove legacy distance-mode scanning (build_profile, _similarity, etc.)
  and hand-crafted intensity features — pipeline is now embedding-only
- Integrate Microsoft BEATs as embedding option alongside wav2vec2/HuBERT
- Add TrainDialog with positive class selector, model picker, video dir
  fallback, and live training stats
- Add TrainWorker QThread with cancel support and proper lifecycle cleanup
- Add source_path column to DB for robust source video tracking
- Add get_export_folders/get_training_data/get_training_stats to DB
- Wire source_path in all export DB writes (_on_clip_done, _on_auto_clip_done)
- Cancel scan/train workers in closeEvent to prevent use-after-free crashes
- Add setup_env.sh supporting both conda and python venv (CUDA 12.8)
- Update requirements.txt with all actual dependencies
- Update 8cut_train.py with --positive flag for new DB-driven training

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-18 11:52:27 +02:00
parent f2c38aee79
commit 12ed183f1b
11 changed files with 2608 additions and 338 deletions
+347 -128
View File
@@ -1,105 +1,359 @@
"""Audio similarity scanning — MFCC + spectral contrast profile matching."""
"""Audio scanning — embedding-based classifier for audio event detection."""
import hashlib
import os
import numpy as np
import librosa
from .paths import _log
_N_MFCC = 13 # coefficients 0-12; we drop C0 → 12 usable
_SR = 16000 # lower sr = faster, no quality loss for style matching
_HOP_LENGTH = 1024 # STFT hop (~64ms frames at 16kHz)
_N_FFT = 2048 # STFT window
_SR = 16000 # lower sr = faster
_WINDOW = 8.0 # seconds
_N_FEATURES = 62 # (12 mfcc + 12 delta + 7 sc) * 2 (mean + std)
_MODEL_DIR = os.path.join(os.path.expanduser("~"), ".8cut_models")
_W2V_CACHE_DIR = os.path.join(os.path.expanduser("~"), ".8cut_cache", "w2v")
# ---------------------------------------------------------------------------
# Embedding extraction (lazy-loaded)
# ---------------------------------------------------------------------------
_w2v_model = None
_w2v_device = None
_w2v_model_name = None
# Supported embedding models — name → embed_dim
_EMBED_MODELS = {
"WAV2VEC2_BASE": 768,
"WAV2VEC2_LARGE": 1024,
"WAV2VEC2_LARGE_LV60K":1024,
"HUBERT_BASE": 768,
"HUBERT_LARGE": 1024,
"HUBERT_XLARGE": 1280,
"BEATS": 768,
}
_DEFAULT_EMBED_MODEL = "WAV2VEC2_BASE"
_BEATS_CHECKPOINT = os.path.join(
os.path.expanduser("~"), ".cache", "huggingface", "hub",
"models--lpepino--beats_ckpts", "snapshots",
"5b53b0404df452a3a607d7e67687227730e5bad1", "BEATs_iter3_plus_AS2M.pt",
)
def _extract_features_from_signal(y: np.ndarray, sr: int = _SR) -> np.ndarray:
"""Compute feature matrix (31 x T) from a raw audio signal.
def _get_w2v_model(model_name: str | None = None):
"""Lazy-load an embedding model. Reloads if model_name differs from cached."""
global _w2v_model, _w2v_device, _w2v_model_name
if model_name is None:
model_name = _DEFAULT_EMBED_MODEL
if _w2v_model is None or _w2v_model_name != model_name:
import torch
_w2v_device = "cuda" if torch.cuda.is_available() else "cpu"
Features per frame: 12 MFCCs (skip C0) + 12 delta MFCCs + 7 spectral contrast.
if model_name == "BEATS":
from .beats_model import BEATs, BEATsConfig
checkpoint = torch.load(_BEATS_CHECKPOINT, map_location=_w2v_device,
weights_only=False)
cfg = BEATsConfig(checkpoint['cfg'])
_w2v_model = BEATs(cfg)
_w2v_model.load_state_dict(checkpoint['model'])
_w2v_model.to(_w2v_device)
else:
import torchaudio
bundle = getattr(torchaudio.pipelines, model_name)
_w2v_model = bundle.get_model().to(_w2v_device)
_w2v_model.eval()
_w2v_model_name = model_name
_log(f"audio_scan: {model_name} loaded on {_w2v_device}")
return _w2v_model, _w2v_device
def _embed_dim(model_name: str | None = None) -> int:
"""Return embedding dimension for a model name."""
if model_name is None:
model_name = _DEFAULT_EMBED_MODEL
return _EMBED_MODELS.get(model_name, 768)
def _w2v_cache_path(video_path: str, hop: float, window: float,
model_name: str | None = None) -> str:
"""Return cache file path for a video's embeddings (includes model name)."""
if model_name is None:
model_name = _DEFAULT_EMBED_MODEL
abspath = os.path.abspath(video_path)
mtime = os.path.getmtime(abspath)
key = f"{abspath}|{mtime}|{hop}|{window}|{model_name}"
h = hashlib.sha256(key.encode()).hexdigest()[:16]
return os.path.join(_W2V_CACHE_DIR, f"{h}.npz")
def _extract_w2v_windows(y: np.ndarray, sr: int = _SR,
hop: float = 1.0, window: float = _WINDOW,
video_path: str | None = None,
cancel_flag: object = None,
model_name: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""Extract embeddings for all sliding windows using a torchaudio model.
If video_path is given, results are cached to disk for fast re-scans.
Returns (timestamps, embeddings) where embeddings is (N, D).
"""
S = np.abs(librosa.stft(y, n_fft=_N_FFT, hop_length=_HOP_LENGTH)) ** 2
mel_S = librosa.feature.melspectrogram(S=S, sr=sr, hop_length=_HOP_LENGTH)
mfcc = librosa.feature.mfcc(S=librosa.power_to_db(mel_S), sr=sr, n_mfcc=_N_MFCC)
mfcc = mfcc[1:] # drop C0 (energy) — dominates cosine sim, kills discrimination
delta = librosa.feature.delta(mfcc)
sc = librosa.feature.spectral_contrast(S=S, sr=sr, hop_length=_HOP_LENGTH)
return np.vstack([mfcc, delta, sc]) # (31, T)
edim = _embed_dim(model_name)
def _aggregate(feature_matrix: np.ndarray) -> np.ndarray:
"""Collapse a (31, T) feature matrix into a (62,) vector via mean + std."""
return np.concatenate([
feature_matrix.mean(axis=1),
feature_matrix.std(axis=1),
])
def _extract_features(path: str, sr: int = _SR) -> np.ndarray:
"""Load audio from a file and return a 62-dim feature vector."""
y, _ = librosa.load(path, sr=sr, mono=True)
feat = _extract_features_from_signal(y, sr)
return _aggregate(feat)
def build_profile(clip_paths: list[str]) -> dict | None:
"""Extract features from reference clips.
Returns dict with:
- mean_vector: averaged feature vector across all clips (62,)
- clip_vectors: list of individual feature vectors
Returns None if no clips could be loaded.
"""
vectors = []
for p in clip_paths:
# Try loading from cache
cache_file = None
if video_path:
try:
vec = _extract_features(p)
vectors.append(vec)
cache_file = _w2v_cache_path(video_path, hop, window, model_name)
if os.path.exists(cache_file):
data = np.load(cache_file)
_log(f"audio_scan: cache hit ({cache_file})")
return data["timestamps"], data["embeddings"]
except Exception as e:
_log(f"audio_scan: skip {p}: {e}")
if not vectors:
return None
arr = np.stack(vectors)
return {
"mean_vector": arr.mean(axis=0),
"clip_vectors": vectors,
}
_log(f"audio_scan: cache read failed: {e}")
win_samples = int(window * sr)
hop_samples = int(hop * sr)
n_windows = max(0, (len(y) - win_samples) // hop_samples + 1)
if n_windows == 0:
return np.array([]), np.empty((0, edim))
import torch
model, device = _get_w2v_model(model_name)
is_beats = (model_name or _DEFAULT_EMBED_MODEL) == "BEATS"
batch_size = 16
timestamps = np.arange(n_windows) * hop
embeddings = []
for batch_start in range(0, n_windows, batch_size):
if cancel_flag and getattr(cancel_flag, '_cancel', False):
return np.array([]), np.empty((0, edim))
batch_end = min(batch_start + batch_size, n_windows)
chunks = []
for i in range(batch_start, batch_end):
start = i * hop_samples
chunks.append(y[start:start + win_samples])
with torch.no_grad():
waveforms = torch.from_numpy(np.stack(chunks)).float().to(device)
if is_beats:
padding_mask = torch.zeros_like(waveforms, dtype=torch.bool)
features, _ = model.extract_features(waveforms, padding_mask=padding_mask)
else:
features, _ = model(waveforms)
batch_emb = features.mean(dim=1).cpu().numpy()
embeddings.append(batch_emb)
result_ts = timestamps
result_emb = np.vstack(embeddings)
# Save to cache
if cache_file:
try:
os.makedirs(_W2V_CACHE_DIR, exist_ok=True)
np.savez(cache_file, timestamps=result_ts, embeddings=result_emb)
_log(f"audio_scan: w2v cache saved ({cache_file})")
except Exception as e:
_log(f"audio_scan: cache write failed: {e}")
return result_ts, result_emb
def _similarity(a: np.ndarray, b: np.ndarray) -> float:
"""Euclidean-distance-based similarity in (0, 1].
def _extract_w2v_targeted(y: np.ndarray, sr: int, gt_intense: list[float],
gt_soft: list[float], tolerance: float = 12.0,
neg_margin: float = 120.0,
model_name: str | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Extract embeddings only near positives and distant negatives.
1/(1+dist): identical → 1.0, very different → near 0.
Returns (timestamps, embeddings, labels) where labels: 1=pos, -1=neg, 0=ambig.
"""
return float(1.0 / (1.0 + np.linalg.norm(a - b)))
edim = _embed_dim(model_name)
duration = len(y) / sr
win_samples = int(_WINDOW * sr)
all_gt = list(gt_intense) + list(gt_soft)
# Positive windows: every second near intense markers
pos_times = set()
for gt in gt_intense:
for offset in range(-int(tolerance), int(tolerance) + 1):
t = gt + offset
if 0 <= t <= duration - _WINDOW:
pos_times.add(int(t))
# Negative windows: every 4s, far from any marker
neg_times = set()
for t in range(0, int(duration - _WINDOW), 4):
if min((abs(t - g) for g in all_gt), default=9999) > neg_margin:
neg_times.add(t)
all_times = sorted(pos_times | neg_times)
# Filter out windows that go past the end
valid_times = [t for t in all_times if int(t * sr) + win_samples <= len(y)]
if not valid_times:
return np.array([]), np.zeros((0, edim)), np.array([], dtype=int)
import torch
model, device = _get_w2v_model(model_name)
batch_size = 16
timestamps_list: list[float] = []
embeddings_list: list[np.ndarray] = []
is_beats = (model_name or _DEFAULT_EMBED_MODEL) == "BEATS"
for batch_start in range(0, len(valid_times), batch_size):
batch_end = min(batch_start + batch_size, len(valid_times))
chunks = []
for t in valid_times[batch_start:batch_end]:
start = int(t * sr)
chunks.append(y[start:start + win_samples])
timestamps_list.append(float(t))
with torch.no_grad():
waveforms = torch.from_numpy(np.stack(chunks)).float().to(device)
if is_beats:
padding_mask = torch.zeros_like(waveforms, dtype=torch.bool)
features, _ = model.extract_features(waveforms, padding_mask=padding_mask)
else:
features, _ = model(waveforms)
batch_emb = features.mean(dim=1).cpu().numpy()
embeddings_list.append(batch_emb)
timestamps = np.array(timestamps_list)
embeddings = np.vstack(embeddings_list)
labels = np.zeros(len(timestamps), dtype=int)
for i, t in enumerate(timestamps):
di = min((abs(t - g) for g in gt_intense), default=9999)
da = min((abs(t - g) for g in all_gt), default=9999)
if di < tolerance:
labels[i] = 1
elif da > neg_margin:
labels[i] = -1
return timestamps, embeddings, labels
# ---------------------------------------------------------------------------
# Classifier mode — train / save / load / scan
# ---------------------------------------------------------------------------
def train_classifier(video_infos: list[tuple[str, list[float], list[float]]],
model_path: str | None = None,
tolerance: float = 12.0,
neg_margin: float = 120.0,
embed_model: str | None = None) -> dict:
"""Train a classifier from labeled videos.
Args:
video_infos: list of (video_path, intense_times, soft_times)
model_path: if given, save model to this path
tolerance/neg_margin: labeling parameters
embed_model: embedding model name (e.g. "HUBERT_BASE", "BEATS"), defaults to WAV2VEC2_BASE
Returns:
dict with 'classifier', 'embed_model', and metadata, or None on failure.
"""
from sklearn.ensemble import GradientBoostingClassifier
all_X, all_y = [], []
for vi, (vpath, gt_intense, gt_soft) in enumerate(video_infos):
_log(f"audio_scan: training [{vi+1}/{len(video_infos)}] {os.path.basename(vpath)}")
y, _ = librosa.load(vpath, sr=_SR, mono=True)
timestamps, embeddings, labels = _extract_w2v_targeted(
y, _SR, gt_intense, gt_soft, tolerance, neg_margin,
model_name=embed_model,
)
if len(timestamps) == 0:
continue
# Per-video z-score normalize
vid_mean = embeddings.mean(axis=0)
vid_std = np.maximum(embeddings.std(axis=0), 1e-6)
normed = (embeddings - vid_mean) / vid_std
for i in range(len(labels)):
if labels[i] == 1:
all_X.append(normed[i])
all_y.append(1)
elif labels[i] == -1:
all_X.append(normed[i])
all_y.append(0)
if not all_X:
_log("audio_scan: no training samples collected")
return None
X = np.stack(all_X)
y_arr = np.array(all_y)
n_pos = (y_arr == 1).sum()
n_neg = (y_arr == 0).sum()
_log(f"audio_scan: training set — {n_pos} positive, {n_neg} negative")
if n_pos == 0 or n_neg == 0:
_log(f"audio_scan: need both classes — {n_pos} pos, {n_neg} neg")
return None
# Subsample negatives for balance
rng = np.random.RandomState(42)
pos_idx = np.where(y_arr == 1)[0]
neg_idx = np.where(y_arr == 0)[0]
n_neg_sample = min(len(neg_idx), len(pos_idx) * 3)
neg_sample = rng.choice(neg_idx, n_neg_sample, replace=False)
train_idx = np.concatenate([pos_idx, neg_sample])
rng.shuffle(train_idx)
clf = GradientBoostingClassifier(
n_estimators=200, max_depth=5, learning_rate=0.1, random_state=42,
)
clf.fit(X[train_idx], y_arr[train_idx])
_log("audio_scan: classifier trained")
model = {"classifier": clf, "n_features": X.shape[1],
"embed_model": embed_model or _DEFAULT_EMBED_MODEL}
if model_path:
import joblib
parent = os.path.dirname(model_path)
if parent:
os.makedirs(parent, exist_ok=True)
joblib.dump(model, model_path)
_log(f"audio_scan: model saved to {model_path}")
return model
def load_classifier(model_path: str) -> dict | None:
"""Load a saved classifier model."""
if not os.path.exists(model_path):
return None
import joblib
return joblib.load(model_path)
def default_model_path(profile_name: str = "default") -> str:
"""Return the default path for a profile's classifier model."""
return os.path.join(_MODEL_DIR, f"{profile_name}.joblib")
# ---------------------------------------------------------------------------
# Scanning
# ---------------------------------------------------------------------------
def scan_video(
video_path: str,
profile: dict,
mode: str = "average",
threshold: float = 0.05,
model: dict = None,
threshold: float = 0.30,
hop: float = 1.0,
window: float = _WINDOW,
cancel_flag: object = None,
) -> list[tuple[float, float, float]]:
"""Slide a window across the video audio and score against the profile.
"""Scan a video for matching audio regions using a trained classifier.
Pre-computes STFT once for the whole file, then uses vectorized
cumulative-sum sliding window for speed.
Args:
video_path: path to video/audio file
profile: dict from build_profile()
mode: "average" (compare to mean) or "nearest" (max over all clips)
threshold: minimum similarity to include (0-1, default 0.05)
hop: step size in seconds
window: window size in seconds (default 8s)
cancel_flag: object with _cancel bool attribute; checked periodically
Returns:
list of (start_time, end_time, score) for regions above threshold
Returns list of (start_time, end_time, score) above threshold.
"""
if model is None:
_log("audio_scan: no model provided")
return []
_log(f"audio_scan: loading {video_path}")
y, sr = librosa.load(video_path, sr=_SR, mono=True)
duration = len(y) / sr
@@ -108,68 +362,33 @@ def scan_video(
if cancel_flag and getattr(cancel_flag, '_cancel', False):
return []
# Compute features for the entire file at once (one STFT)
feat = _extract_features_from_signal(y, sr) # (31, T)
n_feats, T = feat.shape
fps = sr / _HOP_LENGTH # frames per second
win_frames = int(window * fps)
hop_frames = int(hop * fps)
clf = model["classifier"]
embed_model = model.get("embed_model")
if win_frames > T:
_log(f"audio_scan: extracting embeddings ({embed_model or 'default'})...")
timestamps, window_vectors = _extract_w2v_windows(
y, sr, hop=hop, window=window, video_path=video_path,
cancel_flag=cancel_flag, model_name=embed_model,
)
if len(timestamps) == 0:
_log("audio_scan: video shorter than window")
return []
_log(f"audio_scan: scanning {T} frames, win={win_frames}, hop={hop_frames}")
# Per-video z-score normalize
vid_mean = window_vectors.mean(axis=0)
vid_std = np.maximum(window_vectors.std(axis=0), 1e-6)
normed = (window_vectors - vid_mean) / vid_std
# Vectorized sliding window via cumulative sums
cumsum = np.zeros((n_feats, T + 1))
cumsum[:, 1:] = np.cumsum(feat, axis=1)
cumsq = np.zeros((n_feats, T + 1))
cumsq[:, 1:] = np.cumsum(feat ** 2, axis=1)
starts = np.arange(0, T - win_frames + 1, hop_frames)
ends = starts + win_frames
sums = cumsum[:, ends] - cumsum[:, starts] # (31, n_windows)
sq_sums = cumsq[:, ends] - cumsq[:, starts]
means = sums / win_frames
stds = np.sqrt(np.maximum(sq_sums / win_frames - means ** 2, 0) + 1e-10)
window_vectors = np.vstack([means, stds]).T # (n_windows, 62)
_log(f"audio_scan: classifying {len(normed)} windows...")
if cancel_flag and getattr(cancel_flag, '_cancel', False):
return []
# Score all windows
if mode == "nearest":
# Compare each window to every clip vector, take max
clip_vecs = np.stack(profile["clip_vectors"]) # (n_clips, 62)
results = []
# Process in batches to check cancel_flag periodically
batch = 500
for i in range(0, len(window_vectors), batch):
if cancel_flag and getattr(cancel_flag, '_cancel', False):
_log("audio_scan: cancelled")
return results
chunk = window_vectors[i:i + batch]
# cdist: (batch, n_clips) distances
dists = np.linalg.norm(chunk[:, None, :] - clip_vecs[None, :, :], axis=2)
scores = 1.0 / (1.0 + dists.min(axis=1)) # min dist = max similarity
for j, score in enumerate(scores):
if score >= threshold:
idx = i + j
start_t = starts[idx] / fps
results.append((start_t, start_t + window, float(score)))
else:
# Average mode: compare to mean vector
ref = profile["mean_vector"]
dists = np.linalg.norm(window_vectors - ref, axis=1)
scores = 1.0 / (1.0 + dists)
mask = scores >= threshold
results = [
(starts[i] / fps, starts[i] / fps + window, float(scores[i]))
for i in np.nonzero(mask)[0]
]
probs = clf.predict_proba(normed)[:, 1]
mask = probs >= threshold
results = [
(timestamps[i], timestamps[i] + window, float(probs[i]))
for i in np.nonzero(mask)[0]
]
_log(f"audio_scan: {len(results)} regions above threshold {threshold}")
return results
+783
View File
@@ -0,0 +1,783 @@
# --------------------------------------------------------
# BEATs: Audio Pre-Training with Acoustic Tokenizers (https://arxiv.org/abs/2212.09058)
# Github source: https://github.com/microsoft/unilm/tree/master/beats
# Copyright (c) 2022 Microsoft
# Licensed under The MIT License [see LICENSE for details]
# Based on fairseq code bases
# https://github.com/pytorch/fairseq
# --------------------------------------------------------
import math
import numpy as np
from typing import Dict, Optional, Tuple
import torch
from torch import Tensor, nn
import torch.nn.functional as F
from torch.nn import LayerNorm, Parameter
from .beats_modules import (
GradMultiply,
SamePad,
get_activation_fn,
GLU_Linear,
quant_noise,
)
class TransformerEncoder(nn.Module):
def __init__(self, args):
super().__init__()
self.dropout = args.dropout
self.embedding_dim = args.encoder_embed_dim
self.pos_conv = nn.Conv1d(
self.embedding_dim,
self.embedding_dim,
kernel_size=args.conv_pos,
padding=args.conv_pos // 2,
groups=args.conv_pos_groups,
)
dropout = 0
std = math.sqrt((4 * (1.0 - dropout)) / (args.conv_pos * self.embedding_dim))
nn.init.normal_(self.pos_conv.weight, mean=0, std=std)
nn.init.constant_(self.pos_conv.bias, 0)
self.pos_conv = nn.utils.weight_norm(self.pos_conv, name="weight", dim=2)
self.pos_conv = nn.Sequential(self.pos_conv, SamePad(args.conv_pos), nn.GELU())
if hasattr(args, "relative_position_embedding"):
self.relative_position_embedding = args.relative_position_embedding
self.num_buckets = args.num_buckets
self.max_distance = args.max_distance
else:
self.relative_position_embedding = False
self.num_buckets = 0
self.max_distance = 0
self.layers = nn.ModuleList(
[
TransformerSentenceEncoderLayer(
embedding_dim=self.embedding_dim,
ffn_embedding_dim=args.encoder_ffn_embed_dim,
num_attention_heads=args.encoder_attention_heads,
dropout=self.dropout,
attention_dropout=args.attention_dropout,
activation_dropout=args.activation_dropout,
activation_fn=args.activation_fn,
layer_norm_first=args.layer_norm_first,
deep_norm=args.deep_norm,
has_relative_attention_bias=self.relative_position_embedding,
num_buckets=self.num_buckets,
max_distance=self.max_distance,
gru_rel_pos=args.gru_rel_pos,
encoder_layers=args.encoder_layers,
)
for i in range(args.encoder_layers)
]
)
if self.relative_position_embedding:
for i in range(1, args.encoder_layers):
del self.layers[i].self_attn.relative_attention_bias
self.layers[i].self_attn.relative_attention_bias = self.layers[0].self_attn.relative_attention_bias
self.layer_norm_first = args.layer_norm_first
self.layer_norm = LayerNorm(self.embedding_dim)
self.layerdrop = args.encoder_layerdrop
self.apply(init_bert_params)
if args.deep_norm:
deep_norm_beta = math.pow(8 * args.encoder_layers, -1 / 4)
for i in range(args.encoder_layers):
nn.init.xavier_normal_(self.layers[i].self_attn.k_proj.weight, gain=1)
nn.init.xavier_normal_(self.layers[i].self_attn.v_proj.weight, gain=deep_norm_beta)
nn.init.xavier_normal_(self.layers[i].self_attn.q_proj.weight, gain=1)
nn.init.xavier_normal_(self.layers[i].self_attn.out_proj.weight, gain=deep_norm_beta)
nn.init.xavier_normal_(self.layers[i].fc1.weight, gain=deep_norm_beta)
nn.init.xavier_normal_(self.layers[i].fc2.weight, gain=deep_norm_beta)
self.layer_wise_gradient_decay_ratio = getattr(args, "layer_wise_gradient_decay_ratio", 1)
def forward(self, x, padding_mask=None, layer=None):
x, layer_results = self.extract_features(x, padding_mask, layer)
if self.layer_norm_first and layer is None:
x = self.layer_norm(x)
return x, layer_results
def extract_features(self, x, padding_mask=None, tgt_layer=None):
if padding_mask is not None:
x[padding_mask] = 0
x_conv = self.pos_conv(x.transpose(1, 2))
x_conv = x_conv.transpose(1, 2)
x = x + x_conv
if not self.layer_norm_first:
x = self.layer_norm(x)
x = F.dropout(x, p=self.dropout, training=self.training)
# B x T x C -> T x B x C
x = x.transpose(0, 1)
layer_results = []
z = None
if tgt_layer is not None:
layer_results.append((x, z))
r = None
pos_bias = None
for i, layer in enumerate(self.layers):
if self.layer_wise_gradient_decay_ratio != 1.0:
x = GradMultiply.apply(x, self.layer_wise_gradient_decay_ratio)
dropout_probability = np.random.random()
if not self.training or (dropout_probability > self.layerdrop):
x, z, pos_bias = layer(x, self_attn_padding_mask=padding_mask, need_weights=False, pos_bias=pos_bias)
if tgt_layer is not None:
layer_results.append((x, z))
if i == tgt_layer:
r = x
break
if r is not None:
x = r
# T x B x C -> B x T x C
x = x.transpose(0, 1)
return x, layer_results
class TransformerSentenceEncoderLayer(nn.Module):
def __init__(
self,
embedding_dim: float = 768,
ffn_embedding_dim: float = 3072,
num_attention_heads: float = 8,
dropout: float = 0.1,
attention_dropout: float = 0.1,
activation_dropout: float = 0.1,
activation_fn: str = "relu",
layer_norm_first: bool = False,
deep_norm: bool = False,
has_relative_attention_bias: bool = False,
num_buckets: int = 0,
max_distance: int = 0,
rescale_init: bool = False,
gru_rel_pos: bool = False,
encoder_layers: int = 0,
) -> None:
super().__init__()
self.embedding_dim = embedding_dim
self.dropout = dropout
self.activation_dropout = activation_dropout
self.activation_name = activation_fn
self.activation_fn = get_activation_fn(activation_fn)
self.self_attn = MultiheadAttention(
self.embedding_dim,
num_attention_heads,
dropout=attention_dropout,
self_attention=True,
has_relative_attention_bias=has_relative_attention_bias,
num_buckets=num_buckets,
max_distance=max_distance,
rescale_init=rescale_init,
gru_rel_pos=gru_rel_pos,
)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(self.activation_dropout)
self.dropout3 = nn.Dropout(dropout)
self.layer_norm_first = layer_norm_first
self.self_attn_layer_norm = LayerNorm(self.embedding_dim)
if self.activation_name == "glu":
self.fc1 = GLU_Linear(self.embedding_dim, ffn_embedding_dim, "swish")
else:
self.fc1 = nn.Linear(self.embedding_dim, ffn_embedding_dim)
self.fc2 = nn.Linear(ffn_embedding_dim, self.embedding_dim)
self.final_layer_norm = LayerNorm(self.embedding_dim)
self.deep_norm = deep_norm
if self.deep_norm:
self.deep_norm_alpha = math.pow(2 * encoder_layers, 1 / 4)
else:
self.deep_norm_alpha = 1
def forward(
self,
x: torch.Tensor,
self_attn_mask: torch.Tensor = None,
self_attn_padding_mask: torch.Tensor = None,
need_weights: bool = False,
pos_bias=None
):
residual = x
if self.layer_norm_first:
x = self.self_attn_layer_norm(x)
x, attn, pos_bias = self.self_attn(
query=x,
key=x,
value=x,
key_padding_mask=self_attn_padding_mask,
need_weights=False,
attn_mask=self_attn_mask,
position_bias=pos_bias
)
x = self.dropout1(x)
x = residual + x
residual = x
x = self.final_layer_norm(x)
if self.activation_name == "glu":
x = self.fc1(x)
else:
x = self.activation_fn(self.fc1(x))
x = self.dropout2(x)
x = self.fc2(x)
x = self.dropout3(x)
x = residual + x
else:
x, attn, pos_bias = self.self_attn(
query=x,
key=x,
value=x,
key_padding_mask=self_attn_padding_mask,
need_weights=need_weights,
attn_mask=self_attn_mask,
position_bias=pos_bias
)
x = self.dropout1(x)
x = residual * self.deep_norm_alpha + x
x = self.self_attn_layer_norm(x)
residual = x
if self.activation_name == "glu":
x = self.fc1(x)
else:
x = self.activation_fn(self.fc1(x))
x = self.dropout2(x)
x = self.fc2(x)
x = self.dropout3(x)
x = residual * self.deep_norm_alpha + x
x = self.final_layer_norm(x)
return x, attn, pos_bias
class MultiheadAttention(nn.Module):
"""Multi-headed attention.
See "Attention Is All You Need" for more details.
"""
def __init__(
self,
embed_dim,
num_heads,
kdim=None,
vdim=None,
dropout=0.0,
bias=True,
add_bias_kv=False,
add_zero_attn=False,
self_attention=False,
encoder_decoder_attention=False,
q_noise=0.0,
qn_block_size=8,
has_relative_attention_bias=False,
num_buckets=32,
max_distance=128,
gru_rel_pos=False,
rescale_init=False,
):
super().__init__()
self.embed_dim = embed_dim
self.kdim = kdim if kdim is not None else embed_dim
self.vdim = vdim if vdim is not None else embed_dim
self.qkv_same_dim = self.kdim == embed_dim and self.vdim == embed_dim
self.num_heads = num_heads
self.dropout_module = nn.Dropout(dropout)
self.has_relative_attention_bias = has_relative_attention_bias
self.num_buckets = num_buckets
self.max_distance = max_distance
if self.has_relative_attention_bias:
self.relative_attention_bias = nn.Embedding(num_buckets, num_heads)
self.head_dim = embed_dim // num_heads
self.q_head_dim = self.head_dim
self.k_head_dim = self.head_dim
assert (
self.head_dim * num_heads == self.embed_dim
), "embed_dim must be divisible by num_heads"
self.scaling = self.head_dim ** -0.5
self.self_attention = self_attention
self.encoder_decoder_attention = encoder_decoder_attention
assert not self.self_attention or self.qkv_same_dim, (
"Self-attention requires query, key and " "value to be of the same size"
)
k_bias = True
if rescale_init:
k_bias = False
k_embed_dim = embed_dim
q_embed_dim = embed_dim
self.k_proj = quant_noise(
nn.Linear(self.kdim, k_embed_dim, bias=k_bias), q_noise, qn_block_size
)
self.v_proj = quant_noise(
nn.Linear(self.vdim, embed_dim, bias=bias), q_noise, qn_block_size
)
self.q_proj = quant_noise(
nn.Linear(embed_dim, q_embed_dim, bias=bias), q_noise, qn_block_size
)
self.out_proj = quant_noise(
nn.Linear(embed_dim, embed_dim, bias=bias), q_noise, qn_block_size
)
if add_bias_kv:
self.bias_k = Parameter(torch.Tensor(1, 1, embed_dim))
self.bias_v = Parameter(torch.Tensor(1, 1, embed_dim))
else:
self.bias_k = self.bias_v = None
self.add_zero_attn = add_zero_attn
self.gru_rel_pos = gru_rel_pos
if self.gru_rel_pos:
self.grep_linear = nn.Linear(self.q_head_dim, 8)
self.grep_a = nn.Parameter(torch.ones(1, num_heads, 1, 1))
self.reset_parameters()
def reset_parameters(self):
if self.qkv_same_dim:
# Empirically observed the convergence to be much better with
# the scaled initialization
nn.init.xavier_uniform_(self.k_proj.weight, gain=1 / math.sqrt(2))
nn.init.xavier_uniform_(self.v_proj.weight, gain=1 / math.sqrt(2))
nn.init.xavier_uniform_(self.q_proj.weight, gain=1 / math.sqrt(2))
else:
nn.init.xavier_uniform_(self.k_proj.weight)
nn.init.xavier_uniform_(self.v_proj.weight)
nn.init.xavier_uniform_(self.q_proj.weight)
nn.init.xavier_uniform_(self.out_proj.weight)
if self.out_proj.bias is not None:
nn.init.constant_(self.out_proj.bias, 0.0)
if self.bias_k is not None:
nn.init.xavier_normal_(self.bias_k)
if self.bias_v is not None:
nn.init.xavier_normal_(self.bias_v)
if self.has_relative_attention_bias:
nn.init.xavier_normal_(self.relative_attention_bias.weight)
def _relative_positions_bucket(self, relative_positions, bidirectional=True):
num_buckets = self.num_buckets
max_distance = self.max_distance
relative_buckets = 0
if bidirectional:
num_buckets = num_buckets // 2
relative_buckets += (relative_positions > 0).to(torch.long) * num_buckets
relative_positions = torch.abs(relative_positions)
else:
relative_positions = -torch.min(relative_positions, torch.zeros_like(relative_positions))
max_exact = num_buckets // 2
is_small = relative_positions < max_exact
relative_postion_if_large = max_exact + (
torch.log(relative_positions.float() / max_exact)
/ math.log(max_distance / max_exact)
* (num_buckets - max_exact)
).to(torch.long)
relative_postion_if_large = torch.min(
relative_postion_if_large, torch.full_like(relative_postion_if_large, num_buckets - 1)
)
relative_buckets += torch.where(is_small, relative_positions, relative_postion_if_large)
return relative_buckets
def compute_bias(self, query_length, key_length):
context_position = torch.arange(query_length, dtype=torch.long)[:, None]
memory_position = torch.arange(key_length, dtype=torch.long)[None, :]
relative_position = memory_position - context_position
relative_position_bucket = self._relative_positions_bucket(
relative_position,
bidirectional=True
)
relative_position_bucket = relative_position_bucket.to(self.relative_attention_bias.weight.device)
values = self.relative_attention_bias(relative_position_bucket)
values = values.permute([2, 0, 1])
return values
def forward(
self,
query,
key: Optional[Tensor],
value: Optional[Tensor],
key_padding_mask: Optional[Tensor] = None,
incremental_state: Optional[Dict[str, Dict[str, Optional[Tensor]]]] = None,
need_weights: bool = True,
static_kv: bool = False,
attn_mask: Optional[Tensor] = None,
before_softmax: bool = False,
need_head_weights: bool = False,
position_bias: Optional[Tensor] = None
) -> Tuple[Tensor, Optional[Tensor], Optional[Tensor]]:
"""Input shape: Time x Batch x Channel
Args:
key_padding_mask (ByteTensor, optional): mask to exclude
keys that are pads, of shape `(batch, src_len)`, where
padding elements are indicated by 1s.
need_weights (bool, optional): return the attention weights,
averaged over heads (default: False).
attn_mask (ByteTensor, optional): typically used to
implement causal attention, where the mask prevents the
attention from looking forward in time (default: None).
before_softmax (bool, optional): return the raw attention
weights and values before the attention softmax.
need_head_weights (bool, optional): return the attention
weights for each head. Implies *need_weights*. Default:
return the average attention weights over all heads.
"""
if need_head_weights:
need_weights = True
is_tpu = query.device.type == "xla"
tgt_len, bsz, embed_dim = query.size()
src_len = tgt_len
assert embed_dim == self.embed_dim
assert list(query.size()) == [tgt_len, bsz, embed_dim]
if key is not None:
src_len, key_bsz, _ = key.size()
if not torch.jit.is_scripting():
assert key_bsz == bsz
assert value is not None
assert src_len, bsz == value.shape[:2]
if self.has_relative_attention_bias and position_bias is None:
position_bias = self.compute_bias(tgt_len, src_len)
position_bias = position_bias.unsqueeze(0).repeat(bsz, 1, 1, 1).view(bsz * self.num_heads, tgt_len, src_len)
if incremental_state is not None:
saved_state = self._get_input_buffer(incremental_state)
if saved_state is not None and "prev_key" in saved_state:
# previous time steps are cached - no need to recompute
# key and value if they are static
if static_kv:
assert self.encoder_decoder_attention and not self.self_attention
key = value = None
else:
saved_state = None
if self.self_attention:
q = self.q_proj(query)
k = self.k_proj(query)
v = self.v_proj(query)
elif self.encoder_decoder_attention:
# encoder-decoder attention
q = self.q_proj(query)
if key is None:
assert value is None
k = v = None
else:
k = self.k_proj(key)
v = self.v_proj(key)
else:
assert key is not None and value is not None
q = self.q_proj(query)
k = self.k_proj(key)
v = self.v_proj(value)
q *= self.scaling
alpha = 32
q *= 1 / alpha
if self.bias_k is not None:
assert self.bias_v is not None
k = torch.cat([k, self.bias_k.repeat(1, bsz, 1)])
v = torch.cat([v, self.bias_v.repeat(1, bsz, 1)])
if attn_mask is not None:
attn_mask = torch.cat(
[attn_mask, attn_mask.new_zeros(attn_mask.size(0), 1)], dim=1
)
if key_padding_mask is not None:
key_padding_mask = torch.cat(
[
key_padding_mask,
key_padding_mask.new_zeros(key_padding_mask.size(0), 1),
],
dim=1,
)
q = (
q.contiguous()
.view(tgt_len, bsz * self.num_heads, self.q_head_dim)
.transpose(0, 1)
)
if k is not None:
k = (
k.contiguous()
.view(-1, bsz * self.num_heads, self.k_head_dim)
.transpose(0, 1)
)
if v is not None:
v = (
v.contiguous()
.view(-1, bsz * self.num_heads, self.head_dim)
.transpose(0, 1)
)
if saved_state is not None:
# saved states are stored with shape (bsz, num_heads, seq_len, head_dim)
if "prev_key" in saved_state:
_prev_key = saved_state["prev_key"]
assert _prev_key is not None
prev_key = _prev_key.view(bsz * self.num_heads, -1, self.head_dim)
if static_kv:
k = prev_key
else:
assert k is not None
k = torch.cat([prev_key, k], dim=1)
src_len = k.size(1)
if "prev_value" in saved_state:
_prev_value = saved_state["prev_value"]
assert _prev_value is not None
prev_value = _prev_value.view(bsz * self.num_heads, -1, self.head_dim)
if static_kv:
v = prev_value
else:
assert v is not None
v = torch.cat([prev_value, v], dim=1)
prev_key_padding_mask: Optional[Tensor] = None
if "prev_key_padding_mask" in saved_state:
prev_key_padding_mask = saved_state["prev_key_padding_mask"]
assert k is not None and v is not None
key_padding_mask = MultiheadAttention._append_prev_key_padding_mask(
key_padding_mask=key_padding_mask,
prev_key_padding_mask=prev_key_padding_mask,
batch_size=bsz,
src_len=k.size(1),
static_kv=static_kv,
)
saved_state["prev_key"] = k.view(bsz, self.num_heads, -1, self.head_dim)
saved_state["prev_value"] = v.view(bsz, self.num_heads, -1, self.head_dim)
saved_state["prev_key_padding_mask"] = key_padding_mask
# In this branch incremental_state is never None
assert incremental_state is not None
incremental_state = self._set_input_buffer(incremental_state, saved_state)
assert k is not None
assert k.size(1) == src_len
# This is part of a workaround to get around fork/join parallelism
# not supporting Optional types.
if key_padding_mask is not None and key_padding_mask.dim() == 0:
key_padding_mask = None
if key_padding_mask is not None:
assert key_padding_mask.size(0) == bsz
assert key_padding_mask.size(1) == src_len
if self.add_zero_attn:
assert v is not None
src_len += 1
k = torch.cat([k, k.new_zeros((k.size(0), 1) + k.size()[2:])], dim=1)
v = torch.cat([v, v.new_zeros((v.size(0), 1) + v.size()[2:])], dim=1)
if attn_mask is not None:
attn_mask = torch.cat(
[attn_mask, attn_mask.new_zeros(attn_mask.size(0), 1)], dim=1
)
if key_padding_mask is not None:
key_padding_mask = torch.cat(
[
key_padding_mask,
torch.zeros(key_padding_mask.size(0), 1).type_as(
key_padding_mask
),
],
dim=1,
)
attn_weights = torch.bmm(q, k.transpose(1, 2))
attn_weights = (attn_weights - attn_weights.max(dim=-1, keepdim=True)[0]) * alpha
attn_weights = self.apply_sparse_mask(attn_weights, tgt_len, src_len, bsz)
assert list(attn_weights.size()) == [bsz * self.num_heads, tgt_len, src_len]
if attn_mask is not None:
attn_mask = attn_mask.unsqueeze(0)
attn_weights += attn_mask
if key_padding_mask is not None:
# don't attend to padding symbols
attn_weights = attn_weights.view(bsz, self.num_heads, tgt_len, src_len)
if not is_tpu:
attn_weights = attn_weights.masked_fill(
key_padding_mask.unsqueeze(1).unsqueeze(2).to(torch.bool),
float("-inf"),
)
else:
attn_weights = attn_weights.transpose(0, 2)
attn_weights = attn_weights.masked_fill(key_padding_mask, float("-inf"))
attn_weights = attn_weights.transpose(0, 2)
attn_weights = attn_weights.view(bsz * self.num_heads, tgt_len, src_len)
if before_softmax:
return attn_weights, v, position_bias
if position_bias is not None:
attn_mask_rel_pos = position_bias
if self.gru_rel_pos == 1:
query_layer = q.view(bsz, self.num_heads, tgt_len, self.q_head_dim) * alpha / self.scaling
_B, _H, _L, __ = query_layer.size()
gate_a, gate_b = torch.sigmoid(self.grep_linear(query_layer).view(
_B, _H, _L, 2, 4).sum(-1, keepdim=False)).chunk(2, dim=-1)
gate_a_1 = gate_a * (gate_b * self.grep_a - 1.0) + 2.0
attn_mask_rel_pos = gate_a_1.view(bsz * self.num_heads, tgt_len, 1) * position_bias
attn_mask_rel_pos = attn_mask_rel_pos.view(attn_weights.size())
attn_weights = attn_weights + attn_mask_rel_pos
attn_weights_float = F.softmax(
attn_weights, dim=-1
)
attn_weights = attn_weights_float.type_as(attn_weights)
attn_probs = self.dropout_module(attn_weights)
assert v is not None
attn = torch.bmm(attn_probs, v)
assert list(attn.size()) == [bsz * self.num_heads, tgt_len, self.head_dim]
attn = attn.transpose(0, 1).contiguous().view(tgt_len, bsz, embed_dim)
attn = self.out_proj(attn)
attn_weights: Optional[Tensor] = None
if need_weights:
attn_weights = attn_weights_float.view(
bsz, self.num_heads, tgt_len, src_len
).transpose(1, 0)
if not need_head_weights:
# average attention weights over heads
attn_weights = attn_weights.mean(dim=0)
return attn, attn_weights, position_bias
@staticmethod
def _append_prev_key_padding_mask(
key_padding_mask: Optional[Tensor],
prev_key_padding_mask: Optional[Tensor],
batch_size: int,
src_len: int,
static_kv: bool,
) -> Optional[Tensor]:
# saved key padding masks have shape (bsz, seq_len)
if prev_key_padding_mask is not None and static_kv:
new_key_padding_mask = prev_key_padding_mask
elif prev_key_padding_mask is not None and key_padding_mask is not None:
new_key_padding_mask = torch.cat(
[prev_key_padding_mask.float(), key_padding_mask.float()], dim=1
)
# During incremental decoding, as the padding token enters and
# leaves the frame, there will be a time when prev or current
# is None
elif prev_key_padding_mask is not None:
if src_len > prev_key_padding_mask.size(1):
filler = torch.zeros(
(batch_size, src_len - prev_key_padding_mask.size(1)),
device=prev_key_padding_mask.device,
)
new_key_padding_mask = torch.cat(
[prev_key_padding_mask.float(), filler.float()], dim=1
)
else:
new_key_padding_mask = prev_key_padding_mask.float()
elif key_padding_mask is not None:
if src_len > key_padding_mask.size(1):
filler = torch.zeros(
(batch_size, src_len - key_padding_mask.size(1)),
device=key_padding_mask.device,
)
new_key_padding_mask = torch.cat(
[filler.float(), key_padding_mask.float()], dim=1
)
else:
new_key_padding_mask = key_padding_mask.float()
else:
new_key_padding_mask = prev_key_padding_mask
return new_key_padding_mask
def _get_input_buffer(
self, incremental_state: Optional[Dict[str, Dict[str, Optional[Tensor]]]]
) -> Dict[str, Optional[Tensor]]:
result = self.get_incremental_state(incremental_state, "attn_state")
if result is not None:
return result
else:
empty_result: Dict[str, Optional[Tensor]] = {}
return empty_result
def _set_input_buffer(
self,
incremental_state: Dict[str, Dict[str, Optional[Tensor]]],
buffer: Dict[str, Optional[Tensor]],
):
return self.set_incremental_state(incremental_state, "attn_state", buffer)
def apply_sparse_mask(self, attn_weights, tgt_len: int, src_len: int, bsz: int):
return attn_weights
def init_bert_params(module):
"""
Initialize the weights specific to the BERT Model.
This overrides the default initializations depending on the specified arguments.
1. If normal_init_linear_weights is set then weights of linear
layer will be initialized using the normal distribution and
bais will be set to the specified value.
2. If normal_init_embed_weights is set then weights of embedding
layer will be initialized using the normal distribution.
3. If normal_init_proj_weights is set then weights of
in_project_weight for MultiHeadAttention initialized using
the normal distribution (to be validated).
"""
def normal_(data):
# with FSDP, module params will be on CUDA, so we cast them back to CPU
# so that the RNG is consistent with and without FSDP
data.copy_(
data.cpu().normal_(mean=0.0, std=0.02).to(data.device)
)
if isinstance(module, nn.Linear):
normal_(module.weight.data)
if module.bias is not None:
module.bias.data.zero_()
if isinstance(module, nn.Embedding):
normal_(module.weight.data)
if module.padding_idx is not None:
module.weight.data[module.padding_idx].zero_()
if isinstance(module, MultiheadAttention):
normal_(module.q_proj.weight.data)
normal_(module.k_proj.weight.data)
normal_(module.v_proj.weight.data)
+179
View File
@@ -0,0 +1,179 @@
# --------------------------------------------------------
# BEATs: Audio Pre-Training with Acoustic Tokenizers (https://arxiv.org/abs/2212.09058)
# Github source: https://github.com/microsoft/unilm/tree/master/beats
# Copyright (c) 2022 Microsoft
# Licensed under The MIT License [see LICENSE for details]
# Based on fairseq code bases
# https://github.com/pytorch/fairseq
# --------------------------------------------------------
import torch
import torch.nn as nn
from torch.nn import LayerNorm
import torchaudio.compliance.kaldi as ta_kaldi
from .beats_backbone import (
TransformerEncoder,
)
import logging
from typing import Optional
logger = logging.getLogger(__name__)
class BEATsConfig:
def __init__(self, cfg=None):
self.input_patch_size: int = -1 # path size of patch embedding
self.embed_dim: int = 512 # patch embedding dimension
self.conv_bias: bool = False # include bias in conv encoder
self.encoder_layers: int = 12 # num encoder layers in the transformer
self.encoder_embed_dim: int = 768 # encoder embedding dimension
self.encoder_ffn_embed_dim: int = 3072 # encoder embedding dimension for FFN
self.encoder_attention_heads: int = 12 # num encoder attention heads
self.activation_fn: str = "gelu" # activation function to use
self.layer_wise_gradient_decay_ratio: float = 1.0 # ratio for layer-wise gradient decay
self.layer_norm_first: bool = False # apply layernorm first in the transformer
self.deep_norm: bool = False # apply deep_norm first in the transformer
# dropouts
self.dropout: float = 0.1 # dropout probability for the transformer
self.attention_dropout: float = 0.1 # dropout probability for attention weights
self.activation_dropout: float = 0.0 # dropout probability after activation in FFN
self.encoder_layerdrop: float = 0.0 # probability of dropping a tarnsformer layer
self.dropout_input: float = 0.0 # dropout to apply to the input (after feat extr)
# positional embeddings
self.conv_pos: int = 128 # number of filters for convolutional positional embeddings
self.conv_pos_groups: int = 16 # number of groups for convolutional positional embedding
# relative position embedding
self.relative_position_embedding: bool = False # apply relative position embedding
self.num_buckets: int = 320 # number of buckets for relative position embedding
self.max_distance: int = 1280 # maximum distance for relative position embedding
self.gru_rel_pos: bool = False # apply gated relative position embedding
# label predictor
self.finetuned_model: bool = False # whether the model is a fine-tuned model.
self.predictor_dropout: float = 0.1 # dropout probability for the predictor
self.predictor_class: int = 527 # target class number for the predictor
if cfg is not None:
self.update(cfg)
def update(self, cfg: dict):
self.__dict__.update(cfg)
class BEATs(nn.Module):
def __init__(
self,
cfg: BEATsConfig,
) -> None:
super().__init__()
logger.info(f"BEATs Config: {cfg.__dict__}")
self.cfg = cfg
self.embed = cfg.embed_dim
self.post_extract_proj = (
nn.Linear(self.embed, cfg.encoder_embed_dim)
if self.embed != cfg.encoder_embed_dim
else None
)
self.input_patch_size = cfg.input_patch_size
self.patch_embedding = nn.Conv2d(1, self.embed, kernel_size=self.input_patch_size, stride=self.input_patch_size,
bias=cfg.conv_bias)
self.dropout_input = nn.Dropout(cfg.dropout_input)
assert not cfg.deep_norm or not cfg.layer_norm_first
self.encoder = TransformerEncoder(cfg)
self.layer_norm = LayerNorm(self.embed)
if cfg.finetuned_model:
self.predictor_dropout = nn.Dropout(cfg.predictor_dropout)
self.predictor = nn.Linear(cfg.encoder_embed_dim, cfg.predictor_class)
else:
self.predictor = None
def forward_padding_mask(
self,
features: torch.Tensor,
padding_mask: torch.Tensor,
) -> torch.Tensor:
extra = padding_mask.size(1) % features.size(1)
if extra > 0:
padding_mask = padding_mask[:, :-extra]
padding_mask = padding_mask.view(
padding_mask.size(0), features.size(1), -1
)
padding_mask = padding_mask.all(-1)
return padding_mask
def preprocess(
self,
source: torch.Tensor,
fbank_mean: float = 15.41663,
fbank_std: float = 6.55582,
) -> torch.Tensor:
fbanks = []
for waveform in source:
waveform = waveform.unsqueeze(0) * 2 ** 15
fbank = ta_kaldi.fbank(waveform, num_mel_bins=128, sample_frequency=16000, frame_length=25, frame_shift=10)
fbanks.append(fbank)
fbank = torch.stack(fbanks, dim=0)
fbank = (fbank - fbank_mean) / (2 * fbank_std)
return fbank
def extract_features(
self,
source: torch.Tensor,
padding_mask: Optional[torch.Tensor] = None,
fbank_mean: float = 15.41663,
fbank_std: float = 6.55582,
):
fbank = self.preprocess(source, fbank_mean=fbank_mean, fbank_std=fbank_std)
if padding_mask is not None:
padding_mask = self.forward_padding_mask(fbank, padding_mask)
fbank = fbank.unsqueeze(1)
features = self.patch_embedding(fbank)
features = features.reshape(features.shape[0], features.shape[1], -1)
features = features.transpose(1, 2)
features = self.layer_norm(features)
if padding_mask is not None:
padding_mask = self.forward_padding_mask(features, padding_mask)
if self.post_extract_proj is not None:
features = self.post_extract_proj(features)
x = self.dropout_input(features)
x, layer_results = self.encoder(
x,
padding_mask=padding_mask,
)
if self.predictor is not None:
x = self.predictor_dropout(x)
logits = self.predictor(x)
if padding_mask is not None and padding_mask.any():
logits[padding_mask] = 0
logits = logits.sum(dim=1)
logits = logits / (~padding_mask).sum(dim=1).unsqueeze(-1).expand_as(logits)
else:
logits = logits.mean(dim=1)
lprobs = torch.sigmoid(logits)
return lprobs, padding_mask
else:
return x, padding_mask
+219
View File
@@ -0,0 +1,219 @@
# --------------------------------------------------------
# BEATs: Audio Pre-Training with Acoustic Tokenizers (https://arxiv.org/abs/2212.09058)
# Github source: https://github.com/microsoft/unilm/tree/master/beats
# Copyright (c) 2022 Microsoft
# Licensed under The MIT License [see LICENSE for details]
# Based on fairseq code bases
# https://github.com/pytorch/fairseq
# --------------------------------------------------------
import math
import warnings
import torch
from torch import Tensor, nn
import torch.nn.functional as F
class GradMultiply(torch.autograd.Function):
@staticmethod
def forward(ctx, x, scale):
ctx.scale = scale
res = x.new(x)
return res
@staticmethod
def backward(ctx, grad):
return grad * ctx.scale, None
class SamePad(nn.Module):
def __init__(self, kernel_size, causal=False):
super().__init__()
if causal:
self.remove = kernel_size - 1
else:
self.remove = 1 if kernel_size % 2 == 0 else 0
def forward(self, x):
if self.remove > 0:
x = x[:, :, : -self.remove]
return x
class Swish(nn.Module):
def __init__(self):
super(Swish, self).__init__()
self.act = torch.nn.Sigmoid()
def forward(self, x):
return x * self.act(x)
class GLU_Linear(nn.Module):
def __init__(self, input_dim, output_dim, glu_type="sigmoid", bias_in_glu=True):
super(GLU_Linear, self).__init__()
self.glu_type = glu_type
self.output_dim = output_dim
if glu_type == "sigmoid":
self.glu_act = torch.nn.Sigmoid()
elif glu_type == "swish":
self.glu_act = Swish()
elif glu_type == "relu":
self.glu_act = torch.nn.ReLU()
elif glu_type == "gelu":
self.glu_act = torch.nn.GELU()
if bias_in_glu:
self.linear = nn.Linear(input_dim, output_dim * 2, True)
else:
self.linear = nn.Linear(input_dim, output_dim * 2, False)
def forward(self, x):
# to be consistent with GLU_Linear, we assume the input always has the #channel (#dim) in the last dimension of the tensor, so need to switch the dimension first for 1D-Conv case
x = self.linear(x)
if self.glu_type == "bilinear":
x = (x[:, :, 0:self.output_dim] * x[:, :, self.output_dim:self.output_dim * 2])
else:
x = (x[:, :, 0:self.output_dim] * self.glu_act(x[:, :, self.output_dim:self.output_dim * 2]))
return x
def gelu_accurate(x):
if not hasattr(gelu_accurate, "_a"):
gelu_accurate._a = math.sqrt(2 / math.pi)
return (
0.5 * x * (1 + torch.tanh(gelu_accurate._a * (x + 0.044715 * torch.pow(x, 3))))
)
def gelu(x: torch.Tensor) -> torch.Tensor:
return torch.nn.functional.gelu(x.float()).type_as(x)
def get_activation_fn(activation: str):
"""Returns the activation function corresponding to `activation`"""
if activation == "relu":
return F.relu
elif activation == "gelu":
return gelu
elif activation == "gelu_fast":
warnings.warn(
"--activation-fn=gelu_fast has been renamed to gelu_accurate"
)
return gelu_accurate
elif activation == "gelu_accurate":
return gelu_accurate
elif activation == "tanh":
return torch.tanh
elif activation == "linear":
return lambda x: x
elif activation == "glu":
return lambda x: x
else:
raise RuntimeError("--activation-fn {} not supported".format(activation))
def quant_noise(module, p, block_size):
"""
Wraps modules and applies quantization noise to the weights for
subsequent quantization with Iterative Product Quantization as
described in "Training with Quantization Noise for Extreme Model Compression"
Args:
- module: nn.Module
- p: amount of Quantization Noise
- block_size: size of the blocks for subsequent quantization with iPQ
Remarks:
- Module weights must have the right sizes wrt the block size
- Only Linear, Embedding and Conv2d modules are supported for the moment
- For more detail on how to quantize by blocks with convolutional weights,
see "And the Bit Goes Down: Revisiting the Quantization of Neural Networks"
- We implement the simplest form of noise here as stated in the paper
which consists in randomly dropping blocks
"""
# if no quantization noise, don't register hook
if p <= 0:
return module
# supported modules
assert isinstance(module, (nn.Linear, nn.Embedding, nn.Conv2d))
# test whether module.weight has the right sizes wrt block_size
is_conv = module.weight.ndim == 4
# 2D matrix
if not is_conv:
assert (
module.weight.size(1) % block_size == 0
), "Input features must be a multiple of block sizes"
# 4D matrix
else:
# 1x1 convolutions
if module.kernel_size == (1, 1):
assert (
module.in_channels % block_size == 0
), "Input channels must be a multiple of block sizes"
# regular convolutions
else:
k = module.kernel_size[0] * module.kernel_size[1]
assert k % block_size == 0, "Kernel size must be a multiple of block size"
def _forward_pre_hook(mod, input):
# no noise for evaluation
if mod.training:
if not is_conv:
# gather weight and sizes
weight = mod.weight
in_features = weight.size(1)
out_features = weight.size(0)
# split weight matrix into blocks and randomly drop selected blocks
mask = torch.zeros(
in_features // block_size * out_features, device=weight.device
)
mask.bernoulli_(p)
mask = mask.repeat_interleave(block_size, -1).view(-1, in_features)
else:
# gather weight and sizes
weight = mod.weight
in_channels = mod.in_channels
out_channels = mod.out_channels
# split weight matrix into blocks and randomly drop selected blocks
if mod.kernel_size == (1, 1):
mask = torch.zeros(
int(in_channels // block_size * out_channels),
device=weight.device,
)
mask.bernoulli_(p)
mask = mask.repeat_interleave(block_size, -1).view(-1, in_channels)
else:
mask = torch.zeros(
weight.size(0), weight.size(1), device=weight.device
)
mask.bernoulli_(p)
mask = (
mask.unsqueeze(2)
.unsqueeze(3)
.repeat(1, 1, mod.kernel_size[0], mod.kernel_size[1])
)
# scale weights and apply mask
mask = mask.to(
torch.bool
) # x.bool() is not currently supported in TorchScript
s = 1 / (1 - p)
mod.weight.data = s * weight.masked_fill(mask, 0)
module.register_forward_pre_hook(_forward_pre_hook)
return module
+106 -5
View File
@@ -1,3 +1,4 @@
import os
import sqlite3
import threading
from datetime import datetime, timezone
@@ -7,7 +8,7 @@ from .paths import _log
class ProcessedDB:
_SCHEMA_VERSION = 3 # bump when schema changes
_SCHEMA_VERSION = 4 # bump when schema changes
def __init__(self, db_path: str | None = None):
if db_path is None:
@@ -47,6 +48,7 @@ class ProcessedDB:
" clip_count INTEGER NOT NULL DEFAULT 3,"
" spread REAL NOT NULL DEFAULT 3.0,"
" profile TEXT NOT NULL DEFAULT 'default',"
" source_path TEXT NOT NULL DEFAULT '',"
" processed_at TEXT NOT NULL"
")"
)
@@ -62,6 +64,7 @@ class ProcessedDB:
"clip_count": "INTEGER NOT NULL DEFAULT 3",
"spread": "REAL NOT NULL DEFAULT 3.0",
"profile": "TEXT NOT NULL DEFAULT 'default'",
"source_path": "TEXT NOT NULL DEFAULT ''",
}
for col, typedef in new_cols.items():
if col not in cols:
@@ -85,7 +88,7 @@ class ProcessedDB:
short_side: int | None = None, portrait_ratio: str = "",
crop_center: float = 0.5, fmt: str = "MP4",
clip_count: int = 3, spread: float = 3.0,
profile: str = "default") -> None:
profile: str = "default", source_path: str = "") -> None:
if not self._enabled:
return
with self._lock:
@@ -93,11 +96,11 @@ class ProcessedDB:
"INSERT INTO processed"
" (filename, start_time, output_path, label, category,"
" short_side, portrait_ratio, crop_center, format,"
" clip_count, spread, profile, processed_at)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
" clip_count, spread, profile, source_path, processed_at)"
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(filename, start_time, output_path, label, category,
short_side, portrait_ratio, crop_center, fmt,
clip_count, spread, profile,
clip_count, spread, profile, source_path,
datetime.now(timezone.utc).isoformat()),
)
self._con.commit()
@@ -223,6 +226,104 @@ class ProcessedDB:
).fetchall()
return [r[0] for r in rows]
def get_export_folders(self, profile: str = "default") -> list[str]:
"""Return distinct export folder names found in output_paths for a profile.
Export paths follow the structure:
.../export_folder/group_dir/clip.mp4
The export folder is 2 levels up from the clip file.
Returns folder names sorted alphabetically (e.g. ["mp4_Intense", "mp4_Soft"]).
"""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT DISTINCT output_path FROM processed WHERE profile = ?",
(profile,),
).fetchall()
folder_names: set[str] = set()
for (op,) in rows:
grandparent = os.path.basename(os.path.dirname(os.path.dirname(op)))
if grandparent:
folder_names.add(grandparent)
return sorted(folder_names)
def get_training_data(self, profile: str, positive_folder: str,
fallback_video_dir: str = "",
) -> list[tuple[str, list[float], list[float]]]:
"""Build training video_infos from DB data.
Args:
profile: profile name
positive_folder: export folder name for positive class (e.g. "mp4_Intense")
fallback_video_dir: if source_path is empty, try filename in this dir
Returns:
list of (source_video_path, positive_times, soft_times) per video.
Soft times = clips from any other export folder.
"""
if not self._enabled:
return []
rows = self._con.execute(
"SELECT filename, start_time, output_path, source_path"
" FROM processed WHERE profile = ?",
(profile,),
).fetchall()
# Collect times by video, split by positive vs other folders
pos_by_video: dict[str, set[float]] = {}
soft_by_video: dict[str, set[float]] = {}
source_by_filename: dict[str, str] = {}
for fn, st, op, sp in rows:
if sp:
source_by_filename[fn] = sp
grandparent = os.path.basename(os.path.dirname(os.path.dirname(op)))
if grandparent == positive_folder:
pos_by_video.setdefault(fn, set()).add(st)
else:
soft_by_video.setdefault(fn, set()).add(st)
result = []
for fn in pos_by_video:
sp = source_by_filename.get(fn, "")
if not sp or not os.path.exists(sp):
# Fallback: try video_dir / filename
if fallback_video_dir:
sp = os.path.join(fallback_video_dir, fn)
if not sp or not os.path.exists(sp):
continue
gt_pos = sorted(pos_by_video[fn])
gt_soft = sorted(soft_by_video.get(fn, set()))
result.append((sp, gt_pos, gt_soft))
return result
def get_training_stats(self, profile: str) -> dict[str, dict]:
"""Return per-subprofile stats for training readiness display.
Returns dict mapping subprofile_name → {
'videos': number of distinct source videos,
'clips': total clip count,
}
"""
if not self._enabled:
return {}
rows = self._con.execute(
"SELECT filename, output_path FROM processed WHERE profile = ?",
(profile,),
).fetchall()
folders = self.get_export_folders(profile)
stats: dict[str, dict] = {}
for folder_name in folders:
videos: set[str] = set()
clips = 0
for fn, op in rows:
grandparent = os.path.basename(os.path.dirname(os.path.dirname(op)))
if grandparent == folder_name:
videos.add(fn)
clips += 1
stats[folder_name] = {"videos": len(videos), "clips": clips}
return stats
def hide_file(self, filename: str, profile: str = "default") -> None:
if not self._enabled:
return