diff --git a/docs/prompt-architecture-improvement-plan.md b/docs/prompt-architecture-improvement-plan.md index 8d3aa74..7032a18 100644 --- a/docs/prompt-architecture-improvement-plan.md +++ b/docs/prompt-architecture-improvement-plan.md @@ -126,6 +126,9 @@ Already isolated: - object-style item-template metadata extraction, action/position family normalization, position-key normalization, and metadata audit errors live in `category_template_metadata.py`. +- row item selection, weighted item/pair choice, item-template axis filling, + and oral/outercourse axis compatibility filters live in `row_item.py`; + `prompt_builder.py` keeps public delegate wrappers. - category/cast route preset schemas, config JSON builders, choice lists, and parsers live in `category_cast_config.py`; `prompt_builder.py` keeps public delegate wrappers for existing nodes and tests. diff --git a/docs/prompt-pool-routing-map.md b/docs/prompt-pool-routing-map.md index e750cf0..5ce6636 100644 --- a/docs/prompt-pool-routing-map.md +++ b/docs/prompt-pool-routing-map.md @@ -69,6 +69,7 @@ Core helper ownership: | --- | --- | | `category_library.py` | JSON category loading, subcategory normalization, named scene/expression/composition pool loading, cast compatibility filtering, exact subcategory lookup, and inheritance-based pool merging. | | `category_template_metadata.py` | Object-style item-template metadata extraction, action/position family normalization, position-key normalization, key merging, and audit validation errors. | +| `row_item.py` | Row item selection, weighted item/pair choice, item-template axis filling, and oral/outercourse axis compatibility filters. | | `category_cast_config.py` | Category preset and cast preset schemas, category/cast config JSON builders, choice lists, and config parsers used by route nodes. | | `cast_context.py` | Generation-time cast count phrases, configured-cast context metadata, character-slot label assignment, cast-summary wording, scene-kind labels, and couple count normalization. | | `camera_config.py` | Camera option schema, direct/orbit/Qwen camera JSON builders, camera config parsing, plain camera directive text, and camera caption labels. | @@ -209,7 +210,7 @@ there unless the behavior needs Python logic. flowchart TD A[category/subcategory input] --> B[_find_subcategory] B --> C[item from subcategory.items] - C --> D[_compose_item] + C --> D[row_item.compose_item] D --> E[item_templates + axes] B --> F[_scene_pool] B --> G[_expression_pool] @@ -466,9 +467,9 @@ plain prompt text. When debugging, inspect these fields before editing pools. | `main_category`, `subcategory` | Category selection | All formatters and debug | Human-readable selected category route. | | `category_slug`, `subcategory_slug` | JSON category normalization | Debug/filtering | Stable-ish machine labels for selected category route. | | `content_seed_axis` | `_build_custom_row` | Debug | Shows whether the item/action was driven by `content` or `pose`. Critical for hardcore pose categories. | -| `item` | `_compose_item` or Insta override | Krea/SDXL/Naturalizer | Clothing item, category item, or sexual scene/action text. | -| `item_axis_values` | `_compose_item` | Krea hardcore rewrite, SDXL tags | Filled template axes such as position/action/detail values. | -| `item_template_metadata` | `_compose_item` | Debug, Krea/SDXL/Naturalizer route metadata | Optional metadata from object-style item templates; currently used to prefer explicit action/position families and keys before inference. | +| `item` | `row_item.compose_item` or Insta override | Krea/SDXL/Naturalizer | Clothing item, category item, or sexual scene/action text. | +| `item_axis_values` | `row_item.compose_item` | Krea hardcore rewrite, SDXL tags | Filled template axes such as position/action/detail values. | +| `item_template_metadata` | `row_item.compose_item` | Debug, Krea/SDXL/Naturalizer route metadata | Optional metadata from object-style item templates; currently used to prefer explicit action/position families and keys before inference. | | `formatter_hints` | `category_template_metadata.formatter_hints` | Krea/SDXL/Naturalizer route specialization, debug | Normalized route-specific hints from object-style item templates, keyed by `all`, `krea`, `sdxl`, or `caption`; each formatter consumes `all` plus its own route only. | | `action_family` | `item_template_metadata` or `hardcore_action_metadata.source_hardcore_action_family` | Krea hardcore rewrite, SDXL tags, natural captions, debug | Source-aware formatter semantic family such as `foreplay`, `outercourse`, `oral`, `penetration`, `toy_double`, or `climax`. | | `position_family` | `item_template_metadata` or `_hardcore_source_position_family` | Debug/filtering | Source/UI hardcore family selected by template metadata or subcategory, such as `manual`, `interaction`, `oral`, `anal`, or `climax`. | diff --git a/prompt_builder.py b/prompt_builder.py index f5af664..b23fa6d 100644 --- a/prompt_builder.py +++ b/prompt_builder.py @@ -5,7 +5,7 @@ import random import re from pathlib import Path from string import Formatter -from typing import Any, Callable +from typing import Any try: from .category_library import ( @@ -14,10 +14,8 @@ try: compatible_entry as _compatible_entry, find_subcategory as _find_subcategory, load_category_library, - merged_axes as _merged_axes, merged_field as _merged_field, read_category_json as _read_json, - template_list as _template_list, ) from . import camera_config as camera_policy from . import cast_context as cast_context_policy @@ -42,6 +40,7 @@ try: from . import row_normalization as row_policy from . import row_camera as row_camera_policy from . import row_expression as row_expression_policy + from . import row_item as row_item_policy from . import row_location as row_location_policy from . import row_pools as row_pool_policy from . import seed_config as seed_policy @@ -59,10 +58,8 @@ except ImportError: # Allows local smoke tests with `python -c`. compatible_entry as _compatible_entry, find_subcategory as _find_subcategory, load_category_library, - merged_axes as _merged_axes, merged_field as _merged_field, read_category_json as _read_json, - template_list as _template_list, ) import camera_config as camera_policy import cast_context as cast_context_policy @@ -87,6 +84,7 @@ except ImportError: # Allows local smoke tests with `python -c`. import row_normalization as row_policy import row_camera as row_camera_policy import row_expression as row_expression_policy + import row_item as row_item_policy import row_location as row_location_policy import row_pools as row_pool_policy import seed_config as seed_policy @@ -206,7 +204,7 @@ class SafeFormatDict(dict): def _slug(value: str) -> str: - return g.slugify(value) or "custom" + return row_item_policy.slug(value) def _list_from(value: Any) -> list[Any]: @@ -243,69 +241,23 @@ def _unique_extend(target: list[Any], additions: list[Any]) -> None: def _pair_from(value: Any) -> tuple[str, str]: - if isinstance(value, dict): - text = str( - value.get("prompt") - or value.get("description") - or value.get("text") - or value.get("name") - or "" - ).strip() - slug = str(value.get("slug") or _slug(str(value.get("name") or text))).strip() - if not text: - raise ValueError(f"Pair extension is missing prompt text: {value!r}") - return slug, text - if isinstance(value, (list, tuple)) and len(value) == 2: - return str(value[0]), str(value[1]) - text = str(value).strip() - if not text: - raise ValueError("Pair extension cannot be empty") - return _slug(text), text + return row_item_policy.pair_from(value) def _weighted_choice(rng: random.Random, items: list[Any]) -> Any: - if not items: - raise ValueError("Cannot choose from an empty list") - weights: list[float] = [] - for item in items: - weight = item.get("weight", 1.0) if isinstance(item, dict) else 1.0 - try: - weights.append(max(0.0, float(weight))) - except (TypeError, ValueError): - weights.append(1.0) - total = sum(weights) - if total <= 0: - return items[rng.randrange(len(items))] - pick = rng.random() * total - running = 0.0 - for item, weight in zip(items, weights): - running += weight - if pick <= running: - return item - return items[-1] + return row_item_policy.weighted_choice(rng, items) def _entry_text(item: Any) -> str: - if isinstance(item, dict): - return str( - item.get("template") - or item.get("prompt") - or item.get("text") - or item.get("description") - or item.get("name") - or "" - ).strip() - return str(item).strip() + return row_item_policy.entry_text(item) def _item_text(item: Any) -> str: - return _entry_text(item) + return row_item_policy.item_text(item) def _item_name(item: Any) -> str: - if isinstance(item, dict): - return str(item.get("name") or _item_text(item)).strip() - return _item_text(item) + return row_item_policy.item_name(item) def _template_metadata(item: Any) -> dict[str, Any]: @@ -333,199 +285,19 @@ def _merge_position_keys(primary: list[str], fallback: list[str]) -> list[str]: def _oral_acts_for_position(values: list[Any], position: str) -> list[Any]: - position_text = str(position or "").lower() - if not position_text: - return values - - def act_text(value: Any) -> str: - return _entry_text(value).lower() - - def filtered(predicate: Callable[[str], bool]) -> list[Any]: - matches = [value for value in values if predicate(act_text(value))] - return matches or values - - penis_terms = ("fellatio", "blowjob", "deepthroat", "penis sucking", "penis in mouth") - cunnilingus_terms = ("cunnilingus", "pussy licking", "tongue on pussy", "oral sex with tongue and fingers", "mouth on genitals") - if "sixty-nine" in position_text: - return filtered(lambda text: "sixty-nine" in text) - if "face-sitting" in position_text: - return filtered(lambda text: "face-sitting" in text or any(term in text for term in cunnilingus_terms)) - if "kneeling oral" in position_text: - return filtered(lambda text: any(term in text for term in penis_terms)) - if "straddled oral" in position_text or "reclining cunnilingus" in position_text: - return filtered(lambda text: "sixty-nine" not in text and not any(term in text for term in penis_terms)) - if "spread-leg oral" in position_text: - return filtered(lambda text: "sixty-nine" not in text and "face-sitting" not in text) - if any(term in position_text for term in ("standing oral", "kneeling oral", "edge-of-bed oral", "chair oral", "side-lying oral")): - return filtered(lambda text: "sixty-nine" not in text and "face-sitting" not in text) - return values + return row_item_policy.oral_acts_for_position(values, position) def _oral_axis_values_for_context(values: list[Any], position: str, oral_act: str, axis_name: str) -> list[Any]: - axis_name = str(axis_name or "").lower() - if axis_name not in {"body_contact", "hand_detail", "mouth_detail", "saliva_detail", "climax_hint", "visibility"}: - return values - position_text = str(position or "").lower() - act_text = str(oral_act or "").lower() - woman_gives = any( - term in act_text - for term in ("fellatio", "blowjob", "deepthroat", "penis sucking", "penis in mouth") - ) - man_gives = any( - term in act_text - for term in ("cunnilingus", "pussy licking", "tongue on pussy") - ) - if not (woman_gives or man_gives): - return values - - def value_text(value: Any) -> str: - return _entry_text(value).lower() - - def filtered(terms: tuple[str, ...], excluded_terms: tuple[str, ...] = ()) -> list[Any]: - matches = [ - value - for value in values - if any(term in value_text(value) for term in terms) - and not any(term in value_text(value) for term in excluded_terms) - ] - return matches or values - - if woman_gives: - by_axis = { - "body_contact": ("hips pushed", "fingers tangled", "bodies stacked", "hands on thighs"), - "hand_detail": ("hips", "penis", "head", "hair"), - "mouth_detail": ("lips", "mouth", "deep mouth", "saliva"), - "saliva_detail": ("saliva", "wet lips", "slick wet mouth", "drool", "mouth"), - "climax_hint": ("mouth", "lips", "tongue", "breasts", "belly", "sexual fluids"), - "visibility": ("mouth", "penis", "oral"), - } - excluded = { - "body_contact": ("legs held open", "spread legs", "ass lifted", "chest pressed to thighs"), - "hand_detail": ("spreading thighs", "sheets", "cupping breasts", "pressing into thighs", "holding the ass"), - } - return filtered(by_axis.get(axis_name, ("mouth", "penis")), excluded.get(axis_name, ())) - if man_gives and ("kneeling oral" in position_text or "standing oral" in position_text): - by_axis = { - "body_contact": ("legs held open", "one body kneeling", "chest pressed", "ass lifted", "hands on thighs"), - "hand_detail": ("thigh", "hips", "head", "ass"), - "mouth_detail": ("tongue", "wet lips", "deep mouth", "genitals"), - "saliva_detail": ("saliva", "wet lips", "tongue", "drool"), - "climax_hint": ("sexual fluids", "orgasmic tension"), - "visibility": ("mouth", "pussy", "oral", "genital"), - } - return filtered(by_axis.get(axis_name, ("mouth", "pussy", "tongue")), ("penis", "breasts")) - return values + return row_item_policy.oral_axis_values_for_context(values, position, oral_act, axis_name) def _outercourse_acts_for_position(values: list[Any], position: str) -> list[Any]: - position_text = str(position or "").lower() - if not position_text: - return values - - def act_text(value: Any) -> str: - return _entry_text(value).lower() - - def filtered(predicate: Callable[[str], bool]) -> list[Any]: - matches = [value for value in values if predicate(act_text(value))] - return matches or values - - if any(term in position_text for term in ("boobjob", "titjob", "breast-sex", "breast sex")): - return filtered(lambda text: any(term in text for term in ("boobjob", "titjob", "breast sex", "breasts"))) - if any(term in position_text for term in ("testicle", "balls")): - return filtered(lambda text: any(term in text for term in ("testicle", "balls"))) - if "penis-licking" in position_text or "penis licking" in position_text: - return filtered(lambda text: "licking" in text or "tongue" in text) - if "handjob" in position_text or "hand job" in position_text: - return filtered(lambda text: any(term in text for term in ("handjob", "hand job", "hand wrapped", "two-handed"))) - if "footjob" in position_text: - return filtered(lambda text: any(term in text for term in ("footjob", "feet", "soles", "toes"))) - return values + return row_item_policy.outercourse_acts_for_position(values, position) def _outercourse_axis_values_for_position(values: list[Any], position: str, axis_name: str) -> list[Any]: - position_text = str(position or "").lower() - if not position_text: - return values - axis_name = str(axis_name or "").lower() - if axis_name not in {"contact_detail", "hand_detail", "texture_detail", "visibility", "body_contact"}: - return values - - def value_text(value: Any) -> str: - return _entry_text(value).lower() - - def filtered(terms: tuple[str, ...], excluded_terms: tuple[str, ...] = ()) -> list[Any]: - matches = [ - value - for value in values - if any(term in value_text(value) for term in terms) - and not any(term in value_text(value) for term in excluded_terms) - ] - return matches or values - - if any(term in position_text for term in ("boobjob", "titjob", "breast-sex", "breast sex")): - by_axis = { - "contact_detail": ("compressed", "glans", "breast", "breasts", "soft tissue", "skin visibly"), - "hand_detail": ("breast", "breasts", "fingers"), - "texture_detail": ("compression", "soft flesh", "skin", "flesh", "asymmetry"), - "visibility": ("breast", "breasts", "glans", "shaft"), - "body_contact": ("torso", "body angled", "shoulders", "hips"), - } - excluded_by_axis = { - "contact_detail": ("hand wrapped", "fingers and palm", "soles", "toes", "balls", "tongue"), - "hand_detail": ("base of the penis", "penis shaft", "balls", "thigh", "ankles", "stroking"), - "texture_detail": ("toes", "soles", "tongue"), - "visibility": ("balls", "soles", "toes", "hand"), - "body_contact": ("head tucked", "face directly", "base of the penis"), - } - return filtered( - by_axis.get(axis_name, ("breast", "breasts", "shaft")), - excluded_by_axis.get(axis_name, ()), - ) - if any(term in position_text for term in ("testicle", "balls")): - by_axis = { - "contact_detail": ("balls", "lips", "tongue", "wet"), - "hand_detail": ("balls", "base", "thigh"), - "texture_detail": ("wet", "saliva", "skin"), - "visibility": ("balls", "mouth"), - "body_contact": ("torso", "shoulders", "head tucked", "base of the penis", "knees", "thigh"), - } - return filtered(by_axis.get(axis_name, ("balls", "mouth", "tongue"))) - if "penis-licking" in position_text or "penis licking" in position_text: - by_axis = { - "contact_detail": ("tongue", "lips", "glans", "shaft", "wet"), - "hand_detail": ("base", "penis", "thigh"), - "texture_detail": ("wet", "saliva", "skin"), - "visibility": ("tongue", "penis"), - "body_contact": ("head low", "face directly", "torso", "pelvis", "base of the penis", "hips", "body angled"), - } - return filtered(by_axis.get(axis_name, ("tongue", "glans", "shaft"))) - if "handjob" in position_text or "hand job" in position_text: - by_axis = { - "contact_detail": ("hand", "fingers", "palm", "shaft", "glans"), - "hand_detail": ("hand", "hands", "shaft", "penis"), - "texture_detail": ("fingers", "pressure", "skin", "shaft"), - "visibility": ("hand", "penis", "shaft", "glans"), - "body_contact": ("hips", "knees", "body angle"), - } - return filtered(by_axis.get(axis_name, ("hand", "penis", "shaft"))) - if "footjob" in position_text: - by_axis = { - "contact_detail": ("soles", "toes"), - "hand_detail": ("ankles", "thighs"), - "texture_detail": ("toes", "soles", "pressure"), - "visibility": ("feet", "soles"), - "body_contact": ("legs", "knees", "body angled"), - } - excluded_by_axis = { - "contact_detail": ("hand", "finger", "palm", "balls", "tongue", "breast"), - "texture_detail": ("fingers", "tongue", "breast"), - "visibility": ("hand", "balls", "breast"), - } - return filtered( - by_axis.get(axis_name, ("feet", "soles", "toes")), - excluded_by_axis.get(axis_name, ()), - ) - return values + return row_item_policy.outercourse_axis_values_for_position(values, position, axis_name) def _compose_item( @@ -536,57 +308,26 @@ def _compose_item( women_count: int = 1, men_count: int = 1, ) -> tuple[str, str, dict[str, str], dict[str, Any]]: - templates = _template_list(category, subcategory, item, "item_templates") - axes = _merged_axes(category, subcategory, item) - if templates and axes: - template_entry = _weighted_choice(rng, _compatible_entries(templates, women_count, men_count)) - template = _entry_text(template_entry) - fields = [key for _, key, _, _ in Formatter().parse(template) if key] - unique_fields = list(dict.fromkeys(fields)) - axis_values: dict[str, str] = {} - subcategory_slug = str(subcategory.get("slug") or "").lower() - if subcategory_slug in ("oral_sex", "outercourse_sex") and "position" in unique_fields and axes.get("position"): - position_values = _compatible_entries(axes["position"], women_count, men_count) - axis_values["position"] = _entry_text(_weighted_choice(rng, position_values)) - for name in unique_fields: - if name in axis_values or name not in axes or not axes[name]: - continue - values = _compatible_entries(axes[name], women_count, men_count) - if subcategory_slug == "oral_sex" and name == "oral_act": - values = _oral_acts_for_position(values, axis_values.get("position", "")) - elif subcategory_slug == "oral_sex": - values = _oral_axis_values_for_context( - values, - axis_values.get("position", ""), - axis_values.get("oral_act", ""), - name, - ) - if subcategory_slug == "outercourse_sex" and name == "outer_act": - values = _outercourse_acts_for_position(values, axis_values.get("position", "")) - if subcategory_slug == "outercourse_sex": - values = _outercourse_axis_values_for_position(values, axis_values.get("position", ""), name) - axis_values[name] = _entry_text(_weighted_choice(rng, values)) - item_text = _format(template, axis_values).strip() - item_name = _item_name(item) or subcategory["name"] - return item_text, item_name, axis_values, _template_metadata(template_entry) - return _item_text(item), _item_name(item), {}, _template_metadata(item) + return row_item_policy.compose_item( + rng, + category, + subcategory, + item, + women_count, + men_count, + ) def _choose_text(rng: random.Random, items: list[Any]) -> str: - item = _weighted_choice(rng, items) - return _item_text(item) + return row_item_policy.choose_text(rng, items) def _choose_distinct_text(rng: random.Random, items: list[Any], first_text: str) -> str: - first_text = _item_text(first_text).lower() - distinct = [item for item in items if _item_text(item).lower() != first_text] - if not distinct: - return "" - return _choose_text(rng, distinct) + return row_item_policy.choose_distinct_text(rng, items, first_text) def _choose_pair(rng: random.Random, items: list[Any]) -> tuple[str, str]: - return _pair_from(_weighted_choice(rng, items)) + return row_item_policy.choose_pair(rng, items) def _extension_targets() -> dict[str, tuple[list[Any], bool]]: diff --git a/row_item.py b/row_item.py new file mode 100644 index 0000000..e7d2b5d --- /dev/null +++ b/row_item.py @@ -0,0 +1,343 @@ +from __future__ import annotations + +import random +from string import Formatter +from typing import Any, Callable + +try: + from . import category_library as category_policy + from . import category_template_metadata as template_policy + from . import generate_prompt_batches as g +except ImportError: # Allows local smoke tests with top-level imports. + import category_library as category_policy + import category_template_metadata as template_policy + import generate_prompt_batches as g + + +class SafeFormatDict(dict): + def __missing__(self, key: str) -> str: + return "{" + key + "}" + + +def slug(value: str) -> str: + return g.slugify(value) or "custom" + + +def pair_from(value: Any) -> tuple[str, str]: + if isinstance(value, dict): + text = str( + value.get("prompt") + or value.get("description") + or value.get("text") + or value.get("name") + or "" + ).strip() + pair_slug = str(value.get("slug") or slug(str(value.get("name") or text))).strip() + if not text: + raise ValueError(f"Pair extension is missing prompt text: {value!r}") + return pair_slug, text + if isinstance(value, (list, tuple)) and len(value) == 2: + return str(value[0]), str(value[1]) + text = str(value).strip() + if not text: + raise ValueError("Pair extension cannot be empty") + return slug(text), text + + +def weighted_choice(rng: random.Random, items: list[Any]) -> Any: + if not items: + raise ValueError("Cannot choose from an empty list") + weights: list[float] = [] + for item in items: + weight = item.get("weight", 1.0) if isinstance(item, dict) else 1.0 + try: + weights.append(max(0.0, float(weight))) + except (TypeError, ValueError): + weights.append(1.0) + total = sum(weights) + if total <= 0: + return items[rng.randrange(len(items))] + pick = rng.random() * total + running = 0.0 + for item, weight in zip(items, weights): + running += weight + if pick <= running: + return item + return items[-1] + + +def entry_text(item: Any) -> str: + return category_policy._entry_text(item) + + +def item_text(item: Any) -> str: + return entry_text(item) + + +def item_name(item: Any) -> str: + if isinstance(item, dict): + return str(item.get("name") or item_text(item)).strip() + return item_text(item) + + +def choose_text(rng: random.Random, items: list[Any]) -> str: + return item_text(weighted_choice(rng, items)) + + +def choose_distinct_text(rng: random.Random, items: list[Any], first_text: str) -> str: + first_text = item_text(first_text).lower() + distinct = [item for item in items if item_text(item).lower() != first_text] + if not distinct: + return "" + return choose_text(rng, distinct) + + +def choose_pair(rng: random.Random, items: list[Any]) -> tuple[str, str]: + return pair_from(weighted_choice(rng, items)) + + +def oral_acts_for_position(values: list[Any], position: str) -> list[Any]: + position_text = str(position or "").lower() + if not position_text: + return values + + def act_text(value: Any) -> str: + return entry_text(value).lower() + + def filtered(predicate: Callable[[str], bool]) -> list[Any]: + matches = [value for value in values if predicate(act_text(value))] + return matches or values + + penis_terms = ("fellatio", "blowjob", "deepthroat", "penis sucking", "penis in mouth") + cunnilingus_terms = ("cunnilingus", "pussy licking", "tongue on pussy", "oral sex with tongue and fingers", "mouth on genitals") + if "sixty-nine" in position_text: + return filtered(lambda text: "sixty-nine" in text) + if "face-sitting" in position_text: + return filtered(lambda text: "face-sitting" in text or any(term in text for term in cunnilingus_terms)) + if "kneeling oral" in position_text: + return filtered(lambda text: any(term in text for term in penis_terms)) + if "straddled oral" in position_text or "reclining cunnilingus" in position_text: + return filtered(lambda text: "sixty-nine" not in text and not any(term in text for term in penis_terms)) + if "spread-leg oral" in position_text: + return filtered(lambda text: "sixty-nine" not in text and "face-sitting" not in text) + if any(term in position_text for term in ("standing oral", "kneeling oral", "edge-of-bed oral", "chair oral", "side-lying oral")): + return filtered(lambda text: "sixty-nine" not in text and "face-sitting" not in text) + return values + + +def oral_axis_values_for_context(values: list[Any], position: str, oral_act: str, axis_name: str) -> list[Any]: + axis_name = str(axis_name or "").lower() + if axis_name not in {"body_contact", "hand_detail", "mouth_detail", "saliva_detail", "climax_hint", "visibility"}: + return values + position_text = str(position or "").lower() + act_text = str(oral_act or "").lower() + woman_gives = any( + term in act_text + for term in ("fellatio", "blowjob", "deepthroat", "penis sucking", "penis in mouth") + ) + man_gives = any( + term in act_text + for term in ("cunnilingus", "pussy licking", "tongue on pussy") + ) + if not (woman_gives or man_gives): + return values + + def value_text(value: Any) -> str: + return entry_text(value).lower() + + def filtered(terms: tuple[str, ...], excluded_terms: tuple[str, ...] = ()) -> list[Any]: + matches = [ + value + for value in values + if any(term in value_text(value) for term in terms) + and not any(term in value_text(value) for term in excluded_terms) + ] + return matches or values + + if woman_gives: + by_axis = { + "body_contact": ("hips pushed", "fingers tangled", "bodies stacked", "hands on thighs"), + "hand_detail": ("hips", "penis", "head", "hair"), + "mouth_detail": ("lips", "mouth", "deep mouth", "saliva"), + "saliva_detail": ("saliva", "wet lips", "slick wet mouth", "drool", "mouth"), + "climax_hint": ("mouth", "lips", "tongue", "breasts", "belly", "sexual fluids"), + "visibility": ("mouth", "penis", "oral"), + } + excluded = { + "body_contact": ("legs held open", "spread legs", "ass lifted", "chest pressed to thighs"), + "hand_detail": ("spreading thighs", "sheets", "cupping breasts", "pressing into thighs", "holding the ass"), + } + return filtered(by_axis.get(axis_name, ("mouth", "penis")), excluded.get(axis_name, ())) + if man_gives and ("kneeling oral" in position_text or "standing oral" in position_text): + by_axis = { + "body_contact": ("legs held open", "one body kneeling", "chest pressed", "ass lifted", "hands on thighs"), + "hand_detail": ("thigh", "hips", "head", "ass"), + "mouth_detail": ("tongue", "wet lips", "deep mouth", "genitals"), + "saliva_detail": ("saliva", "wet lips", "tongue", "drool"), + "climax_hint": ("sexual fluids", "orgasmic tension"), + "visibility": ("mouth", "pussy", "oral", "genital"), + } + return filtered(by_axis.get(axis_name, ("mouth", "pussy", "tongue")), ("penis", "breasts")) + return values + + +def outercourse_acts_for_position(values: list[Any], position: str) -> list[Any]: + position_text = str(position or "").lower() + if not position_text: + return values + + def act_text(value: Any) -> str: + return entry_text(value).lower() + + def filtered(predicate: Callable[[str], bool]) -> list[Any]: + matches = [value for value in values if predicate(act_text(value))] + return matches or values + + if any(term in position_text for term in ("boobjob", "titjob", "breast-sex", "breast sex")): + return filtered(lambda text: any(term in text for term in ("boobjob", "titjob", "breast sex", "breasts"))) + if any(term in position_text for term in ("testicle", "balls")): + return filtered(lambda text: any(term in text for term in ("testicle", "balls"))) + if "penis-licking" in position_text or "penis licking" in position_text: + return filtered(lambda text: "licking" in text or "tongue" in text) + if "handjob" in position_text or "hand job" in position_text: + return filtered(lambda text: any(term in text for term in ("handjob", "hand job", "hand wrapped", "two-handed"))) + if "footjob" in position_text: + return filtered(lambda text: any(term in text for term in ("footjob", "feet", "soles", "toes"))) + return values + + +def outercourse_axis_values_for_position(values: list[Any], position: str, axis_name: str) -> list[Any]: + position_text = str(position or "").lower() + if not position_text: + return values + axis_name = str(axis_name or "").lower() + if axis_name not in {"contact_detail", "hand_detail", "texture_detail", "visibility", "body_contact"}: + return values + + def value_text(value: Any) -> str: + return entry_text(value).lower() + + def filtered(terms: tuple[str, ...], excluded_terms: tuple[str, ...] = ()) -> list[Any]: + matches = [ + value + for value in values + if any(term in value_text(value) for term in terms) + and not any(term in value_text(value) for term in excluded_terms) + ] + return matches or values + + if any(term in position_text for term in ("boobjob", "titjob", "breast-sex", "breast sex")): + by_axis = { + "contact_detail": ("compressed", "glans", "breast", "breasts", "soft tissue", "skin visibly"), + "hand_detail": ("breast", "breasts", "fingers"), + "texture_detail": ("compression", "soft flesh", "skin", "flesh", "asymmetry"), + "visibility": ("breast", "breasts", "glans", "shaft"), + "body_contact": ("torso", "body angled", "shoulders", "hips"), + } + excluded_by_axis = { + "contact_detail": ("hand wrapped", "fingers and palm", "soles", "toes", "balls", "tongue"), + "hand_detail": ("base of the penis", "penis shaft", "balls", "thigh", "ankles", "stroking"), + "texture_detail": ("toes", "soles", "tongue"), + "visibility": ("balls", "soles", "toes", "hand"), + "body_contact": ("head tucked", "face directly", "base of the penis"), + } + return filtered( + by_axis.get(axis_name, ("breast", "breasts", "shaft")), + excluded_by_axis.get(axis_name, ()), + ) + if any(term in position_text for term in ("testicle", "balls")): + by_axis = { + "contact_detail": ("balls", "lips", "tongue", "wet"), + "hand_detail": ("balls", "base", "thigh"), + "texture_detail": ("wet", "saliva", "skin"), + "visibility": ("balls", "mouth"), + "body_contact": ("torso", "shoulders", "head tucked", "base of the penis", "knees", "thigh"), + } + return filtered(by_axis.get(axis_name, ("balls", "mouth", "tongue"))) + if "penis-licking" in position_text or "penis licking" in position_text: + by_axis = { + "contact_detail": ("tongue", "lips", "glans", "shaft", "wet"), + "hand_detail": ("base", "penis", "thigh"), + "texture_detail": ("wet", "saliva", "skin"), + "visibility": ("tongue", "penis"), + "body_contact": ("head low", "face directly", "torso", "pelvis", "base of the penis", "hips", "body angled"), + } + return filtered(by_axis.get(axis_name, ("tongue", "glans", "shaft"))) + if "handjob" in position_text or "hand job" in position_text: + by_axis = { + "contact_detail": ("hand", "fingers", "palm", "shaft", "glans"), + "hand_detail": ("hand", "hands", "shaft", "penis"), + "texture_detail": ("fingers", "pressure", "skin", "shaft"), + "visibility": ("hand", "penis", "shaft", "glans"), + "body_contact": ("hips", "knees", "body angle"), + } + return filtered(by_axis.get(axis_name, ("hand", "penis", "shaft"))) + if "footjob" in position_text: + by_axis = { + "contact_detail": ("soles", "toes"), + "hand_detail": ("ankles", "thighs"), + "texture_detail": ("toes", "soles", "pressure"), + "visibility": ("feet", "soles"), + "body_contact": ("legs", "knees", "body angled"), + } + excluded_by_axis = { + "contact_detail": ("hand", "finger", "palm", "balls", "tongue", "breast"), + "texture_detail": ("fingers", "tongue", "breast"), + "visibility": ("hand", "balls", "breast"), + } + return filtered( + by_axis.get(axis_name, ("feet", "soles", "toes")), + excluded_by_axis.get(axis_name, ()), + ) + return values + + +def _format(template: str, context: dict[str, Any]) -> str: + fields = {key for _, key, _, _ in Formatter().parse(template) if key} + safe_context = SafeFormatDict({key: "" for key in fields}) + safe_context.update(context) + return template.format_map(safe_context) + + +def compose_item( + rng: random.Random, + category: dict[str, Any], + subcategory: dict[str, Any], + item: Any, + women_count: int = 1, + men_count: int = 1, +) -> tuple[str, str, dict[str, str], dict[str, Any]]: + templates = category_policy.template_list(category, subcategory, item, "item_templates") + axes = category_policy.merged_axes(category, subcategory, item) + if templates and axes: + template_entry = weighted_choice(rng, category_policy.compatible_entries(templates, women_count, men_count)) + template = entry_text(template_entry) + fields = [key for _, key, _, _ in Formatter().parse(template) if key] + unique_fields = list(dict.fromkeys(fields)) + axis_values: dict[str, str] = {} + subcategory_slug = str(subcategory.get("slug") or "").lower() + if subcategory_slug in ("oral_sex", "outercourse_sex") and "position" in unique_fields and axes.get("position"): + position_values = category_policy.compatible_entries(axes["position"], women_count, men_count) + axis_values["position"] = entry_text(weighted_choice(rng, position_values)) + for name in unique_fields: + if name in axis_values or name not in axes or not axes[name]: + continue + values = category_policy.compatible_entries(axes[name], women_count, men_count) + if subcategory_slug == "oral_sex" and name == "oral_act": + values = oral_acts_for_position(values, axis_values.get("position", "")) + elif subcategory_slug == "oral_sex": + values = oral_axis_values_for_context( + values, + axis_values.get("position", ""), + axis_values.get("oral_act", ""), + name, + ) + if subcategory_slug == "outercourse_sex" and name == "outer_act": + values = outercourse_acts_for_position(values, axis_values.get("position", "")) + if subcategory_slug == "outercourse_sex": + values = outercourse_axis_values_for_position(values, axis_values.get("position", ""), name) + axis_values[name] = entry_text(weighted_choice(rng, values)) + item_prompt = _format(template, axis_values).strip() + name = item_name(item) or subcategory["name"] + return item_prompt, name, axis_values, template_policy.template_metadata(template_entry) + return item_text(item), item_name(item), {}, template_policy.template_metadata(item) diff --git a/tools/prompt_smoke.py b/tools/prompt_smoke.py index 25aaf28..eb07aa7 100644 --- a/tools/prompt_smoke.py +++ b/tools/prompt_smoke.py @@ -52,6 +52,7 @@ import row_normalization # noqa: E402 import route_metadata # noqa: E402 import row_camera # noqa: E402 import row_expression # noqa: E402 +import row_item # noqa: E402 import row_location # noqa: E402 import row_pools # noqa: E402 import server_routes # noqa: E402 @@ -732,6 +733,83 @@ def smoke_row_expression_policy() -> None: ) +def smoke_row_item_policy() -> None: + weighted_entries = [ + {"text": "first", "weight": 0.0}, + {"text": "second", "weight": 4.0}, + {"text": "third", "weight": 1.0}, + ] + _expect( + pb._weighted_choice(random.Random(44), weighted_entries) == row_item.weighted_choice(random.Random(44), weighted_entries), + "Prompt builder weighted item choice should delegate to row_item", + ) + _expect( + pb._pair_from({"name": "Library Corner", "description": "carved shelves and warm lamps"}) + == row_item.pair_from({"name": "Library Corner", "description": "carved shelves and warm lamps"}), + "Prompt builder pair parser should delegate to row_item", + ) + + oral_values = [ + "fellatio with penis in mouth", + "cunnilingus with tongue on pussy", + "sixty-nine mutual oral", + ] + _expect( + pb._oral_acts_for_position(oral_values, "kneeling oral position") + == row_item.oral_acts_for_position(oral_values, "kneeling oral position") + == ["fellatio with penis in mouth"], + "Row item oral act filtering changed for kneeling oral", + ) + outer_values = [ + "titjob with compressed breasts", + "handjob with palm around shaft", + "footjob with soles and toes", + ] + _expect( + pb._outercourse_acts_for_position(outer_values, "footjob position") + == row_item.outercourse_acts_for_position(outer_values, "footjob position") + == ["footjob with soles and toes"], + "Row item outercourse act filtering changed for footjob", + ) + texture_values = [ + "fingers pressing around shaft", + "soles pressing around shaft", + "tongue wet against shaft", + ] + _expect( + row_item.outercourse_axis_values_for_position(texture_values, "footjob position", "texture_detail") + == ["soles pressing around shaft"], + "Row item outercourse texture axis should prefer footjob-compatible details", + ) + + category = {} + subcategory = { + "name": "Oral sex", + "slug": "oral_sex", + "item_templates": [ + { + "template": "{oral_act}; {hand_detail}", + "action_family": "oral", + "position_family": "oral", + } + ], + "item_axes": { + "position": ["kneeling oral position"], + "oral_act": oral_values, + "hand_detail": ["hands on hips", "spreading thighs"], + }, + } + item = "Oral sex" + builder_result = pb._compose_item(random.Random(7), category, subcategory, item, women_count=1, men_count=1) + policy_result = row_item.compose_item(random.Random(7), category, subcategory, item, women_count=1, men_count=1) + _expect(builder_result == policy_result, "Prompt builder compose item should delegate to row_item") + item_text, item_name, axis_values, metadata = policy_result + _expect(item_name == "Oral sex", "Row item compose lost item name") + _expect("fellatio" in item_text, "Row item compose did not apply oral act compatibility filter") + _expect(axis_values.get("hand_detail") == "hands on hips", "Row item compose did not apply oral detail filter") + _expect(metadata.get("action_family") == "oral", "Row item compose lost template metadata") + + def smoke_category_cast_config_policy() -> None: _expect(pb.CATEGORY_PRESETS is category_cast_config.CATEGORY_PRESETS, "Prompt builder category presets are not delegated") _expect(pb.CAST_PRESETS is category_cast_config.CAST_PRESETS, "Prompt builder cast presets are not delegated") @@ -3931,6 +4009,7 @@ SMOKE_CASES: list[tuple[str, Callable[[], None]]] = [ ("location_config_policy", smoke_location_config_policy), ("row_location_policy", smoke_row_location_policy), ("row_expression_policy", smoke_row_expression_policy), + ("row_item_policy", smoke_row_item_policy), ("category_cast_config_policy", smoke_category_cast_config_policy), ("generation_profile_config_policy", smoke_generation_profile_config_policy), ("filter_config_policy", smoke_filter_config_policy),