"""Krea prompt formatting entry point.

Defines small text helpers and bundles them into the dependency objects that
the ``krea_*`` formatter modules expect, then exposes ``format_krea2_prompt``.
"""

from __future__ import annotations

import re
from typing import Any

try:
    from . import formatter_input as input_policy
    from . import krea_format_route
    from . import route_metadata as route_metadata_policy
    from . import softcore_text_policy
    from .krea_action_context import (
        is_close_foreplay_text as _is_close_foreplay_text,
        is_outercourse_text as _is_outercourse_text,
        normalize_hardcore_detail_density as _normalize_hardcore_detail_density,
    )
    from . import krea_configured_cast_formatter
    from . import krea_normal_formatter
    from . import krea_pair_formatter
    from . import krea_row_fields
    from .hardcore_text_cleanup import (
        sanitize_hardcore_axis_values as _sanitize_hardcore_axis_values,
        sanitize_hardcore_environment_anchors as _sanitize_hardcore_environment_anchors,
    )
    from .krea_cast import (
        cast_prose as _cast_prose,
        label_join as _label_join,
        lowercase_for_inline_join as _lowercase_for_inline_join,
        natural_label_text as _natural_label_text,
        prompt_cast_descriptors as _prompt_cast_descriptors,
    )
    from .krea_clothing import natural_clothing_state as _natural_clothing_state
    from .krea_action_positions import action_position_phrase as _action_position_phrase
    from .krea_actions import hardcore_action_sentence as _hardcore_action_sentence
    from .krea_pov import (
        filter_pov_labeled_clauses as _filter_pov_labeled_clauses,
        merge_labels as _merge_labels,
        pov_camera_phrase as _pov_camera_phrase,
        pov_composition_text as _pov_composition_text,
        pov_labels_from_value as _pov_labels_from_value,
    )
    from .krea_pov_actions import pov_action_phrase as _pov_action_phrase
    from .prompt_hygiene import combine_negative_text, sanitize_negative_text, sanitize_prose_text
except ImportError:  # Allows local smoke tests with `python -c`.
    import formatter_input as input_policy
    import krea_format_route
    import route_metadata as route_metadata_policy
    import softcore_text_policy
    from krea_action_context import (
        is_close_foreplay_text as _is_close_foreplay_text,
        is_outercourse_text as _is_outercourse_text,
        normalize_hardcore_detail_density as _normalize_hardcore_detail_density,
    )
    import krea_configured_cast_formatter
    import krea_normal_formatter
    import krea_pair_formatter
    import krea_row_fields
    from hardcore_text_cleanup import (
        sanitize_hardcore_axis_values as _sanitize_hardcore_axis_values,
        sanitize_hardcore_environment_anchors as _sanitize_hardcore_environment_anchors,
    )
    from krea_cast import (
        cast_prose as _cast_prose,
        label_join as _label_join,
        lowercase_for_inline_join as _lowercase_for_inline_join,
        natural_label_text as _natural_label_text,
        prompt_cast_descriptors as _prompt_cast_descriptors,
    )
    from krea_clothing import natural_clothing_state as _natural_clothing_state
    from krea_action_positions import action_position_phrase as _action_position_phrase
    from krea_actions import hardcore_action_sentence as _hardcore_action_sentence
    from krea_pov import (
        filter_pov_labeled_clauses as _filter_pov_labeled_clauses,
        merge_labels as _merge_labels,
        pov_camera_phrase as _pov_camera_phrase,
        pov_composition_text as _pov_composition_text,
        pov_labels_from_value as _pov_labels_from_value,
    )
    from krea_pov_actions import pov_action_phrase as _pov_action_phrase
    from prompt_hygiene import combine_negative_text, sanitize_negative_text, sanitize_prose_text


TRIGGER_CANDIDATES = (
    "sxcpinup_coloredpencil",
    "sxcppnl7",
)
PROMPT_FIELD_LABELS = input_policy.prompt_field_labels()

# Camera-config keys read by the row-level phrase and by the fuller pair-level phrase.
_ROW_CAMERA_KEYS = ("camera_mode", "shot_size", "angle")
_PAIR_CAMERA_KEYS = _ROW_CAMERA_KEYS + ("lens", "distance", "orientation", "phone_visibility")


def _clean(value: Any) -> str:
    """Normalize *value* via ``input_policy.clean_text``."""
    return input_policy.clean_text(value)


def _is_false(value: Any) -> bool:
    """Return True only when *value* explicitly spells "false".

    Recognized forms: the bool ``False``, numeric zero, and the strings
    ``"false"``, ``"0"``, ``"no"``, ``"off"`` (case-insensitive, surrounding
    whitespace ignored). Everything else, including ``None``, is not false.
    """
    if isinstance(value, bool):
        return value is False
    if isinstance(value, (int, float)):
        # Keep numeric 0 consistent with the accepted string form "0".
        return value == 0
    if isinstance(value, str):
        return value.strip().lower() in ("false", "0", "no", "off")
    return False


def _expression_disabled(row: dict[str, Any]) -> bool:
    """Return True when the row sets ``expression_disabled`` or falsifies ``expression_enabled``."""
    return bool(row.get("expression_disabled")) or _is_false(row.get("expression_enabled", True))


def _sentence(text: str) -> str:
    """Clean *text*, capitalize its first character, and ensure terminal punctuation.

    Returns ``""`` when nothing is left after cleaning and stripping ``" ,;"``.
    """
    text = _clean(text).strip(" ,;")
    if not text:
        return ""
    text = text[:1].upper() + text[1:]
    if text[-1] not in ".!?":
        text += "."
    return text


def _paragraph(parts: list[str]) -> str:
    """Join the non-empty sentence forms of *parts* with single spaces."""
    return " ".join(part for part in (_sentence(part) for part in parts) if part)


def _formatter_hint_parts(*rows: dict[str, Any]) -> list[str]:
    """Collect de-duplicated "krea" formatter hints from *rows*, preserving first-seen order.

    Non-dict entries are skipped; each hint is cleaned and stripped of
    surrounding spaces and periods before the duplicate check.
    """
    hints: list[str] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        for hint in route_metadata_policy.row_formatter_hints(row, "krea"):
            hint = _clean(hint).strip(" .")
            if hint and hint not in hints:
                hints.append(hint)
    return hints


def _append_formatter_hints(prompt: str, *rows: dict[str, Any]) -> str:
    """Append formatter hints from *rows* to *prompt*; return *prompt* untouched when there are none."""
    hints = _formatter_hint_parts(*rows)
    if not hints:
        return prompt
    return _paragraph([prompt, *hints])


def _with_indefinite_article(text: str) -> str:
    """Prefix cleaned *text* with "a"/"an" unless it is empty or already starts with one.

    The article is chosen from the first letter only ("an" before a, e, i, o, u).
    """
    text = _clean(text)
    if not text or text.lower().startswith(("a ", "an ")):
        return text
    article = "an" if text[:1].lower() in "aeiou" else "a"
    return f"{article} {text}"


def _maybe_json(text: str) -> dict[str, Any] | None:
    """Delegate to ``input_policy.maybe_json``."""
    return input_policy.maybe_json(text)


def _row_from_inputs(source_text: str, metadata_json: str, input_hint: str) -> tuple[dict[str, Any] | None, str]:
    """Delegate to ``input_policy.row_from_inputs``."""
    return input_policy.row_from_inputs(source_text, metadata_json, input_hint)


def _strip_trigger(text: str, preserve_trigger: bool) -> str:
    """Delegate to ``input_policy.strip_trigger_prefix`` using ``TRIGGER_CANDIDATES``."""
    return input_policy.strip_trigger_prefix(text, TRIGGER_CANDIDATES, preserve_trigger=preserve_trigger)


def _split_avoid(text: str) -> tuple[str, str]:
    """Delegate to ``input_policy.split_avoid``."""
    return input_policy.split_avoid(text)


def _prompt_field(text: str, label: str) -> str:
    """Delegate to ``input_policy.prompt_field`` using ``PROMPT_FIELD_LABELS``."""
    return input_policy.prompt_field(text, label, field_labels=PROMPT_FIELD_LABELS)


def _row_value(row: dict[str, Any], key: str, labels: tuple[str, ...] = ()) -> str:
    """Delegate to ``input_policy.row_value`` using ``PROMPT_FIELD_LABELS``."""
    return input_policy.row_value(row, key, labels, field_labels=PROMPT_FIELD_LABELS)


def _body_phrase(body: Any, figure_note: Any = "") -> str:
    """Combine a body descriptor and an optional figure note into one phrase.

    With no body the cleaned figure note is returned as-is. A figure note that
    already mentions "figure" is joined with "build and" so the word is not
    repeated.
    """
    body = _clean(body)
    figure_note = _clean(figure_note)
    if not body:
        return figure_note
    if not figure_note:
        return f"{body} figure"
    if "figure" in figure_note.lower():
        return f"{body} build and {figure_note}"
    return f"{body} figure with {figure_note}"


def _single_caption_front(row: dict[str, Any]) -> dict[str, str]:
    """Recover body/skin/hair/eyes from a caption that opens with "subject, age, body, ".

    Returns an empty dict unless the subject is "woman" or "man", age and body
    are present, the trigger-stripped caption starts with that exact front, and
    at least four comma-separated fields follow it.
    """
    caption = _strip_trigger(_clean(row.get("caption")), False)
    if not caption:
        return {}
    subject = _clean(row.get("primary_subject"))
    age = _clean(row.get("age_band") or row.get("age"))
    body = _clean(row.get("body_phrase"))
    if not body:
        body_type = _clean(row.get("body_type") or row.get("body"))
        figure = _clean(row.get("figure"))
        body = _body_phrase(body_type, figure)
    front = f"{subject}, {age}, {body}, "
    if subject in ("woman", "man") and age and body and caption.startswith(front):
        try:
            skin, hair, eyes, _rest = caption[len(front) :].split(", ", 3)
        except ValueError:
            # Fewer than four fields after the front: the caption is not in the expected shape.
            return {}
        return {"body_phrase": body, "skin": skin, "hair": hair, "eyes": eyes}
    return {}


def _combine_negative(*parts: str) -> str:
    """Delegate to ``combine_negative_text``."""
    return combine_negative_text(*parts)


def _sanitize_scene_text_for_cast(text: Any, labels: list[str]) -> str:
    """Clean scene text so it does not contradict the cast size, then tidy punctuation.

    With fewer than three labels, phrases that imply an extra participant are
    removed. Two article-less "blowjob" phrasings are then rewritten and
    leftover comma/space artifacts are collapsed.
    """
    text = _clean(text)
    if not text:
        return ""
    if len(labels) < 3:
        text = re.sub(r"\s*(?:while|as)\s+another partner watches\b", "", text, flags=re.IGNORECASE)
        text = re.sub(r"\banother partner watches\b", "", text, flags=re.IGNORECASE)
        text = re.sub(r",?\s*\bone partner held between two bodies\b", "", text, flags=re.IGNORECASE)
        text = re.sub(r",?\s*\bthree bodies locked together\b", "", text, flags=re.IGNORECASE)
        text = re.sub(r",?\s*\bthree bodies\b", "", text, flags=re.IGNORECASE)
    text = re.sub(r"\bwith\s*,\s*", "with ", text, flags=re.IGNORECASE)
    text = re.sub(r"\bwhile blowjob\b", "during a blowjob", text, flags=re.IGNORECASE)
    text = re.sub(r"\bfeaturing blowjob\b", "featuring a blowjob", text, flags=re.IGNORECASE)
    text = re.sub(r"\s+,", ",", text)
    text = re.sub(r",\s*,", ",", text)
    text = re.sub(r"\s{2,}", " ", text).strip(" ,")
    return text


def _composition_phrase(
    composition: Any,
    action: str = "",
    prefix: str = "framed as",
    detail_density: str = "balanced",
) -> str:
    """Build the composition clause, adjusting it when it conflicts with the action text.

    Returns ``""`` for an empty composition. Otherwise the result is
    ``"{prefix} {composition}"``, possibly replaced or extended by the
    action-specific branches below. Branch order matters: earlier matches win.
    """
    composition = _clean(composition)
    if not composition:
        return ""
    action_lower = _clean(action).lower()
    composition_lower = composition.lower()
    detail_density = _normalize_hardcore_detail_density(detail_density)
    if "first-person underview" in action_lower or "straddling the viewer's face" in action_lower:
        if any(token in composition_lower for token in ("mirror-reflected", "oral scene", "face and body visible")):
            return (
                f"{prefix} close first-person underview with the woman's thighs framing the camera and the oral contact centered"
            )
    if "pov viewer" in action_lower and any(
        token in action_lower
        for token in ("ass raised", "seen from behind", "on all fours", "bent forward", "face-down")
    ):
        return (
            f"{prefix} first-person rear-view frame looking down at the woman's raised ass, with foreground hands and rear-entry contact readable"
        )
    oral_pose_tokens = (
        "kneeling oral",
        "side-lying oral",
        "spread-leg oral",
        "standing oral",
        "edge-of-bed oral",
        "face-sitting",
        "sixty-nine",
        "reclining cunnilingus",
        "straddled oral",
        "chair oral",
    )
    if "oral" in action_lower:
        composition_oral_tokens = [token for token in oral_pose_tokens if token in composition_lower]
        if composition_oral_tokens and not any(token in action_lower for token in composition_oral_tokens):
            # The composition names an oral pose the action does not: keep only its trailing "with ..." detail.
            match = re.search(r"\bwith\s+(.+)$", composition, flags=re.IGNORECASE)
            return f"framed with {match.group(1)}" if match else ""
    if _is_outercourse_text(action_lower):
        return f"{prefix} {composition}"
    position = _action_position_phrase(action)
    close_or_aftermath = any(
        token in composition_lower
        for token in ("close-up", "close crop", "tight", "direct-flash", "subscriber-view", "post-ejaculation", "aftermath")
    )
    if _is_close_foreplay_text(action_lower) and close_or_aftermath:
        return f"{prefix} {composition}, in one continuous first-person close-body frame"
    if position and close_or_aftermath:
        if detail_density == "compact":
            return f"{prefix} {composition}, with the {position} still readable"
        return f"{prefix} {composition}, keeping the {position} and action geography readable"
    return f"{prefix} {composition}"


def _clean_age(age: Any) -> str:
    """Return the cleaned age text."""
    return _clean(age)


def _age_detail_phrase(age: Any) -> str:
    """Drop a trailing "adult(s)" word from the age text and expand "-year-old" to " years old"."""
    text = _clean(age)
    text = re.sub(r"\s+adults?$", "", text).strip()
    return text.replace("-year-old", " years old")


def _age_subject(row: dict[str, Any], fallback_subject: str = "adult person") -> str:
    """Build the subject phrase, inserting the age and the word "adult" where missing.

    Configured-cast rows return their subject phrase without age decoration.
    For the bare subjects "woman"/"man", "adult" is always present in the result.
    """
    subject = _clean(row.get("subject_phrase") or row.get("primary_subject") or row.get("subject") or fallback_subject)
    age = _clean_age(row.get("age_band") or row.get("age"))
    if row.get("subject_type") == "configured_cast":
        return _clean(row.get("subject_phrase") or subject)
    if subject in ("woman", "man"):
        if age:
            return f"{age} {subject}" if "adult" in age.lower() else f"{age} adult {subject}"
        return f"adult {subject}"
    if age and "adult" not in subject.lower():
        return f"{age} {subject}"
    return subject or fallback_subject


def _appearance_phrase(row: dict[str, Any]) -> str:
    """Join body, skin, hair and eyes, preferring row values over ones parsed from the caption."""
    front = _single_caption_front(row)
    parts = [
        _row_value(row, "body_phrase") or front.get("body_phrase"),
        _row_value(row, "skin") or front.get("skin"),
        _row_value(row, "hair") or front.get("hair"),
        _row_value(row, "eyes") or front.get("eyes"),
    ]
    return ", ".join(_clean(part) for part in parts if _clean(part))


def _expression_phrase(expression: Any) -> str:
    """Format expression text as an "Expressions: ..." block or a "with ..." clause.

    The block form is used when the text contains ";" or a per-person
    "<Woman|Man> X has" / "the <woman|man> has" pattern (matched case-insensitively).
    """
    expression = _clean(expression)
    if not expression:
        return ""
    if ";" in expression or re.search(
        r"\b(?:Woman|Man) [A-Z] has\b|\bthe (?:woman|man) has\b",
        expression,
        flags=re.IGNORECASE,
    ):
        return f"Expressions: {expression}"
    return f"with {expression}"


def _camera_config_disabled(config: dict[str, Any]) -> bool:
    """Return True when *config* has ``camera_detail`` "off" or ``camera_mode`` "disabled"."""
    return _clean(config.get("camera_detail")) == "off" or _clean(config.get("camera_mode")) == "disabled"


def _camera_label_from_config(config: Any, keys: tuple[str, ...]) -> str:
    """Build a "Camera: a, b, c" label from *config*, or ``""`` when nothing remains.

    A non-empty ``custom_camera_prompt`` takes precedence and is combined with
    the camera mode; otherwise the values under *keys* are used with
    underscores replaced by spaces. "standard" is always dropped, and "auto" is
    dropped from the keyed values. An empty piece list yields ``""`` rather
    than a bare "Camera: " label.
    """
    if not isinstance(config, dict) or _camera_config_disabled(config):
        return ""
    custom = _clean(config.get("custom_camera_prompt"))
    if custom:
        base = _clean(config.get("camera_mode")).replace("_", " ")
        pieces = [piece for piece in (base, custom) if piece and piece != "standard"]
    else:
        values = [_clean(config.get(key)).replace("_", " ") for key in keys]
        pieces = [value for value in values if value and value not in ("auto", "standard")]
    return "Camera: " + ", ".join(pieces) if pieces else ""


def _camera_phrase(row: dict[str, Any]) -> str:
    """Return the row's ``camera_directive`` if set, else a label built from ``camera_config``.

    Only camera mode, shot size and angle are read from the config here.
    """
    directive = _clean(row.get("camera_directive"))
    if directive:
        return directive
    return _camera_label_from_config(row.get("camera_config"), _ROW_CAMERA_KEYS)


def _camera_scene_phrase(row: dict[str, Any]) -> str:
    """Return the cleaned ``camera_scene_directive`` of *row*."""
    return _clean(row.get("camera_scene_directive"))


def _camera_phrase_from_config(config: Any) -> str:
    """Build a camera label from *config* using the full set of camera keys.

    Returns ``""`` when *config* is not a dict, is disabled, or yields no pieces.
    """
    return _camera_label_from_config(config, _PAIR_CAMERA_KEYS)


def _pair_camera_phrase(directive: Any, config: Any, row: dict[str, Any]) -> str:
    """Resolve the camera phrase for a pair entry.

    Precedence: explicit *directive*, then a disabled *config* (which yields
    ``""`` and suppresses the row fallback), then the label built from
    *config*, then the row-level camera phrase.
    """
    directive_text = _clean(directive)
    if directive_text:
        return directive_text
    if isinstance(config, dict) and _camera_config_disabled(config):
        return ""
    return _camera_phrase_from_config(config) or _camera_phrase(row)


def _style_phrase(row: dict[str, Any], style_mode: str) -> str:
    """Return the style clause for *row* under *style_mode*.

    "minimal" yields ``""``; "photographic" yields a fixed photographic
    description; any other mode combines the row's ``style`` with its
    ``positive_suffix`` (falling back to the "Use" field of the row's prompt).
    """
    if style_mode == "minimal":
        return ""
    if style_mode == "photographic":
        return "realistic creator-shot photography with natural lighting, tactile skin and fabric detail, and clean social-media composition"
    style = _clean(row.get("style"))
    suffix = _clean(row.get("positive_suffix")) or _prompt_field(_clean(row.get("prompt")), "Use")
    if style and suffix:
        return f"{style}; {suffix}"
    return style or suffix


def _krea_row_field_dependencies() -> krea_row_fields.KreaRowFieldDependencies:
    """Bundle this module's helpers for ``krea_row_fields``."""
    return krea_row_fields.KreaRowFieldDependencies(
        clean=_clean,
        row_value=_row_value,
        camera_phrase=_camera_phrase,
        camera_scene_phrase=_camera_scene_phrase,
        style_phrase=_style_phrase,
        expression_disabled=_expression_disabled,
    )


def _krea_normal_row_dependencies() -> krea_normal_formatter.KreaNormalRowDependencies:
    """Bundle this module's helpers for ``krea_normal_formatter``."""
    return krea_normal_formatter.KreaNormalRowDependencies(
        clean=_clean,
        row_value=_row_value,
        age_subject=_age_subject,
        age_detail_phrase=_age_detail_phrase,
        appearance_phrase=_appearance_phrase,
        with_indefinite_article=_with_indefinite_article,
        paragraph=_paragraph,
    )


def _krea_normal_row_request_from_row(
    row: dict[str, Any],
    detail_level: str,
    style_mode: str,
) -> krea_normal_formatter.KreaNormalRowRequest:
    """Extract the row's fields and wrap them in a ``KreaNormalRowRequest``."""
    fields = krea_row_fields.extract_krea_row_fields(
        row,
        style_mode,
        _krea_row_field_dependencies(),
    )
    return krea_normal_formatter.KreaNormalRowRequest(
        row=row,
        detail_level=detail_level,
        style_mode=style_mode,
        subject_type=fields.subject_type,
        primary=fields.primary,
        item=fields.item,
        scene=fields.scene,
        pose=fields.pose,
        expression=fields.expression,
        composition=fields.composition,
        camera=fields.camera,
        camera_scene=fields.camera_scene,
        style=fields.style,
    )


def _krea_configured_cast_dependencies() -> krea_configured_cast_formatter.KreaConfiguredCastDependencies:
    """Bundle this module's helpers for ``krea_configured_cast_formatter``."""
    return krea_configured_cast_formatter.KreaConfiguredCastDependencies(
        clean=_clean,
        sanitize_hardcore_environment_anchors=_sanitize_hardcore_environment_anchors,
        sanitize_hardcore_axis_values=_sanitize_hardcore_axis_values,
        sanitize_scene_text_for_cast=_sanitize_scene_text_for_cast,
        normalize_hardcore_detail_density=_normalize_hardcore_detail_density,
        row_action_family=route_metadata_policy.row_action_family,
        hardcore_action_sentence=_hardcore_action_sentence,
        pov_action_phrase=_pov_action_phrase,
        pov_labels_from_value=_pov_labels_from_value,
        merge_labels=_merge_labels,
        cast_prose_omit=lambda text, omit_labels: _cast_prose(text, omit_labels=omit_labels),
        filter_pov_labeled_clauses=_filter_pov_labeled_clauses,
        natural_label_text=_natural_label_text,
        pov_composition_text=_pov_composition_text,
        pov_camera_phrase=lambda labels: _pov_camera_phrase(labels),
        expression_phrase=_expression_phrase,
        composition_phrase=_composition_phrase,
        paragraph=_paragraph,
    )


def _krea_configured_cast_request(
    row: dict[str, Any],
    detail_level: str,
    style_mode: str,
    primary: str,
    item: str,
    scene: str,
    expression: str,
    composition: str,
    source_composition: str,
    camera: str,
    camera_scene: str,
    style: str,
) -> krea_configured_cast_formatter.KreaConfiguredCastRequest:
    """Wrap the given values in a ``KreaConfiguredCastRequest`` (positional-to-keyword adapter)."""
    return krea_configured_cast_formatter.KreaConfiguredCastRequest(
        row=row,
        detail_level=detail_level,
        style_mode=style_mode,
        primary=primary,
        item=item,
        scene=scene,
        expression=expression,
        composition=composition,
        source_composition=source_composition,
        camera=camera,
        camera_scene=camera_scene,
        style=style,
    )


def _krea_configured_cast_request_from_row(
    row: dict[str, Any],
    detail_level: str,
    style_mode: str,
) -> krea_configured_cast_formatter.KreaConfiguredCastRequest:
    """Extract the row's fields and wrap them in a ``KreaConfiguredCastRequest``."""
    fields = krea_row_fields.extract_krea_row_fields(
        row,
        style_mode,
        _krea_row_field_dependencies(),
    )
    return _krea_configured_cast_request(
        row,
        detail_level,
        style_mode,
        fields.primary,
        fields.item,
        fields.scene,
        fields.expression,
        fields.composition,
        fields.source_composition,
        fields.camera,
        fields.camera_scene,
        fields.style,
    )


def _normal_row_to_krea(row: dict[str, Any], detail_level: str, style_mode: str) -> tuple[str, str]:
    """Format a single row, routing to the configured-cast formatter when applicable.

    A row is treated as configured cast when its ``subject_type`` is
    "configured_cast" or it carries a non-empty ``cast_summary``.
    """
    subject_type = _clean(row.get("subject_type"))
    if subject_type == "configured_cast" or _clean(row.get("cast_summary")):
        return krea_configured_cast_formatter.format_configured_cast(
            _krea_configured_cast_request_from_row(row, detail_level, style_mode),
            _krea_configured_cast_dependencies(),
        )
    return krea_normal_formatter.format_normal_row(
        _krea_normal_row_request_from_row(row, detail_level, style_mode),
        _krea_normal_row_dependencies(),
    )


def _krea_pair_format_dependencies() -> krea_pair_formatter.KreaPairFormatDependencies:
    """Bundle this module's helpers for ``krea_pair_formatter``."""
    return krea_pair_formatter.KreaPairFormatDependencies(
        clean=_clean,
        prompt_cast_descriptors=_prompt_cast_descriptors,
        pair_camera_phrase=_pair_camera_phrase,
        camera_scene_phrase=_camera_scene_phrase,
        style_phrase=_style_phrase,
        sanitize_hardcore_environment_anchors=_sanitize_hardcore_environment_anchors,
        sanitize_hardcore_axis_values=_sanitize_hardcore_axis_values,
        sanitize_scene_text_for_cast=_sanitize_scene_text_for_cast,
        normalize_hardcore_detail_density=_normalize_hardcore_detail_density,
        row_action_family=route_metadata_policy.row_action_family,
        hardcore_action_sentence=_hardcore_action_sentence,
        pov_action_phrase=_pov_action_phrase,
        pov_labels_from_value=_pov_labels_from_value,
        merge_labels=_merge_labels,
        cast_prose_omit=lambda text, omit_labels: _cast_prose(text, omit_labels=omit_labels),
        label_join=_label_join,
        filter_pov_labeled_clauses=_filter_pov_labeled_clauses,
        natural_label_text=_natural_label_text,
        expression_disabled=_expression_disabled,
        expression_phrase=_expression_phrase,
        pov_camera_phrase=lambda labels: _pov_camera_phrase(labels),
        pov_soft_camera_phrase=lambda labels: _pov_camera_phrase(labels, softcore=True),
        pov_composition_text=_pov_composition_text,
        softcore_cast_presence_phrase=softcore_text_policy.softcore_cast_presence_phrase,
        natural_clothing_state=_natural_clothing_state,
        composition_phrase=_composition_phrase,
        paragraph=_paragraph,
        combine_negative=_combine_negative,
    )


def _insta_pair_to_krea(row: dict[str, Any], detail_level: str, style_mode: str) -> tuple[str, str, str, str]:
    """Format a pair row via ``krea_pair_formatter.format_insta_pair``."""
    return krea_pair_formatter.format_insta_pair(
        krea_pair_formatter.KreaPairFormatRequest(
            row=row,
            detail_level=detail_level,
            style_mode=style_mode,
        ),
        _krea_pair_format_dependencies(),
    )


def _fallback_text_to_krea(
    source_text: str,
    preserve_trigger: bool,
    detail_level: str,
    style_mode: str,
) -> tuple[str, str, str]:
    """Convert free-form labelled text into prose when no structured row is available.

    The trigger prefix is stripped (unless preserved), the avoid/negative part
    is split off, and the known field labels ("Scene:", "Pose:", ...) are
    rewritten as sentence openers. ``detail_level`` and ``style_mode`` are
    accepted for signature parity with the other formatters and are not used.

    Returns:
        ``(positive_paragraph, negative_text, "text(fallback)")``.
    """
    positive, negative = _split_avoid(_strip_trigger(source_text, preserve_trigger))
    positive = re.sub(r"\b(?:Scene|Setting):", "The setting is", positive)
    positive = re.sub(r"\b(?:Pose|Sexual pose):", "The pose is", positive)
    positive = re.sub(r"\bFacial expressions?:", "The facial expression is", positive)
    positive = re.sub(r"\bComposition:", "The composition is", positive)
    positive = re.sub(r"\bRole graph:", "The role choreography is", positive)
    positive = _clean(positive)
    return _paragraph([positive]), negative, "text(fallback)"


def _krea_format_dependencies() -> krea_format_route.KreaFormatDependencies:
    """Bundle this module's helpers for ``krea_format_route``."""
    return krea_format_route.KreaFormatDependencies(
        trigger_candidates=TRIGGER_CANDIDATES,
        clean=_clean,
        row_from_inputs=_row_from_inputs,
        normal_row_to_krea=_normal_row_to_krea,
        insta_pair_to_krea=_insta_pair_to_krea,
        fallback_text_to_krea=_fallback_text_to_krea,
        append_formatter_hints=_append_formatter_hints,
        combine_negative=_combine_negative,
        sanitize_prose_text=sanitize_prose_text,
        sanitize_negative_text=sanitize_negative_text,
    )


def format_krea2_prompt(
    source_text: str,
    metadata_json: str = "",
    negative_prompt: str = "",
    input_hint: str = "auto",
    target: str = "auto",
    detail_level: str = "balanced",
    style_mode: str = "preserve",
    preserve_trigger: bool = False,
    extra_positive: str = "",
    extra_negative: str = "",
) -> dict[str, str]:
    """Format a Krea prompt from source text and optional metadata.

    Packs the arguments into a ``KreaFormatRequest`` and hands it, together
    with this module's helper bundle, to ``krea_format_route.format_krea2_prompt``,
    whose result is returned unchanged.
    """
    return krea_format_route.format_krea2_prompt(
        krea_format_route.KreaFormatRequest(
            source_text=source_text,
            metadata_json=metadata_json,
            negative_prompt=negative_prompt,
            input_hint=input_hint,
            target=target,
            detail_level=detail_level,
            style_mode=style_mode,
            preserve_trigger=preserve_trigger,
            extra_positive=extra_positive,
            extra_negative=extra_negative,
        ),
        _krea_format_dependencies(),
    )