diff --git a/nodes/selva_dataset_pipeline.py b/nodes/selva_dataset_pipeline.py index 2359e89..f360e5d 100644 --- a/nodes/selva_dataset_pipeline.py +++ b/nodes/selva_dataset_pipeline.py @@ -110,3 +110,80 @@ class SelvaDatasetResampler: print(f"[DatasetResampler] {changed}/{len(dataset)} clips resampled → {target_sr} Hz", flush=True) return (out,) + + +class SelvaDatasetLUFSNormalizer: + """Normalize each clip to a target integrated LUFS level + true peak limit.""" + + @classmethod + def INPUT_TYPES(cls): + return { + "required": { + "dataset": (AUDIO_DATASET,), + "target_lufs": ("FLOAT", { + "default": -23.0, "min": -40.0, "max": -6.0, "step": 0.5, + "tooltip": "Target integrated loudness in LUFS. -23 is EBU R128 standard.", + }), + "true_peak_dbtp": ("FLOAT", { + "default": -1.0, "min": -6.0, "max": 0.0, "step": 0.5, + "tooltip": "True peak ceiling in dBTP. Applied after LUFS gain.", + }), + } + } + + RETURN_TYPES = (AUDIO_DATASET,) + RETURN_NAMES = ("dataset",) + FUNCTION = "normalize" + CATEGORY = SELVA_CATEGORY + DESCRIPTION = ( + "Normalize each clip to target_lufs (BS.1770-4) then apply a true peak ceiling. " + "Skips clips that are too short for LUFS measurement (< 0.4 s)." + ) + + def normalize(self, dataset, target_lufs: float, true_peak_dbtp: float): + import pyloudnorm as pyln + + tp_linear = 10.0 ** (true_peak_dbtp / 20.0) + out = [] + skipped = 0 + + for item in dataset: + wav = item["waveform"][0] # [C, L] + sr = item["sample_rate"] + + # pyloudnorm wants [L] mono or [L, C] multichannel, float64 + wav_np = wav.permute(1, 0).double().numpy() # [L, C] + if wav_np.shape[1] == 1: + wav_np = wav_np[:, 0] # [L] mono + + meter = pyln.Meter(sr) + try: + loudness = meter.integrated_loudness(wav_np) + except Exception: + skipped += 1 + out.append(item) + continue + + if not np.isfinite(loudness): + skipped += 1 + out.append(item) + continue + + gain_db = target_lufs - loudness + gain_linear = 10.0 ** (gain_db / 20.0) + + wav_norm = wav * gain_linear + + # True peak limit + peak = wav_norm.abs().max().item() + if peak > tp_linear: + wav_norm = wav_norm * (tp_linear / peak) + + out.append({"waveform": wav_norm.unsqueeze(0), "sample_rate": sr, "name": item["name"]}) + + print( + f"[LUFSNormalizer] {len(dataset) - skipped}/{len(dataset)} clips normalized " + f"target={target_lufs} LUFS TP={true_peak_dbtp} dBTP skipped={skipped}", + flush=True, + ) + return (out,)