Files
ComfyUI-Ethanfel-Prompt-Bui…/krea_formatter.py
T

651 lines
25 KiB
Python

from __future__ import annotations
import re
from typing import Any
try:
from . import formatter_input as input_policy
from . import krea_format_route
from . import route_metadata as route_metadata_policy
from .krea_action_context import (
is_close_foreplay_text as _is_close_foreplay_text,
is_outercourse_text as _is_outercourse_text,
normalize_hardcore_detail_density as _normalize_hardcore_detail_density,
)
from . import krea_configured_cast_formatter
from . import krea_normal_formatter
from . import krea_pair_formatter
from . import krea_row_fields
from .hardcore_text_cleanup import (
sanitize_hardcore_axis_values as _sanitize_hardcore_axis_values,
sanitize_hardcore_environment_anchors as _sanitize_hardcore_environment_anchors,
)
from .krea_cast import (
cast_prose as _cast_prose,
label_join as _label_join,
lowercase_for_inline_join as _lowercase_for_inline_join,
natural_label_text as _natural_label_text,
prompt_cast_descriptors as _prompt_cast_descriptors,
)
from .krea_clothing import natural_clothing_state as _natural_clothing_state
from .krea_action_positions import action_position_phrase as _action_position_phrase
from .krea_actions import hardcore_action_sentence as _hardcore_action_sentence
from .krea_pov import (
filter_pov_labeled_clauses as _filter_pov_labeled_clauses,
merge_labels as _merge_labels,
pov_camera_phrase as _pov_camera_phrase,
pov_composition_text as _pov_composition_text,
pov_labels_from_value as _pov_labels_from_value,
)
from .krea_pov_actions import pov_action_phrase as _pov_action_phrase
from .prompt_hygiene import sanitize_negative_text, sanitize_prose_text
except ImportError: # Allows local smoke tests with `python -c`.
import formatter_input as input_policy
import krea_format_route
import route_metadata as route_metadata_policy
from krea_action_context import (
is_close_foreplay_text as _is_close_foreplay_text,
is_outercourse_text as _is_outercourse_text,
normalize_hardcore_detail_density as _normalize_hardcore_detail_density,
)
import krea_configured_cast_formatter
import krea_normal_formatter
import krea_pair_formatter
import krea_row_fields
from hardcore_text_cleanup import (
sanitize_hardcore_axis_values as _sanitize_hardcore_axis_values,
sanitize_hardcore_environment_anchors as _sanitize_hardcore_environment_anchors,
)
from krea_cast import (
cast_prose as _cast_prose,
label_join as _label_join,
lowercase_for_inline_join as _lowercase_for_inline_join,
natural_label_text as _natural_label_text,
prompt_cast_descriptors as _prompt_cast_descriptors,
)
from krea_clothing import natural_clothing_state as _natural_clothing_state
from krea_action_positions import action_position_phrase as _action_position_phrase
from krea_actions import hardcore_action_sentence as _hardcore_action_sentence
from krea_pov import (
filter_pov_labeled_clauses as _filter_pov_labeled_clauses,
merge_labels as _merge_labels,
pov_camera_phrase as _pov_camera_phrase,
pov_composition_text as _pov_composition_text,
pov_labels_from_value as _pov_labels_from_value,
)
from krea_pov_actions import pov_action_phrase as _pov_action_phrase
from prompt_hygiene import sanitize_negative_text, sanitize_prose_text
# Trigger tokens handed to ``input_policy.strip_trigger_prefix`` by
# ``_strip_trigger`` and forwarded to the Krea format route dependencies.
TRIGGER_CANDIDATES = (
    "sxcpinup_coloredpencil",
    "sxcppnl7",
)
# Field labels fetched once at import time from ``input_policy``; passed as
# ``field_labels`` by ``_prompt_field`` and ``_row_value``.
PROMPT_FIELD_LABELS = input_policy.prompt_field_labels()
def _clean(value: Any) -> str:
    """Return ``value`` as normalized by ``input_policy.clean_text``."""
    cleaned = input_policy.clean_text(value)
    return cleaned
def _is_false(value: Any) -> bool:
if isinstance(value, bool):
return value is False
if isinstance(value, str):
return value.strip().lower() in ("false", "0", "no", "off")
return False
def _expression_disabled(row: dict[str, Any]) -> bool:
    """Decide whether expression text should be left out for ``row``.

    Expressions are disabled when ``row["expression_disabled"]`` is set to a
    truthy value, or when ``row["expression_enabled"]`` (default ``True``)
    is an explicit false marker as recognised by ``_is_false``.

    Args:
        row: Row mapping; only the two keys above are read.

    Returns:
        ``True`` when expressions are disabled for the row.
    """
    disabled_flag = row.get("expression_disabled")
    # A non-empty string such as "false" or "0" is truthy in Python, so a plain
    # bool() would read `expression_disabled: "false"` as "disabled". Apply the
    # same textual-false parsing that the `expression_enabled` key gets.
    if disabled_flag and not _is_false(disabled_flag):
        return True
    return _is_false(row.get("expression_enabled", True))
def _sentence(text: str) -> str:
    """Turn ``text`` into a capitalised sentence ending in punctuation.

    The text is cleaned with ``_clean`` and stripped of surrounding spaces,
    commas and semicolons. An empty result yields ``""``. Otherwise the first
    character is upper-cased and a period is appended unless the text already
    ends in ``.``, ``!`` or ``?``.
    """
    body = _clean(text).strip(" ,;")
    if not body:
        return ""
    capitalised = body[:1].upper() + body[1:]
    if capitalised.endswith((".", "!", "?")):
        return capitalised
    return capitalised + "."
def _paragraph(parts: list[str]) -> str:
    """Join ``parts`` into one paragraph of space-separated sentences.

    Each part goes through ``_sentence``; parts that come back empty are
    dropped.
    """
    sentences: list[str] = []
    for part in parts:
        sentence = _sentence(part)
        if sentence:
            sentences.append(sentence)
    return " ".join(sentences)
def _formatter_hint_parts(*rows: dict[str, Any]) -> list[str]:
    """Collect the distinct "krea" formatter hints from ``rows`` in order.

    Non-dict arguments are ignored. Each hint returned by
    ``route_metadata_policy.row_formatter_hints(row, "krea")`` is cleaned and
    stripped of surrounding spaces and periods; empty results and repeats are
    skipped.
    """
    collected: list[str] = []
    for candidate in rows:
        if isinstance(candidate, dict):
            for raw_hint in route_metadata_policy.row_formatter_hints(candidate, "krea"):
                normalized = _clean(raw_hint).strip(" .")
                if not normalized or normalized in collected:
                    continue
                collected.append(normalized)
    return collected
def _append_formatter_hints(prompt: str, *rows: dict[str, Any]) -> str:
    """Append the formatter hints found on ``rows`` to ``prompt``.

    When there are no hints the prompt is returned untouched; otherwise the
    prompt and every hint are re-assembled through ``_paragraph``.
    """
    extra = _formatter_hint_parts(*rows)
    return _paragraph([prompt, *extra]) if extra else prompt
def _with_indefinite_article(text: str) -> str:
    """Prefix the cleaned ``text`` with "a" or "an".

    Empty text, and text that already starts with "a " or "an " (any case),
    is returned as cleaned. "an" is chosen when the first character is one of
    ``aeiou`` (case-insensitive), "a" otherwise.
    """
    phrase = _clean(text)
    if not phrase:
        return phrase
    if phrase.lower().startswith(("a ", "an ")):
        return phrase
    starts_with_vowel = phrase[0].lower() in "aeiou"
    return f"an {phrase}" if starts_with_vowel else f"a {phrase}"
def _maybe_json(text: str) -> dict[str, Any] | None:
    """Delegate to ``input_policy.maybe_json`` and return its result."""
    parsed = input_policy.maybe_json(text)
    return parsed
def _row_from_inputs(
    source_text: str,
    metadata_json: str,
    input_hint: str,
) -> tuple[dict[str, Any] | None, str]:
    """Delegate to ``input_policy.row_from_inputs`` with the same arguments."""
    resolved = input_policy.row_from_inputs(source_text, metadata_json, input_hint)
    return resolved
def _strip_trigger(text: str, preserve_trigger: bool) -> str:
    """Run ``input_policy.strip_trigger_prefix`` with ``TRIGGER_CANDIDATES``.

    ``preserve_trigger`` is forwarded as the keyword of the same name.
    """
    stripped = input_policy.strip_trigger_prefix(
        text,
        TRIGGER_CANDIDATES,
        preserve_trigger=preserve_trigger,
    )
    return stripped
def _split_avoid(text: str) -> tuple[str, str]:
    """Delegate to ``input_policy.split_avoid`` and return its pair."""
    split_result = input_policy.split_avoid(text)
    return split_result
def _prompt_field(text: str, label: str) -> str:
    """Look up ``label`` in ``text`` via ``input_policy.prompt_field``.

    The module-level ``PROMPT_FIELD_LABELS`` is supplied as ``field_labels``.
    """
    field_text = input_policy.prompt_field(
        text,
        label,
        field_labels=PROMPT_FIELD_LABELS,
    )
    return field_text
def _row_value(row: dict[str, Any], key: str, labels: tuple[str, ...] = ()) -> str:
    """Read ``key`` from ``row`` via ``input_policy.row_value``.

    ``labels`` is forwarded positionally and the module-level
    ``PROMPT_FIELD_LABELS`` is supplied as ``field_labels``.
    """
    looked_up = input_policy.row_value(
        row,
        key,
        labels,
        field_labels=PROMPT_FIELD_LABELS,
    )
    return looked_up
def _body_phrase(body: Any, figure_note: Any = "") -> str:
    """Combine a body type and an optional figure note into one phrase.

    Both inputs are cleaned first. With no body, the figure note alone is
    returned. With no figure note, the result is ``"<body> figure"``. When the
    note already mentions "figure" the body is labelled "build" instead so
    the word is not repeated.
    """
    build = _clean(body)
    note = _clean(figure_note)
    if not build:
        return note
    if not note:
        return f"{build} figure"
    note_mentions_figure = "figure" in note.lower()
    return f"{build} build and {note}" if note_mentions_figure else f"{build} figure with {note}"
def _single_caption_front(row: dict[str, Any]) -> dict[str, str]:
    """Recover appearance fields from the leading part of ``row["caption"]``.

    The caption (trigger prefix stripped) must start with
    ``"<subject>, <age>, <body>, "`` where the subject is ``"woman"`` or
    ``"man"`` and age and body are non-empty. The next three comma-separated
    items are then taken as skin, hair and eyes, and at least one further item
    must follow them.

    Returns:
        A dict with ``body_phrase``, ``skin``, ``hair`` and ``eyes``, or an
        empty dict when the caption is missing or does not fit that shape.
    """
    caption = _strip_trigger(_clean(row.get("caption")), False)
    if not caption:
        return {}
    subject = _clean(row.get("primary_subject"))
    age = _clean(row.get("age_band") or row.get("age"))
    # Prefer an explicit body phrase; otherwise build one from type + figure.
    body = _clean(row.get("body_phrase")) or _body_phrase(
        _clean(row.get("body_type") or row.get("body")),
        _clean(row.get("figure")),
    )
    prefix = f"{subject}, {age}, {body}, "
    recognised = subject in ("woman", "man") and age and body and caption.startswith(prefix)
    if not recognised:
        return {}
    pieces = caption[len(prefix) :].split(", ", 3)
    # Exactly four pieces are needed: skin, hair, eyes and a remainder.
    if len(pieces) != 4:
        return {}
    skin, hair, eyes = pieces[:3]
    return {"body_phrase": body, "skin": skin, "hair": hair, "eyes": eyes}
def _combine_negative(*parts: str) -> str:
    """Join the non-empty negative-prompt fragments with ``", "``.

    Each part is cleaned and stripped of surrounding spaces, commas and
    periods; parts that end up empty are omitted.
    """
    fragments: list[str] = []
    for part in parts:
        fragment = _clean(part).strip(" ,.")
        if fragment:
            fragments.append(fragment)
    return ", ".join(fragments)
def _sanitize_scene_text_for_cast(text: Any, labels: list[str]) -> str:
    """Clean scene text so it matches the size of the cast.

    Args:
        text: Scene text; cleaned with ``_clean`` first.
        labels: Cast labels; only their count is used.

    Returns:
        ``""`` for empty text. Otherwise the text with third-participant
        wording removed when fewer than three labels are given, two
        "blowjob" phrasings made grammatical, and leftover comma/space
        artefacts tidied.
    """
    text = _clean(text)
    if not text:
        return ""
    if len(labels) < 3:
        # Phrases that only make sense with a third participant.
        text = re.sub(r"\s*(?:while|as)\s+another partner watches\b", "", text, flags=re.IGNORECASE)
        text = re.sub(r"\banother partner watches\b", "", text, flags=re.IGNORECASE)
        text = re.sub(r",?\s*\bone partner held between two bodies\b", "", text, flags=re.IGNORECASE)
        text = re.sub(r",?\s*\bthree bodies locked together\b", "", text, flags=re.IGNORECASE)
        text = re.sub(r",?\s*\bthree bodies\b", "", text, flags=re.IGNORECASE)
        # Repairs a "with ," gap such as the removals above can leave behind.
        # NOTE(review): confirm this repair is meant to run only for casts of
        # fewer than three labels rather than for every cast size.
        text = re.sub(r"\bwith\s*,\s*", "with ", text, flags=re.IGNORECASE)
    text = re.sub(r"\bwhile blowjob\b", "during a blowjob", text, flags=re.IGNORECASE)
    text = re.sub(r"\bfeaturing blowjob\b", "featuring a blowjob", text, flags=re.IGNORECASE)
    # Tidy punctuation: no space before a comma, no doubled commas, single spaces.
    text = re.sub(r"\s+,", ",", text)
    text = re.sub(r",\s*,", ",", text)
    text = re.sub(r"\s{2,}", " ", text).strip(" ,")
    return text
def _composition_phrase(
    composition: Any,
    action: str = "",
    prefix: str = "framed as",
    detail_density: str = "balanced",
) -> str:
    """Build the composition clause of a prompt.

    Args:
        composition: Composition text; cleaned with ``_clean`` before use.
        action: Action text. Its cleaned, lower-cased form is scanned for
            keywords; the raw value is passed to ``_action_position_phrase``.
        prefix: Lead-in words placed before the composition text.
        detail_density: Normalised by ``_normalize_hardcore_detail_density``;
            the value ``"compact"`` selects the shorter position wording.

    Returns:
        ``""`` when the cleaned composition is empty, or when an oral-pose
        mismatch leaves no trailing "with ..." detail. Otherwise a phrase
        that starts with ``prefix`` or, in the oral-pose mismatch case, with
        the literal ``"framed with"``.
    """
    composition = _clean(composition)
    if not composition:
        return ""
    action_lower = _clean(action).lower()
    composition_lower = composition.lower()
    detail_density = _normalize_hardcore_detail_density(detail_density)
    # Underview / face-straddling actions replace mirror or oral-scene
    # compositions with a fixed first-person underview framing.
    if "first-person underview" in action_lower or "straddling the viewer's face" in action_lower:
        if any(token in composition_lower for token in ("mirror-reflected", "oral scene", "face and body visible")):
            return (
                f"{prefix} close first-person underview with the woman's thighs framing the camera and the oral contact centered"
            )
    # POV viewer combined with a rear-facing pose gets a fixed rear-view framing.
    if "pov viewer" in action_lower and any(
        token in action_lower
        for token in ("ass raised", "seen from behind", "on all fours", "bent forward", "face-down")
    ):
        return (
            f"{prefix} first-person rear-view frame looking down at the woman's raised ass, with foreground hands and rear-entry contact readable"
        )
    # Pose names used to spot a composition that describes an oral pose the
    # action text does not mention.
    oral_pose_tokens = (
        "kneeling oral",
        "side-lying oral",
        "spread-leg oral",
        "standing oral",
        "edge-of-bed oral",
        "face-sitting",
        "sixty-nine",
        "reclining cunnilingus",
        "straddled oral",
        "chair oral",
    )
    if "oral" in action_lower:
        composition_oral_tokens = [token for token in oral_pose_tokens if token in composition_lower]
        if composition_oral_tokens and not any(token in action_lower for token in composition_oral_tokens):
            # Keep only the trailing "with ..." detail of the mismatched
            # composition; this branch does not use ``prefix``.
            match = re.search(r"\bwith\s+(.+)$", composition, flags=re.IGNORECASE)
            return f"framed with {match.group(1)}" if match else ""
    # Outercourse actions use the composition unchanged.
    if _is_outercourse_text(action_lower):
        return f"{prefix} {composition}"
    position = _action_position_phrase(action)
    close_or_aftermath = any(
        token in composition_lower
        for token in ("close-up", "close crop", "tight", "direct-flash", "subscriber-view", "post-ejaculation", "aftermath")
    )
    if _is_close_foreplay_text(action_lower) and close_or_aftermath:
        return f"{prefix} {composition}, in one continuous first-person close-body frame"
    # For close or aftermath compositions, add a reminder of the position.
    if position and close_or_aftermath:
        if detail_density == "compact":
            return f"{prefix} {composition}, with the {position} still readable"
        return f"{prefix} {composition}, keeping the {position} and action geography readable"
    return f"{prefix} {composition}"
def _clean_age(age: Any) -> str:
    """Return ``age`` cleaned with ``_clean``."""
    normalized = _clean(age)
    return normalized
def _age_detail_phrase(age: Any) -> str:
    """Rephrase an age label for use as a detail clause.

    A trailing " adult" / " adults" is removed from the cleaned text and every
    "-year-old" becomes " years old".
    """
    without_adult = re.sub(r"\s+adults?$", "", _clean(age))
    trimmed = without_adult.strip()
    return trimmed.replace("-year-old", " years old")
def _age_subject(row: dict[str, Any], fallback_subject: str = "adult person") -> str:
    """Describe the row's subject, prefixed with its age where available.

    The subject comes from ``subject_phrase``, ``primary_subject`` or
    ``subject`` (first non-empty), else ``fallback_subject``. Rows whose
    ``subject_type`` is ``"configured_cast"`` return the subject phrase with
    no age added. A bare "woman" / "man" always carries the word "adult".
    """
    subject = _clean(
        row.get("subject_phrase")
        or row.get("primary_subject")
        or row.get("subject")
        or fallback_subject
    )
    age = _clean_age(row.get("age_band") or row.get("age"))
    if row.get("subject_type") == "configured_cast":
        return _clean(row.get("subject_phrase") or subject)
    if subject in ("woman", "man"):
        if not age:
            return f"adult {subject}"
        if "adult" in age.lower():
            return f"{age} {subject}"
        return f"{age} adult {subject}"
    if age and "adult" not in subject.lower():
        return f"{age} {subject}"
    return subject or fallback_subject
def _appearance_phrase(row: dict[str, Any]) -> str:
    """List the row's body, skin, hair and eye descriptions, comma-separated.

    Each value is read with ``_row_value``; when that is empty the value
    recovered from the caption by ``_single_caption_front`` is used instead.
    Values that clean to nothing are left out.
    """
    caption_front = _single_caption_front(row)
    fragments: list[str] = []
    for key in ("body_phrase", "skin", "hair", "eyes"):
        fragment = _clean(_row_value(row, key) or caption_front.get(key))
        if fragment:
            fragments.append(fragment)
    return ", ".join(fragments)
def _expression_phrase(expression: Any) -> str:
    """Format expression text for inclusion in a prompt.

    Empty text gives ``""``. Text that contains a semicolon or a per-person
    form ("Woman A has", "the man has", ...) is emitted as
    ``"Expressions: ..."``; anything else becomes ``"with ..."``.
    """
    phrase = _clean(expression)
    if not phrase:
        return ""
    per_person = ";" in phrase or re.search(
        r"\b(?:Woman|Man) [A-Z] has\b|\bthe (?:woman|man) has\b",
        phrase,
        flags=re.IGNORECASE,
    )
    if per_person:
        return f"Expressions: {phrase}"
    return f"with {phrase}"
def _camera_phrase(row: dict[str, Any]) -> str:
    """Return the camera clause for ``row``.

    ``row["camera_directive"]`` wins when present. Otherwise
    ``row["camera_config"]`` (if it is a dict) is rendered as
    ``"Camera: ..."`` from either the custom camera prompt or the mode, shot
    size and angle. A config with ``camera_detail == "off"`` or
    ``camera_mode == "disabled"`` yields ``""``, as does a missing config or
    one with nothing to show.
    """
    directive = _clean(row.get("camera_directive"))
    if directive:
        return directive
    config = row.get("camera_config")
    if not isinstance(config, dict):
        return ""

    def setting(key: str) -> str:
        # Config values use underscores where the prompt wants spaces.
        return _clean(config.get(key)).replace("_", " ")

    if _clean(config.get("camera_detail")) == "off" or _clean(config.get("camera_mode")) == "disabled":
        return ""
    custom = _clean(config.get("custom_camera_prompt"))
    if custom:
        shown = [piece for piece in (setting("camera_mode"), custom) if piece and piece != "standard"]
        return "Camera: " + ", ".join(shown)
    shown = [
        piece
        for piece in (setting("camera_mode"), setting("shot_size"), setting("angle"))
        if piece and piece not in ("auto", "standard")
    ]
    if not shown:
        return ""
    return "Camera: " + ", ".join(shown)
def _camera_scene_phrase(row: dict[str, Any]) -> str:
    """Return the cleaned ``row["camera_scene_directive"]`` value."""
    scene_directive = row.get("camera_scene_directive")
    return _clean(scene_directive)
def _camera_phrase_from_config(config: Any) -> str:
    """Render a camera config dict as a ``"Camera: ..."`` clause.

    Returns ``""`` when ``config`` is not a dict, when ``camera_detail`` is
    ``"off"`` or ``camera_mode`` is ``"disabled"``, or when no setting has a
    value other than "auto" / "standard". A custom camera prompt, when set,
    is listed after the camera mode in place of the individual settings.
    """
    if not isinstance(config, dict):
        return ""

    def setting(key: str) -> str:
        # Config values use underscores where the prompt wants spaces.
        return _clean(config.get(key)).replace("_", " ")

    if _clean(config.get("camera_detail")) == "off" or _clean(config.get("camera_mode")) == "disabled":
        return ""
    custom = _clean(config.get("custom_camera_prompt"))
    if custom:
        shown = [piece for piece in (setting("camera_mode"), custom) if piece and piece != "standard"]
        return "Camera: " + ", ".join(shown)
    setting_keys = (
        "camera_mode",
        "shot_size",
        "angle",
        "lens",
        "distance",
        "orientation",
        "phone_visibility",
    )
    rendered = [setting(key) for key in setting_keys]
    shown = [value for value in rendered if value and value not in ("auto", "standard")]
    if shown:
        return "Camera: " + ", ".join(shown)
    return ""
def _pair_camera_phrase(directive: Any, config: Any, row: dict[str, Any]) -> str:
    """Pick the camera clause for one side of a pair.

    Order of preference: the cleaned ``directive``; then nothing at all when
    ``config`` is a dict that switches the camera off; then the phrase built
    from ``config``; finally the phrase built from ``row``.
    """
    explicit = _clean(directive)
    if explicit:
        return explicit
    if isinstance(config, dict):
        if _clean(config.get("camera_detail")) == "off":
            return ""
        if _clean(config.get("camera_mode")) == "disabled":
            return ""
    return _camera_phrase_from_config(config) or _camera_phrase(row)
def _style_phrase(row: dict[str, Any], style_mode: str) -> str:
    """Return the style clause for ``row`` under ``style_mode``.

    ``"minimal"`` gives ``""`` and ``"photographic"`` gives a fixed
    photography description. Any other mode combines ``row["style"]`` with
    ``row["positive_suffix"]`` (or, failing that, the "Use" field of
    ``row["prompt"]``), joined by ``"; "`` when both are present.
    """
    if style_mode == "minimal":
        return ""
    if style_mode == "photographic":
        return "realistic creator-shot photography with natural lighting, tactile skin and fabric detail, and clean social-media composition"
    style = _clean(row.get("style"))
    suffix = _clean(row.get("positive_suffix"))
    if not suffix:
        suffix = _prompt_field(_clean(row.get("prompt")), "Use")
    if not style:
        return suffix
    if not suffix:
        return style
    return f"{style}; {suffix}"
def _krea_row_field_dependencies() -> krea_row_fields.KreaRowFieldDependencies:
    """Bundle this module's helpers for ``krea_row_fields``."""
    helpers = {
        "clean": _clean,
        "row_value": _row_value,
        "camera_phrase": _camera_phrase,
        "camera_scene_phrase": _camera_scene_phrase,
        "style_phrase": _style_phrase,
        "expression_disabled": _expression_disabled,
    }
    return krea_row_fields.KreaRowFieldDependencies(**helpers)
def _krea_normal_row_dependencies() -> krea_normal_formatter.KreaNormalRowDependencies:
    """Bundle this module's helpers for ``krea_normal_formatter``."""
    helpers = {
        "clean": _clean,
        "row_value": _row_value,
        "age_subject": _age_subject,
        "age_detail_phrase": _age_detail_phrase,
        "appearance_phrase": _appearance_phrase,
        "with_indefinite_article": _with_indefinite_article,
        "paragraph": _paragraph,
    }
    return krea_normal_formatter.KreaNormalRowDependencies(**helpers)
def _krea_normal_row_request_from_row(
    row: dict[str, Any],
    detail_level: str,
    style_mode: str,
) -> krea_normal_formatter.KreaNormalRowRequest:
    """Build the normal-row formatter request for ``row``.

    Fields are extracted with ``krea_row_fields.extract_krea_row_fields`` and
    copied onto the request together with ``row``, ``detail_level`` and
    ``style_mode``.
    """
    extracted = krea_row_fields.extract_krea_row_fields(
        row,
        style_mode,
        _krea_row_field_dependencies(),
    )
    copied_names = (
        "subject_type",
        "primary",
        "item",
        "scene",
        "pose",
        "expression",
        "composition",
        "camera",
        "camera_scene",
        "style",
    )
    copied = {name: getattr(extracted, name) for name in copied_names}
    return krea_normal_formatter.KreaNormalRowRequest(
        row=row,
        detail_level=detail_level,
        style_mode=style_mode,
        **copied,
    )
def _krea_configured_cast_dependencies() -> krea_configured_cast_formatter.KreaConfiguredCastDependencies:
    """Bundle this module's helpers for ``krea_configured_cast_formatter``."""

    def cast_prose_omit(text, omit_labels):
        # Adapter: the formatter passes omit_labels positionally.
        return _cast_prose(text, omit_labels=omit_labels)

    def pov_camera_phrase(labels):
        # Adapter: exposes only the labels argument of _pov_camera_phrase.
        return _pov_camera_phrase(labels)

    helpers = {
        "clean": _clean,
        "prompt_field": _prompt_field,
        "sanitize_hardcore_environment_anchors": _sanitize_hardcore_environment_anchors,
        "sanitize_hardcore_axis_values": _sanitize_hardcore_axis_values,
        "sanitize_scene_text_for_cast": _sanitize_scene_text_for_cast,
        "normalize_hardcore_detail_density": _normalize_hardcore_detail_density,
        "row_action_family": route_metadata_policy.row_action_family,
        "hardcore_action_sentence": _hardcore_action_sentence,
        "pov_action_phrase": _pov_action_phrase,
        "pov_labels_from_value": _pov_labels_from_value,
        "merge_labels": _merge_labels,
        "cast_prose_omit": cast_prose_omit,
        "filter_pov_labeled_clauses": _filter_pov_labeled_clauses,
        "natural_label_text": _natural_label_text,
        "pov_composition_text": _pov_composition_text,
        "pov_camera_phrase": pov_camera_phrase,
        "expression_phrase": _expression_phrase,
        "composition_phrase": _composition_phrase,
        "paragraph": _paragraph,
    }
    return krea_configured_cast_formatter.KreaConfiguredCastDependencies(**helpers)
def _krea_configured_cast_request(
    row: dict[str, Any],
    detail_level: str,
    style_mode: str,
    primary: str,
    item: str,
    scene: str,
    expression: str,
    composition: str,
    source_composition: str,
    camera: str,
    camera_scene: str,
    style: str,
) -> krea_configured_cast_formatter.KreaConfiguredCastRequest:
    """Wrap the given values in a ``KreaConfiguredCastRequest``.

    Every argument is passed to the request under the same name.
    """
    payload = {
        "row": row,
        "detail_level": detail_level,
        "style_mode": style_mode,
        "primary": primary,
        "item": item,
        "scene": scene,
        "expression": expression,
        "composition": composition,
        "source_composition": source_composition,
        "camera": camera,
        "camera_scene": camera_scene,
        "style": style,
    }
    return krea_configured_cast_formatter.KreaConfiguredCastRequest(**payload)
def _krea_configured_cast_request_from_row(
    row: dict[str, Any],
    detail_level: str,
    style_mode: str,
) -> krea_configured_cast_formatter.KreaConfiguredCastRequest:
    """Build the configured-cast formatter request for ``row``.

    Fields are extracted with ``krea_row_fields.extract_krea_row_fields`` and
    handed to ``_krea_configured_cast_request``.
    """
    extracted = krea_row_fields.extract_krea_row_fields(
        row,
        style_mode,
        _krea_row_field_dependencies(),
    )
    return _krea_configured_cast_request(
        row=row,
        detail_level=detail_level,
        style_mode=style_mode,
        primary=extracted.primary,
        item=extracted.item,
        scene=extracted.scene,
        expression=extracted.expression,
        composition=extracted.composition,
        source_composition=extracted.source_composition,
        camera=extracted.camera,
        camera_scene=extracted.camera_scene,
        style=extracted.style,
    )
def _normal_row_to_krea(row: dict[str, Any], detail_level: str, style_mode: str) -> tuple[str, str]:
    """Format a single (non-pair) row with the matching Krea formatter.

    Rows whose ``subject_type`` is ``"configured_cast"``, or that carry a
    non-empty ``cast_summary``, go to the configured-cast formatter; all
    other rows go to the normal-row formatter.
    """
    is_cast_row = _clean(row.get("subject_type")) == "configured_cast" or bool(
        _clean(row.get("cast_summary"))
    )
    if not is_cast_row:
        return krea_normal_formatter.format_normal_row(
            _krea_normal_row_request_from_row(row, detail_level, style_mode),
            _krea_normal_row_dependencies(),
        )
    return krea_configured_cast_formatter.format_configured_cast(
        _krea_configured_cast_request_from_row(row, detail_level, style_mode),
        _krea_configured_cast_dependencies(),
    )
def _krea_pair_format_dependencies() -> krea_pair_formatter.KreaPairFormatDependencies:
    """Bundle this module's helpers for ``krea_pair_formatter``."""

    def cast_prose_omit(text, omit_labels):
        # Adapter: the formatter passes omit_labels positionally.
        return _cast_prose(text, omit_labels=omit_labels)

    def pov_camera_phrase(labels):
        # Adapter: exposes only the labels argument of _pov_camera_phrase.
        return _pov_camera_phrase(labels)

    def pov_soft_camera_phrase(labels):
        # Same helper with the softcore variant switched on.
        return _pov_camera_phrase(labels, softcore=True)

    helpers = {
        "clean": _clean,
        "prompt_cast_descriptors": _prompt_cast_descriptors,
        "pair_camera_phrase": _pair_camera_phrase,
        "camera_scene_phrase": _camera_scene_phrase,
        "style_phrase": _style_phrase,
        "sanitize_hardcore_environment_anchors": _sanitize_hardcore_environment_anchors,
        "sanitize_hardcore_axis_values": _sanitize_hardcore_axis_values,
        "sanitize_scene_text_for_cast": _sanitize_scene_text_for_cast,
        "normalize_hardcore_detail_density": _normalize_hardcore_detail_density,
        "row_action_family": route_metadata_policy.row_action_family,
        "hardcore_action_sentence": _hardcore_action_sentence,
        "pov_action_phrase": _pov_action_phrase,
        "pov_labels_from_value": _pov_labels_from_value,
        "merge_labels": _merge_labels,
        "cast_prose_omit": cast_prose_omit,
        "label_join": _label_join,
        "filter_pov_labeled_clauses": _filter_pov_labeled_clauses,
        "natural_label_text": _natural_label_text,
        "expression_disabled": _expression_disabled,
        "expression_phrase": _expression_phrase,
        "pov_camera_phrase": pov_camera_phrase,
        "pov_soft_camera_phrase": pov_soft_camera_phrase,
        "pov_composition_text": _pov_composition_text,
        "natural_clothing_state": _natural_clothing_state,
        "composition_phrase": _composition_phrase,
        "paragraph": _paragraph,
        "combine_negative": _combine_negative,
    }
    return krea_pair_formatter.KreaPairFormatDependencies(**helpers)
def _insta_pair_to_krea(row: dict[str, Any], detail_level: str, style_mode: str) -> tuple[str, str, str, str]:
    """Format a pair row through ``krea_pair_formatter.format_insta_pair``.

    The three arguments are wrapped in a ``KreaPairFormatRequest`` and passed
    along with this module's pair-format dependency bundle.
    """
    request = krea_pair_formatter.KreaPairFormatRequest(
        row=row,
        detail_level=detail_level,
        style_mode=style_mode,
    )
    dependencies = _krea_pair_format_dependencies()
    return krea_pair_formatter.format_insta_pair(request, dependencies)
def _fallback_text_to_krea(
    source_text: str,
    preserve_trigger: bool,
    detail_level: str,
    style_mode: str,
) -> tuple[str, str, str]:
    """Turn plain labelled prompt text into prose when no row is available.

    The trigger prefix is stripped (subject to ``preserve_trigger``), the
    text is split into positive and negative parts by ``_split_avoid``, and
    field labels such as ``Scene:`` or ``Pose:`` in the positive part are
    rewritten as sentence openers.

    Args:
        source_text: Raw prompt text.
        preserve_trigger: Forwarded to ``_strip_trigger``.
        detail_level: Accepted but not used by this fallback.
        style_mode: Accepted but not used by this fallback.

    Returns:
        ``(positive_paragraph, negative_text, "text(fallback)")``.
    """
    positive, negative = _split_avoid(_strip_trigger(source_text, preserve_trigger))
    # Label -> prose rewrites, applied in order. Matching is case-sensitive.
    # A former `re.sub(r"\bUse\b", "Use", ...)` step replaced each match with
    # itself and has been dropped; the output is unchanged.
    label_rewrites = (
        (r"\b(?:Scene|Setting):", "The setting is"),
        (r"\b(?:Pose|Sexual pose):", "The pose is"),
        (r"\bFacial expressions?:", "The facial expression is"),
        (r"\bComposition:", "The composition is"),
        (r"\bRole graph:", "The role choreography is"),
    )
    for pattern, replacement in label_rewrites:
        positive = re.sub(pattern, replacement, positive)
    return _paragraph([_clean(positive)]), negative, "text(fallback)"
def _krea_format_dependencies() -> krea_format_route.KreaFormatDependencies:
    """Bundle this module's helpers and constants for ``krea_format_route``."""
    helpers = {
        "trigger_candidates": TRIGGER_CANDIDATES,
        "clean": _clean,
        "row_from_inputs": _row_from_inputs,
        "normal_row_to_krea": _normal_row_to_krea,
        "insta_pair_to_krea": _insta_pair_to_krea,
        "fallback_text_to_krea": _fallback_text_to_krea,
        "append_formatter_hints": _append_formatter_hints,
        "combine_negative": _combine_negative,
        "sanitize_prose_text": sanitize_prose_text,
        "sanitize_negative_text": sanitize_negative_text,
    }
    return krea_format_route.KreaFormatDependencies(**helpers)
def format_krea2_prompt(
    source_text: str,
    metadata_json: str = "",
    negative_prompt: str = "",
    input_hint: str = "auto",
    target: str = "auto",
    detail_level: str = "balanced",
    style_mode: str = "preserve",
    preserve_trigger: bool = False,
    extra_positive: str = "",
    extra_negative: str = "",
) -> dict[str, str]:
    """Format a prompt for Krea via ``krea_format_route.format_krea2_prompt``.

    Every argument is copied unchanged onto a ``KreaFormatRequest`` under the
    same name; how each one is interpreted is decided by ``krea_format_route``.
    This module contributes its helper bundle from
    ``_krea_format_dependencies``.

    Returns:
        Whatever ``krea_format_route.format_krea2_prompt`` returns (annotated
        here as a ``dict[str, str]``).
    """
    request = krea_format_route.KreaFormatRequest(
        source_text=source_text,
        metadata_json=metadata_json,
        negative_prompt=negative_prompt,
        input_hint=input_hint,
        target=target,
        detail_level=detail_level,
        style_mode=style_mode,
        preserve_trigger=preserve_trigger,
        extra_positive=extra_positive,
        extra_negative=extra_negative,
    )
    dependencies = _krea_format_dependencies()
    return krea_format_route.format_krea2_prompt(request, dependencies)