"""Cleanup and de-duplication helpers for prompt, caption, tag and negative text."""

from __future__ import annotations

import re
from typing import Any, Iterable

# Field labels whose empty or placeholder values are removed from prose text.
EMPTY_FIELD_LABELS = (
    "Ages",
    "Body types",
    "Cast",
    "Cast descriptors",
    "Characters",
    "Scene",
    "Setting",
    "Pose",
    "Sexual pose",
    "Sexual scene",
    "Facial expression",
    "Facial expressions",
    "Clothing",
    "Erotic outfit",
    "Prop/detail",
    "Composition",
    "Role graph",
    "Camera",
    "Camera control",
    "Camera priority",
    "Use",
    "Avoid",
)

# Whitespace that follows sentence-ending punctuation.
_SENTENCE_BOUNDARY = re.compile(r"(?<=[.!?])\s+")

# Ordered punctuation repairs applied by ``clean_spacing``; order matters.
_PUNCTUATION_FIXES = (
    (re.compile(r"\s+([,.;:])"), r"\1"),   # no space before punctuation
    (re.compile(r"([,;:]){2,}"), r"\1"),   # runs of , ; : keep only the last one
    (re.compile(r"\.(?:\s*\.)+"), "."),    # any run of periods becomes one
    (re.compile(r",\s*\."), "."),
    (re.compile(r":\s*\."), "."),
    (re.compile(r";\s*\."), "."),
    (re.compile(r"\(\s+"), "("),
    (re.compile(r"\s+\)"), ")"),
)

# Characters that may separate a leading trigger from the rest of the text.
_PROSE_SEPARATORS = r"[,.;]"
_TAG_SEPARATORS = r"[,;]"


def _collapse_whitespace(value: Any) -> str:
    """Stringify *value* (``None`` becomes ``""``) and squeeze whitespace runs."""
    text = "" if value is None else str(value)
    return re.sub(r"\s+", " ", text).strip()


def _normalize_triggers(triggers: Any) -> tuple[str, ...]:
    """Return *triggers* as a tuple of stripped, non-empty strings.

    A bare string counts as one trigger (not a sequence of characters),
    ``None`` means "no triggers", and one-shot iterators are materialised so
    the result can be iterated more than once.
    """
    if triggers is None:
        return ()
    if isinstance(triggers, str):
        triggers = (triggers,)
    stripped = (str(trigger or "").strip() for trigger in triggers)
    return tuple(trigger for trigger in stripped if trigger)


def _split_sentences(text: str) -> list[str]:
    """Split *text* after ``.``, ``!`` or ``?`` and drop blank pieces."""
    pieces = (piece.strip() for piece in _SENTENCE_BOUNDARY.split(text))
    return [piece for piece in pieces if piece]


def clean_spacing(value: Any) -> str:
    """Normalise whitespace and tidy punctuation in *value*.

    Whitespace runs (including newlines) collapse to single spaces, spaces
    before ``, . ; :`` and just inside parentheses are removed, and doubled
    punctuation is merged. A run of periods of any length collapses to a
    single period in one pass.
    """
    text = _collapse_whitespace(value)
    for pattern, replacement in _PUNCTUATION_FIXES:
        text = pattern.sub(replacement, text)
    return text.strip()


def _strip_empty_fields(text: str) -> str:
    """Remove ``Label:`` fields from ``EMPTY_FIELD_LABELS`` that carry no value.

    Handles a label followed directly by punctuation or end of text, a bare
    ``Label.`` sentence, and the placeholders ``none`` / ``null`` / ``n/a``.
    The result is passed through ``clean_spacing``.
    """
    if not text:
        return ""
    # Built per call so a rebound EMPTY_FIELD_LABELS is honoured; ``re`` caches
    # the compiled patterns.
    labels = "|".join(re.escape(label) for label in EMPTY_FIELD_LABELS)
    text = re.sub(rf"\b(?:{labels})\s*:\s*[.,;]", "", text, flags=re.IGNORECASE)
    text = re.sub(rf"\b(?:{labels}):\s*(?=\.|,|;|$)", "", text, flags=re.IGNORECASE)
    text = re.sub(
        rf"(^|(?<=[.!?])\s+)(?:{labels})\.(?=\s|$)", r"\1", text, flags=re.IGNORECASE
    )
    text = re.sub(
        rf"\b(?:{labels}):\s*(?:none|null|n/a)\b[.,;]?", "", text, flags=re.IGNORECASE
    )
    return clean_spacing(text)


def _drop_dangling_connectors(text: str) -> str:
    """Remove connector words (with/and/or/while/featuring) left before punctuation."""
    text = re.sub(
        r"\b(?:with|and|or|while|featuring)\s*([,.;])", r"\1", text, flags=re.IGNORECASE
    )
    text = re.sub(
        r"([,.;])\s*(?:with|and|or|while|featuring)\s*([,.;])",
        r"\1",
        text,
        flags=re.IGNORECASE,
    )
    text = re.sub(r"\bwith\s*,", "", text, flags=re.IGNORECASE)
    text = re.sub(r",\s*and\s*\.", ".", text, flags=re.IGNORECASE)
    return clean_spacing(text)


def _sentence_key(text: str, triggers: Iterable[str] = ()) -> str:
    """Return a comparison key for *text*.

    A leading ``trigger`` + separator is dropped first, then the text is
    lower-cased and every run of non-word characters becomes one space.
    """
    key_text = text
    for trigger in _normalize_triggers(triggers):
        key_text = re.sub(
            rf"^{re.escape(trigger)}\s*[,.;]\s*", "", key_text, flags=re.IGNORECASE
        )
    return re.sub(r"\W+", " ", key_text.lower()).strip()


def _dedupe_adjacent_sentences(text: str, triggers: Iterable[str] = ()) -> str:
    """Drop a sentence whose key equals the key of the sentence kept before it.

    Sentences with an empty key (punctuation only) are dropped as well.
    """
    # Materialise once: the triggers are consulted for every sentence.
    trigger_list = _normalize_triggers(triggers)
    kept: list[str] = []
    previous = ""
    for sentence in _split_sentences(text):
        key = _sentence_key(sentence, trigger_list)
        if key and key != previous:
            kept.append(sentence)
            previous = key
    return " ".join(kept)


def _dedupe_labeled_sentences(text: str) -> str:
    """Drop repeated ``Label: value`` sentences anywhere in *text*.

    Two labelled sentences are duplicates when label and normalised value
    both match case-insensitively. Unlabelled sentences are always kept.
    """
    seen: set[tuple[str, str]] = set()
    kept: list[str] = []
    for sentence in _split_sentences(text):
        match = re.match(r"^([A-Za-z][A-Za-z /_-]{1,40}):\s*(.+)$", sentence)
        if match:
            key = (
                match.group(1).strip().lower(),
                re.sub(r"\W+", " ", match.group(2).lower()).strip(),
            )
            if key in seen:
                continue
            seen.add(key)
        kept.append(sentence)
    return " ".join(kept)


def _trigger_prefix_key(text: str, triggers: Iterable[str]) -> str:
    """Return the first trigger that *text* starts with (case-insensitive), or ``""``.

    This is a plain ``startswith`` check, so trigger ``"sks"`` also matches
    ``"sksy ..."``. The sanitizers in this module use the stricter
    ``_split_trigger_prefix`` instead; this helper keeps its original loose
    behaviour.
    """
    lowered = text.lower().strip()
    for trigger in _normalize_triggers(triggers):
        if lowered.startswith(trigger.lower()):
            return trigger
    return ""


def _split_trigger_prefix(
    text: str, triggers: Iterable[str], separators: str
) -> tuple[str, str]:
    """Split a leading run of one trigger off *text*.

    The trigger only counts when it stands alone at the start: each repeat
    must be followed by a character matching the *separators* regex class or
    by the end of the text.

    Returns:
        ``(trigger, remainder)`` for the first matching trigger, otherwise
        ``("", text)``.
    """
    for trigger in _normalize_triggers(triggers):
        pattern = rf"^(?:{re.escape(trigger)}\s*(?:{separators}\s*|$))+"
        match = re.match(pattern, text, flags=re.IGNORECASE)
        if match:
            return trigger, text[match.end():]
    return "", text


def _dedupe_trigger_prefix(text: str, triggers: Iterable[str]) -> str:
    """Collapse repeated leading triggers into a single ``"trigger, "`` prefix.

    Text that does not open with a standalone trigger is returned cleaned but
    otherwise unchanged, so no trigger is ever added. When nothing follows
    the trigger, the bare trigger is returned without a trailing separator.
    """
    text = clean_spacing(text)
    trigger, remainder = _split_trigger_prefix(text, triggers, _PROSE_SEPARATORS)
    if not trigger:
        return text
    remainder = remainder.strip(" ,.;")
    return f"{trigger}, {remainder}" if remainder else trigger


def _split_comma_items(text: str) -> list[str]:
    """Split *text* on commas/semicolons into trimmed, non-empty items."""
    pieces = (
        piece.strip(" ,.;") for piece in re.split(r"\s*[,;]\s*", clean_spacing(text))
    )
    return [piece for piece in pieces if piece]


def dedupe_comma_list(text: Any) -> str:
    """Return *text* as a ``", "``-joined list without repeated items.

    Items compare case-insensitively with punctuation ignored; the first
    spelling of each item is kept, in original order.
    """
    items: list[str] = []
    seen: set[str] = set()
    for item in _split_comma_items(str(text or "")):
        key = re.sub(r"\W+", " ", item.lower()).strip()
        if key and key not in seen:
            items.append(item)
            seen.add(key)
    return ", ".join(items)


def sanitize_prose_text(value: Any, triggers: Iterable[str] = ()) -> str:
    """Clean sentence-style text.

    Removes empty labelled fields and dangling connectors, drops duplicated
    labelled and adjacent sentences, and collapses a repeated leading trigger.

    Args:
        value: Text to clean; ``None`` is treated as empty.
        triggers: Trigger words. A single string, any iterable of strings
            (including a one-shot iterator) or ``None`` is accepted.

    Returns:
        The cleaned text, or ``""`` when nothing is left.
    """
    trigger_list = _normalize_triggers(triggers)
    # First pass runs before punctuation is merged: clean_spacing would turn
    # "Camera: ," into "Camera," and hide the empty field from the stripper.
    text = _strip_empty_fields(_collapse_whitespace(value))
    if not text:
        return ""
    # Second pass catches forms that only appear after punctuation cleanup,
    # such as a bare "Camera." sentence.
    text = _strip_empty_fields(text)
    text = _drop_dangling_connectors(text)
    text = _dedupe_labeled_sentences(text)
    text = _dedupe_trigger_prefix(text, trigger_list)
    text = _dedupe_adjacent_sentences(text, trigger_list)
    return clean_spacing(text).strip(" ,;")


def sanitize_prompt_text(value: Any, triggers: Iterable[str] = ()) -> str:
    """Clean prompt text; same behaviour as ``sanitize_prose_text``."""
    return sanitize_prose_text(value, triggers=triggers)


def sanitize_caption_text(value: Any, triggers: Iterable[str] = ()) -> str:
    """Clean caption text; same behaviour as ``sanitize_prose_text``."""
    return sanitize_prose_text(value, triggers=triggers)


def sanitize_tag_prompt(value: Any, triggers: Iterable[str] = ()) -> str:
    """Clean a comma-separated tag prompt.

    Tags are de-duplicated. When the text opens with a trigger as its own tag
    (followed by ``,`` / ``;`` or the end of the text), repeated leading
    copies collapse into one and the trigger stays first. A trigger is never
    added to text that does not already start with it.
    """
    text = clean_spacing(value)
    if not text:
        return ""
    trigger, remainder = _split_trigger_prefix(text, triggers, _TAG_SEPARATORS)
    if not trigger:
        return dedupe_comma_list(text)
    tags = dedupe_comma_list(remainder)
    return f"{trigger}, {tags}" if tags else trigger


def sanitize_negative_text(value: Any) -> str:
    """Clean a negative prompt by de-duplicating its comma-separated items."""
    return dedupe_comma_list(value)


def combine_negative_text(*parts: Any) -> str:
    """Join several negative-prompt fragments into one de-duplicated list.

    Empty fragments (including ``None``) are skipped.
    """
    pieces = (clean_spacing(part).strip(" ,.;") for part in parts)
    return sanitize_negative_text(", ".join(piece for piece in pieces if piece))