Extract row item policy
This commit is contained in:
+343
@@ -0,0 +1,343 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import random
|
||||
from string import Formatter
|
||||
from typing import Any, Callable
|
||||
|
||||
try:
|
||||
from . import category_library as category_policy
|
||||
from . import category_template_metadata as template_policy
|
||||
from . import generate_prompt_batches as g
|
||||
except ImportError: # Allows local smoke tests with top-level imports.
|
||||
import category_library as category_policy
|
||||
import category_template_metadata as template_policy
|
||||
import generate_prompt_batches as g
|
||||
|
||||
|
||||
class SafeFormatDict(dict):
|
||||
def __missing__(self, key: str) -> str:
|
||||
return "{" + key + "}"
|
||||
|
||||
|
||||
def slug(value: str) -> str:
|
||||
return g.slugify(value) or "custom"
|
||||
|
||||
|
||||
def pair_from(value: Any) -> tuple[str, str]:
|
||||
if isinstance(value, dict):
|
||||
text = str(
|
||||
value.get("prompt")
|
||||
or value.get("description")
|
||||
or value.get("text")
|
||||
or value.get("name")
|
||||
or ""
|
||||
).strip()
|
||||
pair_slug = str(value.get("slug") or slug(str(value.get("name") or text))).strip()
|
||||
if not text:
|
||||
raise ValueError(f"Pair extension is missing prompt text: {value!r}")
|
||||
return pair_slug, text
|
||||
if isinstance(value, (list, tuple)) and len(value) == 2:
|
||||
return str(value[0]), str(value[1])
|
||||
text = str(value).strip()
|
||||
if not text:
|
||||
raise ValueError("Pair extension cannot be empty")
|
||||
return slug(text), text
|
||||
|
||||
|
||||
def weighted_choice(rng: random.Random, items: list[Any]) -> Any:
|
||||
if not items:
|
||||
raise ValueError("Cannot choose from an empty list")
|
||||
weights: list[float] = []
|
||||
for item in items:
|
||||
weight = item.get("weight", 1.0) if isinstance(item, dict) else 1.0
|
||||
try:
|
||||
weights.append(max(0.0, float(weight)))
|
||||
except (TypeError, ValueError):
|
||||
weights.append(1.0)
|
||||
total = sum(weights)
|
||||
if total <= 0:
|
||||
return items[rng.randrange(len(items))]
|
||||
pick = rng.random() * total
|
||||
running = 0.0
|
||||
for item, weight in zip(items, weights):
|
||||
running += weight
|
||||
if pick <= running:
|
||||
return item
|
||||
return items[-1]
|
||||
|
||||
|
||||
def entry_text(item: Any) -> str:
|
||||
return category_policy._entry_text(item)
|
||||
|
||||
|
||||
def item_text(item: Any) -> str:
|
||||
return entry_text(item)
|
||||
|
||||
|
||||
def item_name(item: Any) -> str:
|
||||
if isinstance(item, dict):
|
||||
return str(item.get("name") or item_text(item)).strip()
|
||||
return item_text(item)
|
||||
|
||||
|
||||
def choose_text(rng: random.Random, items: list[Any]) -> str:
|
||||
return item_text(weighted_choice(rng, items))
|
||||
|
||||
|
||||
def choose_distinct_text(rng: random.Random, items: list[Any], first_text: str) -> str:
|
||||
first_text = item_text(first_text).lower()
|
||||
distinct = [item for item in items if item_text(item).lower() != first_text]
|
||||
if not distinct:
|
||||
return ""
|
||||
return choose_text(rng, distinct)
|
||||
|
||||
|
||||
def choose_pair(rng: random.Random, items: list[Any]) -> tuple[str, str]:
|
||||
return pair_from(weighted_choice(rng, items))
|
||||
|
||||
|
||||
def oral_acts_for_position(values: list[Any], position: str) -> list[Any]:
|
||||
position_text = str(position or "").lower()
|
||||
if not position_text:
|
||||
return values
|
||||
|
||||
def act_text(value: Any) -> str:
|
||||
return entry_text(value).lower()
|
||||
|
||||
def filtered(predicate: Callable[[str], bool]) -> list[Any]:
|
||||
matches = [value for value in values if predicate(act_text(value))]
|
||||
return matches or values
|
||||
|
||||
penis_terms = ("fellatio", "blowjob", "deepthroat", "penis sucking", "penis in mouth")
|
||||
cunnilingus_terms = ("cunnilingus", "pussy licking", "tongue on pussy", "oral sex with tongue and fingers", "mouth on genitals")
|
||||
if "sixty-nine" in position_text:
|
||||
return filtered(lambda text: "sixty-nine" in text)
|
||||
if "face-sitting" in position_text:
|
||||
return filtered(lambda text: "face-sitting" in text or any(term in text for term in cunnilingus_terms))
|
||||
if "kneeling oral" in position_text:
|
||||
return filtered(lambda text: any(term in text for term in penis_terms))
|
||||
if "straddled oral" in position_text or "reclining cunnilingus" in position_text:
|
||||
return filtered(lambda text: "sixty-nine" not in text and not any(term in text for term in penis_terms))
|
||||
if "spread-leg oral" in position_text:
|
||||
return filtered(lambda text: "sixty-nine" not in text and "face-sitting" not in text)
|
||||
if any(term in position_text for term in ("standing oral", "kneeling oral", "edge-of-bed oral", "chair oral", "side-lying oral")):
|
||||
return filtered(lambda text: "sixty-nine" not in text and "face-sitting" not in text)
|
||||
return values
|
||||
|
||||
|
||||
def oral_axis_values_for_context(values: list[Any], position: str, oral_act: str, axis_name: str) -> list[Any]:
|
||||
axis_name = str(axis_name or "").lower()
|
||||
if axis_name not in {"body_contact", "hand_detail", "mouth_detail", "saliva_detail", "climax_hint", "visibility"}:
|
||||
return values
|
||||
position_text = str(position or "").lower()
|
||||
act_text = str(oral_act or "").lower()
|
||||
woman_gives = any(
|
||||
term in act_text
|
||||
for term in ("fellatio", "blowjob", "deepthroat", "penis sucking", "penis in mouth")
|
||||
)
|
||||
man_gives = any(
|
||||
term in act_text
|
||||
for term in ("cunnilingus", "pussy licking", "tongue on pussy")
|
||||
)
|
||||
if not (woman_gives or man_gives):
|
||||
return values
|
||||
|
||||
def value_text(value: Any) -> str:
|
||||
return entry_text(value).lower()
|
||||
|
||||
def filtered(terms: tuple[str, ...], excluded_terms: tuple[str, ...] = ()) -> list[Any]:
|
||||
matches = [
|
||||
value
|
||||
for value in values
|
||||
if any(term in value_text(value) for term in terms)
|
||||
and not any(term in value_text(value) for term in excluded_terms)
|
||||
]
|
||||
return matches or values
|
||||
|
||||
if woman_gives:
|
||||
by_axis = {
|
||||
"body_contact": ("hips pushed", "fingers tangled", "bodies stacked", "hands on thighs"),
|
||||
"hand_detail": ("hips", "penis", "head", "hair"),
|
||||
"mouth_detail": ("lips", "mouth", "deep mouth", "saliva"),
|
||||
"saliva_detail": ("saliva", "wet lips", "slick wet mouth", "drool", "mouth"),
|
||||
"climax_hint": ("mouth", "lips", "tongue", "breasts", "belly", "sexual fluids"),
|
||||
"visibility": ("mouth", "penis", "oral"),
|
||||
}
|
||||
excluded = {
|
||||
"body_contact": ("legs held open", "spread legs", "ass lifted", "chest pressed to thighs"),
|
||||
"hand_detail": ("spreading thighs", "sheets", "cupping breasts", "pressing into thighs", "holding the ass"),
|
||||
}
|
||||
return filtered(by_axis.get(axis_name, ("mouth", "penis")), excluded.get(axis_name, ()))
|
||||
if man_gives and ("kneeling oral" in position_text or "standing oral" in position_text):
|
||||
by_axis = {
|
||||
"body_contact": ("legs held open", "one body kneeling", "chest pressed", "ass lifted", "hands on thighs"),
|
||||
"hand_detail": ("thigh", "hips", "head", "ass"),
|
||||
"mouth_detail": ("tongue", "wet lips", "deep mouth", "genitals"),
|
||||
"saliva_detail": ("saliva", "wet lips", "tongue", "drool"),
|
||||
"climax_hint": ("sexual fluids", "orgasmic tension"),
|
||||
"visibility": ("mouth", "pussy", "oral", "genital"),
|
||||
}
|
||||
return filtered(by_axis.get(axis_name, ("mouth", "pussy", "tongue")), ("penis", "breasts"))
|
||||
return values
|
||||
|
||||
|
||||
def outercourse_acts_for_position(values: list[Any], position: str) -> list[Any]:
|
||||
position_text = str(position or "").lower()
|
||||
if not position_text:
|
||||
return values
|
||||
|
||||
def act_text(value: Any) -> str:
|
||||
return entry_text(value).lower()
|
||||
|
||||
def filtered(predicate: Callable[[str], bool]) -> list[Any]:
|
||||
matches = [value for value in values if predicate(act_text(value))]
|
||||
return matches or values
|
||||
|
||||
if any(term in position_text for term in ("boobjob", "titjob", "breast-sex", "breast sex")):
|
||||
return filtered(lambda text: any(term in text for term in ("boobjob", "titjob", "breast sex", "breasts")))
|
||||
if any(term in position_text for term in ("testicle", "balls")):
|
||||
return filtered(lambda text: any(term in text for term in ("testicle", "balls")))
|
||||
if "penis-licking" in position_text or "penis licking" in position_text:
|
||||
return filtered(lambda text: "licking" in text or "tongue" in text)
|
||||
if "handjob" in position_text or "hand job" in position_text:
|
||||
return filtered(lambda text: any(term in text for term in ("handjob", "hand job", "hand wrapped", "two-handed")))
|
||||
if "footjob" in position_text:
|
||||
return filtered(lambda text: any(term in text for term in ("footjob", "feet", "soles", "toes")))
|
||||
return values
|
||||
|
||||
|
||||
def outercourse_axis_values_for_position(values: list[Any], position: str, axis_name: str) -> list[Any]:
|
||||
position_text = str(position or "").lower()
|
||||
if not position_text:
|
||||
return values
|
||||
axis_name = str(axis_name or "").lower()
|
||||
if axis_name not in {"contact_detail", "hand_detail", "texture_detail", "visibility", "body_contact"}:
|
||||
return values
|
||||
|
||||
def value_text(value: Any) -> str:
|
||||
return entry_text(value).lower()
|
||||
|
||||
def filtered(terms: tuple[str, ...], excluded_terms: tuple[str, ...] = ()) -> list[Any]:
|
||||
matches = [
|
||||
value
|
||||
for value in values
|
||||
if any(term in value_text(value) for term in terms)
|
||||
and not any(term in value_text(value) for term in excluded_terms)
|
||||
]
|
||||
return matches or values
|
||||
|
||||
if any(term in position_text for term in ("boobjob", "titjob", "breast-sex", "breast sex")):
|
||||
by_axis = {
|
||||
"contact_detail": ("compressed", "glans", "breast", "breasts", "soft tissue", "skin visibly"),
|
||||
"hand_detail": ("breast", "breasts", "fingers"),
|
||||
"texture_detail": ("compression", "soft flesh", "skin", "flesh", "asymmetry"),
|
||||
"visibility": ("breast", "breasts", "glans", "shaft"),
|
||||
"body_contact": ("torso", "body angled", "shoulders", "hips"),
|
||||
}
|
||||
excluded_by_axis = {
|
||||
"contact_detail": ("hand wrapped", "fingers and palm", "soles", "toes", "balls", "tongue"),
|
||||
"hand_detail": ("base of the penis", "penis shaft", "balls", "thigh", "ankles", "stroking"),
|
||||
"texture_detail": ("toes", "soles", "tongue"),
|
||||
"visibility": ("balls", "soles", "toes", "hand"),
|
||||
"body_contact": ("head tucked", "face directly", "base of the penis"),
|
||||
}
|
||||
return filtered(
|
||||
by_axis.get(axis_name, ("breast", "breasts", "shaft")),
|
||||
excluded_by_axis.get(axis_name, ()),
|
||||
)
|
||||
if any(term in position_text for term in ("testicle", "balls")):
|
||||
by_axis = {
|
||||
"contact_detail": ("balls", "lips", "tongue", "wet"),
|
||||
"hand_detail": ("balls", "base", "thigh"),
|
||||
"texture_detail": ("wet", "saliva", "skin"),
|
||||
"visibility": ("balls", "mouth"),
|
||||
"body_contact": ("torso", "shoulders", "head tucked", "base of the penis", "knees", "thigh"),
|
||||
}
|
||||
return filtered(by_axis.get(axis_name, ("balls", "mouth", "tongue")))
|
||||
if "penis-licking" in position_text or "penis licking" in position_text:
|
||||
by_axis = {
|
||||
"contact_detail": ("tongue", "lips", "glans", "shaft", "wet"),
|
||||
"hand_detail": ("base", "penis", "thigh"),
|
||||
"texture_detail": ("wet", "saliva", "skin"),
|
||||
"visibility": ("tongue", "penis"),
|
||||
"body_contact": ("head low", "face directly", "torso", "pelvis", "base of the penis", "hips", "body angled"),
|
||||
}
|
||||
return filtered(by_axis.get(axis_name, ("tongue", "glans", "shaft")))
|
||||
if "handjob" in position_text or "hand job" in position_text:
|
||||
by_axis = {
|
||||
"contact_detail": ("hand", "fingers", "palm", "shaft", "glans"),
|
||||
"hand_detail": ("hand", "hands", "shaft", "penis"),
|
||||
"texture_detail": ("fingers", "pressure", "skin", "shaft"),
|
||||
"visibility": ("hand", "penis", "shaft", "glans"),
|
||||
"body_contact": ("hips", "knees", "body angle"),
|
||||
}
|
||||
return filtered(by_axis.get(axis_name, ("hand", "penis", "shaft")))
|
||||
if "footjob" in position_text:
|
||||
by_axis = {
|
||||
"contact_detail": ("soles", "toes"),
|
||||
"hand_detail": ("ankles", "thighs"),
|
||||
"texture_detail": ("toes", "soles", "pressure"),
|
||||
"visibility": ("feet", "soles"),
|
||||
"body_contact": ("legs", "knees", "body angled"),
|
||||
}
|
||||
excluded_by_axis = {
|
||||
"contact_detail": ("hand", "finger", "palm", "balls", "tongue", "breast"),
|
||||
"texture_detail": ("fingers", "tongue", "breast"),
|
||||
"visibility": ("hand", "balls", "breast"),
|
||||
}
|
||||
return filtered(
|
||||
by_axis.get(axis_name, ("feet", "soles", "toes")),
|
||||
excluded_by_axis.get(axis_name, ()),
|
||||
)
|
||||
return values
|
||||
|
||||
|
||||
def _format(template: str, context: dict[str, Any]) -> str:
|
||||
fields = {key for _, key, _, _ in Formatter().parse(template) if key}
|
||||
safe_context = SafeFormatDict({key: "" for key in fields})
|
||||
safe_context.update(context)
|
||||
return template.format_map(safe_context)
|
||||
|
||||
|
||||
def compose_item(
|
||||
rng: random.Random,
|
||||
category: dict[str, Any],
|
||||
subcategory: dict[str, Any],
|
||||
item: Any,
|
||||
women_count: int = 1,
|
||||
men_count: int = 1,
|
||||
) -> tuple[str, str, dict[str, str], dict[str, Any]]:
|
||||
templates = category_policy.template_list(category, subcategory, item, "item_templates")
|
||||
axes = category_policy.merged_axes(category, subcategory, item)
|
||||
if templates and axes:
|
||||
template_entry = weighted_choice(rng, category_policy.compatible_entries(templates, women_count, men_count))
|
||||
template = entry_text(template_entry)
|
||||
fields = [key for _, key, _, _ in Formatter().parse(template) if key]
|
||||
unique_fields = list(dict.fromkeys(fields))
|
||||
axis_values: dict[str, str] = {}
|
||||
subcategory_slug = str(subcategory.get("slug") or "").lower()
|
||||
if subcategory_slug in ("oral_sex", "outercourse_sex") and "position" in unique_fields and axes.get("position"):
|
||||
position_values = category_policy.compatible_entries(axes["position"], women_count, men_count)
|
||||
axis_values["position"] = entry_text(weighted_choice(rng, position_values))
|
||||
for name in unique_fields:
|
||||
if name in axis_values or name not in axes or not axes[name]:
|
||||
continue
|
||||
values = category_policy.compatible_entries(axes[name], women_count, men_count)
|
||||
if subcategory_slug == "oral_sex" and name == "oral_act":
|
||||
values = oral_acts_for_position(values, axis_values.get("position", ""))
|
||||
elif subcategory_slug == "oral_sex":
|
||||
values = oral_axis_values_for_context(
|
||||
values,
|
||||
axis_values.get("position", ""),
|
||||
axis_values.get("oral_act", ""),
|
||||
name,
|
||||
)
|
||||
if subcategory_slug == "outercourse_sex" and name == "outer_act":
|
||||
values = outercourse_acts_for_position(values, axis_values.get("position", ""))
|
||||
if subcategory_slug == "outercourse_sex":
|
||||
values = outercourse_axis_values_for_position(values, axis_values.get("position", ""), name)
|
||||
axis_values[name] = entry_text(weighted_choice(rng, values))
|
||||
item_prompt = _format(template, axis_values).strip()
|
||||
name = item_name(item) or subcategory["name"]
|
||||
return item_prompt, name, axis_values, template_policy.template_metadata(template_entry)
|
||||
return item_text(item), item_name(item), {}, template_policy.template_metadata(item)
|
||||
Reference in New Issue
Block a user