from __future__ import annotations

import random
from string import Formatter
from typing import Any, Callable

try:
    from . import category_library as category_policy
    from . import category_template_metadata as template_policy
    from . import generate_prompt_batches as g
except ImportError:  # Allows local smoke tests with top-level imports.
    import category_library as category_policy
    import category_template_metadata as template_policy
    import generate_prompt_batches as g


# Substrings that mark an oral act as performed on a penis. Shared by the
# act filter and the per-axis context filter so the two stay in agreement.
_PENIS_ORAL_TERMS = ("fellatio", "blowjob", "deepthroat", "penis sucking", "penis in mouth")

# Substrings that mark a position as breast-based outercourse.
_BREAST_POSITION_TERMS = ("boobjob", "titjob", "breast-sex", "breast sex")


class SafeFormatDict(dict):
    """Mapping for ``str.format_map`` that echoes unknown fields back as ``{key}``."""

    def __missing__(self, key: str) -> str:
        return "{" + key + "}"


def slug(value: str) -> str:
    """Return ``g.slugify(value)``, or ``"custom"`` when that result is falsy."""
    return g.slugify(value) or "custom"


def pair_from(value: Any) -> tuple[str, str]:
    """Normalize a pair extension into a ``(slug, text)`` tuple.

    Accepted shapes:
      * dict: text is the first truthy value among ``prompt``, ``description``,
        ``text`` and ``name``; the slug is ``value["slug"]`` when truthy,
        otherwise derived from ``name`` (or the text).
      * list/tuple of exactly two elements: returned as ``(str(a), str(b))``
        without further validation.
      * anything else: stringified, stripped, and slugified.

    Raises:
        ValueError: if a dict yields no text, or the stringified value is empty.
    """
    if isinstance(value, dict):
        text = str(
            value.get("prompt")
            or value.get("description")
            or value.get("text")
            or value.get("name")
            or ""
        ).strip()
        # Validate before deriving the slug so an empty entry never reaches
        # the slugifier.
        if not text:
            raise ValueError(f"Pair extension is missing prompt text: {value!r}")
        pair_slug = str(value.get("slug") or slug(str(value.get("name") or text))).strip()
        return pair_slug, text

    if isinstance(value, (list, tuple)) and len(value) == 2:
        return str(value[0]), str(value[1])

    text = str(value).strip()
    if not text:
        raise ValueError("Pair extension cannot be empty")
    return slug(text), text


def weighted_choice(rng: random.Random, items: list[Any]) -> Any:
    """Pick one element of ``items``, honouring an optional ``weight`` key.

    Dict items may carry a ``weight``; everything else weighs 1.0. Weights
    that cannot be converted to ``float`` count as 1.0 and negative weights
    are clamped to 0.0. When no item has a positive weight the choice is
    uniform over all items.

    Raises:
        ValueError: if ``items`` is empty.
    """
    if not items:
        raise ValueError("Cannot choose from an empty list")

    weights: list[float] = []
    for item in items:
        weight = item.get("weight", 1.0) if isinstance(item, dict) else 1.0
        try:
            weights.append(max(0.0, float(weight)))
        except (TypeError, ValueError):
            weights.append(1.0)

    total = sum(weights)
    if total <= 0:
        return items[rng.randrange(len(items))]

    pick = rng.random() * total
    running = 0.0
    last_weighted = items[-1]
    for item, weight in zip(items, weights):
        # Zero-weight items must never win: without this guard a leading
        # zero-weight item is returned whenever ``pick`` is exactly 0.0.
        if weight <= 0.0:
            continue
        running += weight
        last_weighted = item
        if pick <= running:
            return item
    # Only reachable through floating-point rounding (or a non-finite total);
    # fall back to an item that actually carries weight.
    return last_weighted


def entry_text(item: Any) -> str:
    """Return the text of an entry as defined by ``category_policy._entry_text``."""
    return category_policy._entry_text(item)


def item_text(item: Any) -> str:
    """Alias of :func:`entry_text`."""
    return entry_text(item)


def item_name(item: Any) -> str:
    """Return the stripped ``name`` of a dict item, falling back to its text."""
    if isinstance(item, dict):
        return str(item.get("name") or item_text(item)).strip()
    return item_text(item)


def choose_text(rng: random.Random, items: list[Any]) -> str:
    """Return the text of a weighted-random element of ``items``."""
    return item_text(weighted_choice(rng, items))


def choose_distinct_text(rng: random.Random, items: list[Any], first_text: str) -> str:
    """Choose a text from ``items`` that differs (case-insensitively) from ``first_text``.

    Returns an empty string when every item matches ``first_text``.
    """
    first_text = item_text(first_text).lower()
    distinct = [item for item in items if item_text(item).lower() != first_text]
    if not distinct:
        return ""
    return choose_text(rng, distinct)


def choose_pair(rng: random.Random, items: list[Any]) -> tuple[str, str]:
    """Return a weighted-random element of ``items`` normalized by :func:`pair_from`."""
    return pair_from(weighted_choice(rng, items))


def _filter_by_predicate(values: list[Any], predicate: Callable[[str], bool]) -> list[Any]:
    """Keep values whose lowercased text satisfies ``predicate``; if none do, return ``values``."""
    matches = [value for value in values if predicate(entry_text(value).lower())]
    return matches or values


def _filter_by_terms(
    values: list[Any],
    terms: tuple[str, ...],
    excluded_terms: tuple[str, ...] = (),
) -> list[Any]:
    """Keep values containing any of ``terms`` and none of ``excluded_terms``.

    Matching is a case-insensitive substring test against the entry text
    (``terms`` are expected to be lowercase). When nothing matches, the
    original ``values`` are returned so the caller always has something to
    choose from.
    """
    matches = []
    for value in values:
        text = entry_text(value).lower()  # lowercase once per value, not once per term
        if any(term in text for term in terms) and not any(term in text for term in excluded_terms):
            matches.append(value)
    return matches or values


def oral_acts_for_position(values: list[Any], position: str) -> list[Any]:
    """Narrow oral-act candidates to those compatible with the chosen position.

    Position keywords are checked in priority order and the first match
    decides the filter. If the position is empty, matches no keyword, or the
    filter would remove every candidate, ``values`` is returned unchanged.
    """
    position_text = str(position or "").lower()
    if not position_text:
        return values

    cunnilingus_terms = (
        "cunnilingus",
        "pussy licking",
        "tongue on pussy",
        "oral sex with tongue and fingers",
        "mouth on genitals",
    )

    if "sixty-nine" in position_text:
        return _filter_by_predicate(values, lambda text: "sixty-nine" in text)
    if "face-sitting" in position_text:
        return _filter_by_predicate(
            values,
            lambda text: "face-sitting" in text or any(term in text for term in cunnilingus_terms),
        )
    if "kneeling oral" in position_text:
        return _filter_by_predicate(
            values,
            lambda text: any(term in text for term in _PENIS_ORAL_TERMS),
        )
    if "straddled oral" in position_text or "reclining cunnilingus" in position_text:
        return _filter_by_predicate(
            values,
            lambda text: "sixty-nine" not in text
            and not any(term in text for term in _PENIS_ORAL_TERMS),
        )
    # "kneeling oral" is intentionally absent from this list: it is always
    # handled by the dedicated branch above.
    generic_positions = (
        "spread-leg oral",
        "standing oral",
        "edge-of-bed oral",
        "chair oral",
        "side-lying oral",
    )
    if any(term in position_text for term in generic_positions):
        return _filter_by_predicate(
            values,
            lambda text: "sixty-nine" not in text and "face-sitting" not in text,
        )
    return values


def oral_axis_values_for_context(
    values: list[Any],
    position: str,
    oral_act: str,
    axis_name: str,
) -> list[Any]:
    """Narrow a descriptive axis so it agrees with the chosen oral act and position.

    Only the axes ``body_contact``, ``hand_detail``, ``mouth_detail``,
    ``saliva_detail``, ``climax_hint`` and ``visibility`` are filtered; any
    other axis, or an act that matches neither keyword family, leaves
    ``values`` untouched. A filter that would remove every candidate also
    returns ``values`` unchanged.
    """
    axis_name = str(axis_name or "").lower()
    if axis_name not in {
        "body_contact",
        "hand_detail",
        "mouth_detail",
        "saliva_detail",
        "climax_hint",
        "visibility",
    }:
        return values

    position_text = str(position or "").lower()
    act_text = str(oral_act or "").lower()
    woman_gives = any(term in act_text for term in _PENIS_ORAL_TERMS)
    man_gives = any(
        term in act_text for term in ("cunnilingus", "pussy licking", "tongue on pussy")
    )
    if not (woman_gives or man_gives):
        return values

    if woman_gives:
        by_axis = {
            "body_contact": ("hips pushed", "fingers tangled", "bodies stacked", "hands on thighs"),
            "hand_detail": ("hips", "penis", "head", "hair"),
            "mouth_detail": ("lips", "mouth", "deep mouth", "saliva"),
            "saliva_detail": ("saliva", "wet lips", "slick wet mouth", "drool", "mouth"),
            "climax_hint": ("mouth", "lips", "tongue", "breasts", "belly", "sexual fluids"),
            "visibility": ("mouth", "penis", "oral"),
        }
        excluded = {
            "body_contact": ("legs held open", "spread legs", "ass lifted", "chest pressed to thighs"),
            "hand_detail": (
                "spreading thighs",
                "sheets",
                "cupping breasts",
                "pressing into thighs",
                "holding the ass",
            ),
        }
        return _filter_by_terms(
            values,
            by_axis.get(axis_name, ("mouth", "penis")),
            excluded.get(axis_name, ()),
        )

    # The receiving-partner filter is only applied for these two positions;
    # every other position keeps the full candidate list.
    if man_gives and ("kneeling oral" in position_text or "standing oral" in position_text):
        by_axis = {
            "body_contact": (
                "legs held open",
                "one body kneeling",
                "chest pressed",
                "ass lifted",
                "hands on thighs",
            ),
            "hand_detail": ("thigh", "hips", "head", "ass"),
            "mouth_detail": ("tongue", "wet lips", "deep mouth", "genitals"),
            "saliva_detail": ("saliva", "wet lips", "tongue", "drool"),
            "climax_hint": ("sexual fluids", "orgasmic tension"),
            "visibility": ("mouth", "pussy", "oral", "genital"),
        }
        return _filter_by_terms(
            values,
            by_axis.get(axis_name, ("mouth", "pussy", "tongue")),
            ("penis", "breasts"),
        )
    return values


def outercourse_acts_for_position(values: list[Any], position: str) -> list[Any]:
    """Narrow outercourse-act candidates to those matching the chosen position.

    The first position keyword family that matches decides the filter. An
    empty or unrecognized position, or a filter that matches nothing, returns
    ``values`` unchanged.
    """
    position_text = str(position or "").lower()
    if not position_text:
        return values

    if any(term in position_text for term in _BREAST_POSITION_TERMS):
        return _filter_by_predicate(
            values,
            lambda text: any(term in text for term in ("boobjob", "titjob", "breast sex", "breasts")),
        )
    if any(term in position_text for term in ("testicle", "balls")):
        return _filter_by_predicate(
            values,
            lambda text: any(term in text for term in ("testicle", "balls")),
        )
    if "penis-licking" in position_text or "penis licking" in position_text:
        return _filter_by_predicate(values, lambda text: "licking" in text or "tongue" in text)
    if "handjob" in position_text or "hand job" in position_text:
        return _filter_by_predicate(
            values,
            lambda text: any(
                term in text for term in ("handjob", "hand job", "hand wrapped", "two-handed")
            ),
        )
    if "footjob" in position_text:
        return _filter_by_predicate(
            values,
            lambda text: any(term in text for term in ("footjob", "feet", "soles", "toes")),
        )
    return values


def outercourse_axis_values_for_position(
    values: list[Any],
    position: str,
    axis_name: str,
) -> list[Any]:
    """Narrow a descriptive axis so it agrees with the chosen outercourse position.

    Only the axes ``contact_detail``, ``hand_detail``, ``texture_detail``,
    ``visibility`` and ``body_contact`` are filtered. An empty or unrecognized
    position, any other axis, or a filter that matches nothing returns
    ``values`` unchanged.
    """
    position_text = str(position or "").lower()
    if not position_text:
        return values
    axis_name = str(axis_name or "").lower()
    if axis_name not in {"contact_detail", "hand_detail", "texture_detail", "visibility", "body_contact"}:
        return values

    if any(term in position_text for term in _BREAST_POSITION_TERMS):
        by_axis = {
            "contact_detail": ("compressed", "glans", "breast", "breasts", "soft tissue", "skin visibly"),
            "hand_detail": ("breast", "breasts", "fingers"),
            "texture_detail": ("compression", "soft flesh", "skin", "flesh", "asymmetry"),
            "visibility": ("breast", "breasts", "glans", "shaft"),
            "body_contact": ("torso", "body angled", "shoulders", "hips"),
        }
        excluded_by_axis = {
            "contact_detail": ("hand wrapped", "fingers and palm", "soles", "toes", "balls", "tongue"),
            "hand_detail": ("base of the penis", "penis shaft", "balls", "thigh", "ankles", "stroking"),
            "texture_detail": ("toes", "soles", "tongue"),
            "visibility": ("balls", "soles", "toes", "hand"),
            "body_contact": ("head tucked", "face directly", "base of the penis"),
        }
        return _filter_by_terms(
            values,
            by_axis.get(axis_name, ("breast", "breasts", "shaft")),
            excluded_by_axis.get(axis_name, ()),
        )

    if any(term in position_text for term in ("testicle", "balls")):
        by_axis = {
            "contact_detail": ("balls", "lips", "tongue", "wet"),
            "hand_detail": ("balls", "base", "thigh"),
            "texture_detail": ("wet", "saliva", "skin"),
            "visibility": ("balls", "mouth"),
            "body_contact": ("torso", "shoulders", "head tucked", "base of the penis", "knees", "thigh"),
        }
        return _filter_by_terms(values, by_axis.get(axis_name, ("balls", "mouth", "tongue")))

    if "penis-licking" in position_text or "penis licking" in position_text:
        by_axis = {
            "contact_detail": ("tongue", "lips", "glans", "shaft", "wet"),
            "hand_detail": ("base", "penis", "thigh"),
            "texture_detail": ("wet", "saliva", "skin"),
            "visibility": ("tongue", "penis"),
            "body_contact": (
                "head low",
                "face directly",
                "torso",
                "pelvis",
                "base of the penis",
                "hips",
                "body angled",
            ),
        }
        return _filter_by_terms(values, by_axis.get(axis_name, ("tongue", "glans", "shaft")))

    if "handjob" in position_text or "hand job" in position_text:
        by_axis = {
            "contact_detail": ("hand", "fingers", "palm", "shaft", "glans"),
            "hand_detail": ("hand", "hands", "shaft", "penis"),
            "texture_detail": ("fingers", "pressure", "skin", "shaft"),
            "visibility": ("hand", "penis", "shaft", "glans"),
            "body_contact": ("hips", "knees", "body angle"),
        }
        return _filter_by_terms(values, by_axis.get(axis_name, ("hand", "penis", "shaft")))

    if "footjob" in position_text:
        by_axis = {
            "contact_detail": ("soles", "toes"),
            "hand_detail": ("ankles", "thighs"),
            "texture_detail": ("toes", "soles", "pressure"),
            "visibility": ("feet", "soles"),
            "body_contact": ("legs", "knees", "body angled"),
        }
        excluded_by_axis = {
            "contact_detail": ("hand", "finger", "palm", "balls", "tongue", "breast"),
            "texture_detail": ("fingers", "tongue", "breast"),
            "visibility": ("hand", "balls", "breast"),
        }
        return _filter_by_terms(
            values,
            by_axis.get(axis_name, ("feet", "soles", "toes")),
            excluded_by_axis.get(axis_name, ()),
        )
    return values


def _format(template: str, context: dict[str, Any]) -> str:
    """Render ``template`` with ``context``; fields missing from ``context`` become ``""``."""
    fields = {key for _, key, _, _ in Formatter().parse(template) if key}
    # Pre-seed every field found in the template with an empty string so
    # fields absent from ``context`` render as nothing instead of raising.
    safe_context = SafeFormatDict({key: "" for key in fields})
    safe_context.update(context)
    return template.format_map(safe_context)


def _context_filtered_values(
    subcategory_slug: str,
    field_name: str,
    values: list[Any],
    axis_values: dict[str, str],
) -> list[Any]:
    """Apply the subcategory-specific compatibility filter for one template field."""
    position = axis_values.get("position", "")
    if subcategory_slug == "oral_sex":
        if field_name == "oral_act":
            return oral_acts_for_position(values, position)
        return oral_axis_values_for_context(
            values,
            position,
            axis_values.get("oral_act", ""),
            field_name,
        )
    if subcategory_slug == "outercourse_sex":
        if field_name == "outer_act":
            return outercourse_acts_for_position(values, position)
        return outercourse_axis_values_for_position(values, position, field_name)
    return values


def compose_item(
    rng: random.Random,
    category: dict[str, Any],
    subcategory: dict[str, Any],
    item: Any,
    women_count: int = 1,
    men_count: int = 1,
) -> tuple[str, str, dict[str, str], dict[str, Any]]:
    """Build the prompt text for one item.

    When the item has both templates and axes, a compatible template is
    chosen at random and each of its fields that names a non-empty axis is
    filled with a compatible axis value. Otherwise the item's own text is
    used verbatim.

    Returns:
        ``(prompt, name, axis_values, metadata)`` where ``axis_values`` maps
        each filled field to the chosen text (empty when no template was
        used) and ``metadata`` is the inherited template metadata merged with
        that of the chosen template (or of the item itself).

    Raises:
        ValueError: propagated from :func:`weighted_choice` when no compatible
            template or axis value is available.
    """
    templates = category_policy.template_list(category, subcategory, item, "item_templates")
    axes = category_policy.merged_axes(category, subcategory, item)
    inherited_metadata = template_policy.inherited_template_metadata(category, subcategory, item)

    if not (templates and axes):
        return (
            item_text(item),
            item_name(item),
            {},
            template_policy.merge_template_metadata(
                inherited_metadata,
                template_policy.template_metadata(item),
            ),
        )

    template_entry = weighted_choice(
        rng,
        category_policy.compatible_entries(templates, women_count, men_count),
    )
    template = entry_text(template_entry)
    # De-duplicate while preserving first-appearance order: values are drawn
    # in this order, so it determines the sequence of RNG calls.
    unique_fields = list(
        dict.fromkeys(key for _, key, _, _ in Formatter().parse(template) if key)
    )

    axis_values: dict[str, str] = {}
    subcategory_slug = str(subcategory.get("slug") or "").lower()

    # The position is drawn first for these subcategories because the filters
    # for the remaining fields depend on it.
    if (
        subcategory_slug in ("oral_sex", "outercourse_sex")
        and "position" in unique_fields
        and axes.get("position")
    ):
        position_values = category_policy.compatible_entries(axes["position"], women_count, men_count)
        axis_values["position"] = entry_text(weighted_choice(rng, position_values))

    # NOTE(review): the oral_sex axis filter reads axis_values["oral_act"], so
    # it only takes effect for fields that appear after {oral_act} in the
    # template. Confirm templates are written with that ordering in mind.
    for field_name in unique_fields:
        if field_name in axis_values or field_name not in axes or not axes[field_name]:
            continue
        values = category_policy.compatible_entries(axes[field_name], women_count, men_count)
        values = _context_filtered_values(subcategory_slug, field_name, values, axis_values)
        axis_values[field_name] = entry_text(weighted_choice(rng, values))

    item_prompt = _format(template, axis_values).strip()
    display_name = item_name(item) or subcategory["name"]
    return (
        item_prompt,
        display_name,
        axis_values,
        template_policy.merge_template_metadata(
            inherited_metadata,
            template_policy.template_metadata(template_entry),
        ),
    )