Replace torchaudio.save with soundfile.write; add EPUB loader node

- nodes/generator.py: swap torchaudio.save for soundfile.write to avoid
  torchcodec/FFmpeg dependency crash in environments without FFmpeg shared libs
- nodes/epub_loader.py: new OmniVoiceEpubLoader node for loading EPUB chapters
- tests/test_epub_loader.py: 8 tests for the EPUB loader
- install.py: add beautifulsoup4 to runtime deps
- __init__.py, nodes/__init__.py: register OmniVoiceEpubLoader

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-05 17:24:18 +02:00
parent 5366f6992e
commit 5dfaa0b300
8 changed files with 254 additions and 6 deletions
+2 -1
View File
@@ -1,4 +1,5 @@
from .loader import OmniVoiceModelLoader
from .generator import OmniVoiceGenerate
from .epub_loader import OmniVoiceEpubLoader
__all__ = ["OmniVoiceModelLoader", "OmniVoiceGenerate"]
__all__ = ["OmniVoiceModelLoader", "OmniVoiceGenerate", "OmniVoiceEpubLoader"]
+109
View File
@@ -0,0 +1,109 @@
import zipfile
import io
import xml.etree.ElementTree as ET
from bs4 import BeautifulSoup
def _local(tag):
    """Return the local name of an XML tag, without any ``{namespace}`` prefix."""
    # Everything after the last "}" is the local name; a tag with no "}"
    # comes back unchanged.
    _, _, name = tag.rpartition('}')
    return name
def _resolve_href(opf_path, href):
    """Return the ZIP member name that manifest *href* refers to.

    Manifest hrefs are URL references relative to the OPF file, so the
    result is built from the OPF's directory. Any ``#fragment`` is dropped,
    percent-escapes such as ``%20`` are decoded, and ``.``/``..`` segments
    are collapsed so the result can be passed to ``ZipFile.read``.
    """
    # Imported locally so this helper does not depend on the module's
    # top-level import block.
    import posixpath
    from urllib.parse import unquote

    # Split off the fragment before decoding so an escaped "%23" in a file
    # name is not mistaken for a fragment separator.
    relative = unquote(href.split('#', 1)[0])
    return posixpath.normpath(
        posixpath.join(posixpath.dirname(opf_path), relative)
    )


def _extract_chapters(epub_path):
    """Parse an EPUB and return its chapters in spine (reading) order.

    Args:
        epub_path: Path (or file-like object) accepted by ``zipfile.ZipFile``.

    Returns:
        A list of ``{"title": str | None, "text": str}`` dicts, one per spine
        entry that resolves to an XHTML manifest item. ``title`` comes from
        the document's ``<title>``, falling back to the first ``<h1>``,
        ``<h2>`` or ``<h3>``; it is ``None`` when no non-empty title is found.

    Raises:
        ValueError: If ``META-INF/container.xml`` names no OPF package file.
        KeyError: If a required archive member or XML attribute is missing.
    """
    chapters = []
    with zipfile.ZipFile(epub_path, 'r') as zf:
        # 1. Find the OPF path from container.xml.
        container = ET.fromstring(zf.read('META-INF/container.xml'))
        rootfile = next(
            (el for el in container.iter() if _local(el.tag) == 'rootfile'),
            None,
        )
        # Compare against None explicitly: a childless Element is falsy.
        opf_path = rootfile.get('full-path') if rootfile is not None else None
        if not opf_path:
            raise ValueError(
                "Invalid EPUB: META-INF/container.xml does not declare "
                "a rootfile with a full-path"
            )

        # 2. Parse the OPF: collect the XHTML manifest items and the spine
        #    in a single pass over the tree.
        opf = ET.fromstring(zf.read(opf_path))
        manifest = {}
        spine = []
        for el in opf.iter():
            name = _local(el.tag)
            if name == 'item':
                if 'xhtml' in el.attrib.get('media-type', ''):
                    manifest[el.attrib['id']] = el.attrib['href']
            elif name == 'itemref':
                spine.append(el.attrib['idref'])

        # 3. Extract text from each chapter document.
        for idref in spine:
            href = manifest.get(idref)
            if href is None:
                # Spine entry is not an XHTML manifest item; nothing to read.
                continue
            member = _resolve_href(opf_path, href)
            xhtml = zf.read(member).decode('utf-8', errors='replace')
            soup = BeautifulSoup(xhtml, 'html.parser')
            # Script and style bodies are not readable text.
            for tag in soup(['script', 'style']):
                tag.decompose()

            # Title: <title> -> first h1/h2/h3 -> None. Blank strings are
            # normalised to None so callers see one "no title" value.
            title = None
            if soup.title and soup.title.string:
                title = soup.title.string.strip() or None
            if title is None:
                for level in ('h1', 'h2', 'h3'):
                    heading = soup.find(level)
                    if heading is not None:
                        title = heading.get_text(strip=True) or None
                        break

            text = soup.get_text(separator=' ', strip=True)
            chapters.append({"title": title, "text": text})
    return chapters
class OmniVoiceEpubLoader:
    """Node that loads a range of chapters from an EPUB file as text.

    Outputs the text of the selected chapters together with a numbered
    list of every chapter found in the book.
    """

    # Node metadata: output types/names, the method to invoke, and the
    # category label. (Presumably consumed by the host node framework —
    # confirm against the registration code.)
    RETURN_TYPES = ("STRING", "STRING")
    RETURN_NAMES = ("text", "chapter_list")
    FUNCTION = "load_epub"
    CATEGORY = "OmniVoice"

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the inputs: the EPUB path and a 1-indexed chapter range."""
        # Both chapter inputs share the same numeric bounds.
        chapter_bounds = {"default": 1, "min": 1, "max": 9999, "step": 1}
        path_spec = ("STRING", {
            "default": "",
            "tooltip": "Absolute path to the .epub file to load.",
        })
        start_spec = ("INT", {
            **chapter_bounds,
            "tooltip": "First chapter to include (1-indexed). Clamped to valid range automatically.",
        })
        end_spec = ("INT", {
            **chapter_bounds,
            "tooltip": "Last chapter to include (1-indexed, inclusive). Clamped automatically. If less than chapter_start, set to chapter_start.",
        })
        return {
            "required": {
                "epub_path": path_spec,
                "chapter_start": start_spec,
                "chapter_end": end_spec,
            },
        }

    def load_epub(self, epub_path, chapter_start, chapter_end):
        """Return ``(text, chapter_list)`` for the requested chapter range.

        ``text`` is the selected chapters joined by a ``---`` separator line;
        ``chapter_list`` numbers every chapter in the book, one per line,
        using ``Chapter N`` where a chapter has no title. Both are empty
        strings when the EPUB yields no chapters.
        """
        chapters = _extract_chapters(epub_path)
        if not chapters:
            return ("", "")

        # Clamp the 1-indexed, inclusive range into [1, total], and never
        # let the end fall before the start.
        total = len(chapters)
        first = min(chapter_start, total)
        if first < 1:
            first = 1
        last = min(chapter_end, total)
        if last < first:
            last = first

        # The listing always covers the whole book, not just the selection.
        listing = []
        for number, chapter in enumerate(chapters, 1):
            label = chapter["title"] or f"Chapter {number}"
            listing.append(f"{number}. {label}")
        chapter_list = "\n".join(listing)

        bodies = [chapter["text"] for chapter in chapters[first - 1:last]]
        text = "\n\n---\n\n".join(bodies)
        return (text, chapter_list)
+4 -2
View File
@@ -1,7 +1,7 @@
import tempfile
import os
import torch
import torchaudio
import soundfile as sf
class OmniVoiceGenerate:
@@ -109,7 +109,9 @@ class OmniVoiceGenerate:
tmp.close()
try:
ref_waveform = ref_audio["waveform"].squeeze(0).cpu() # (channels, samples)
torchaudio.save(tmp_path, ref_waveform, int(ref_audio["sample_rate"]))
audio_np = ref_waveform.numpy()
# soundfile expects (samples,) for mono or (samples, channels) for multi-channel
sf.write(tmp_path, audio_np[0] if audio_np.shape[0] == 1 else audio_np.T, int(ref_audio["sample_rate"]))
kwargs["ref_audio"] = tmp_path
if ref_text:
kwargs["ref_text"] = ref_text