Files
ComfyUI-Ethanfel-Prompt-Bui…/row_item.py

466 lines
21 KiB
Python

from __future__ import annotations
import random
from string import Formatter
from typing import Any, Callable
try:
from . import category_library as category_policy
from . import category_template_metadata as template_policy
from . import generate_prompt_batches as g
from . import outercourse_action_policy as outercourse_policy
except ImportError: # Allows local smoke tests with top-level imports.
import category_library as category_policy
import category_template_metadata as template_policy
import generate_prompt_batches as g
import outercourse_action_policy as outercourse_policy
class SafeFormatDict(dict):
    """Dictionary for ``str.format_map`` that tolerates unknown field names.

    Looking up a key that is not present does not raise ``KeyError``; the
    lookup yields the key wrapped in braces, so the placeholder text is
    reproduced in the formatted output.
    """

    def __missing__(self, key: str) -> str:
        # Rebuild the literal placeholder for a field that has no value.
        return "".join(("{", key, "}"))
def slug(value: str) -> str:
    """Return ``g.slugify(value)``, or ``"custom"`` when that result is falsy."""
    slugged = g.slugify(value)
    if slugged:
        return slugged
    return "custom"
def pair_from(value: Any) -> tuple[str, str]:
    """Normalize a pair extension into a ``(slug, text)`` tuple.

    Accepted shapes:

    * ``dict`` - the text is the first truthy value among ``prompt``,
      ``description``, ``text`` and ``name``; the slug is the ``slug`` entry
      when truthy, otherwise it is derived from ``name`` (or the text).
    * two-element ``list``/``tuple`` - returned as ``(str(first), str(second))``
      without stripping or validation.
    * anything else - converted with ``str`` and stripped; the slug is derived
      from the resulting text.

    Raises:
        ValueError: if a dict carries no text, or the stringified value is empty.
    """
    if isinstance(value, dict):
        # The first truthy field wins, in this priority order.
        raw_text: Any = ""
        for field in ("prompt", "description", "text", "name"):
            candidate = value.get(field)
            if candidate:
                raw_text = candidate
                break
        text = str(raw_text).strip()

        explicit_slug = value.get("slug")
        if explicit_slug:
            pair_slug = str(explicit_slug).strip()
        else:
            pair_slug = str(slug(str(value.get("name") or text))).strip()

        if text:
            return pair_slug, text
        raise ValueError(f"Pair extension is missing prompt text: {value!r}")

    is_sequence = isinstance(value, (list, tuple))
    if is_sequence and len(value) == 2:
        first, second = value[0], value[1]
        return str(first), str(second)

    text = str(value).strip()
    if text:
        return slug(text), text
    raise ValueError("Pair extension cannot be empty")
def weighted_choice(rng: random.Random, items: list[Any]) -> Any:
    """Pick one element of *items*, honoring optional per-item weights.

    A ``dict`` item may carry a ``"weight"`` entry; every other item (and a
    dict without that entry) weighs ``1.0``. Negative weights are clamped to
    ``0.0`` and weights that cannot be converted to ``float`` count as ``1.0``.

    Args:
        rng: Random source. Exactly one of ``rng.random()`` or
            ``rng.randrange()`` is called per invocation.
        items: Non-empty list of candidates.

    Returns:
        The selected item (the original object, not a copy). An item whose
        weight is zero is never selected while at least one item has a
        positive weight. When every weight is zero the choice is uniform.

    Raises:
        ValueError: if *items* is empty.
    """
    if not items:
        raise ValueError("Cannot choose from an empty list")

    weights: list[float] = []
    for item in items:
        weight = item.get("weight", 1.0) if isinstance(item, dict) else 1.0
        try:
            weights.append(max(0.0, float(weight)))
        except (TypeError, ValueError):
            weights.append(1.0)

    total = sum(weights)
    if total <= 0:
        # Nothing has a usable weight: fall back to a uniform pick.
        return items[rng.randrange(len(items))]

    pick = rng.random() * total
    running = 0.0
    fallback = items[-1]
    for item, weight in zip(items, weights):
        if weight <= 0:
            # Zero-weight items own an empty interval and can never win.
            continue
        running += weight
        fallback = item
        # Strict comparison: rng.random() may return exactly 0.0, and with
        # "<=" that value would select a leading zero-weight item.
        if pick < running:
            return item
    # Only reachable through floating-point rounding of the running sum;
    # return the last item that actually carries weight.
    return fallback
def entry_text(item: Any) -> str:
    """Return the text for *item* as produced by ``category_policy._entry_text``."""
    text = category_policy._entry_text(item)
    return text
def item_text(item: Any) -> str:
    """Return the text of *item*; this is an alias for ``entry_text``."""
    text = entry_text(item)
    return text
def item_name(item: Any) -> str:
    """Return a display name for *item*.

    For a ``dict`` the ``"name"`` entry is used when truthy, otherwise the
    item's text; the result is stringified and stripped. Any other item is
    named by its text.
    """
    if not isinstance(item, dict):
        return item_text(item)
    label = item.get("name") or item_text(item)
    return str(label).strip()
def choose_text(rng: random.Random, items: list[Any]) -> str:
    """Make a weighted pick from *items* and return the text of the chosen entry."""
    chosen = weighted_choice(rng, items)
    return item_text(chosen)
def choose_distinct_text(rng: random.Random, items: list[Any], first_text: str) -> str:
    """Pick the text of an entry whose text differs from *first_text*.

    The comparison is case-insensitive. Returns ``""`` when every entry in
    *items* has the same text as *first_text* (or *items* is empty).
    """
    taken = item_text(first_text).lower()
    remaining = []
    for candidate in items:
        if item_text(candidate).lower() != taken:
            remaining.append(candidate)
    if remaining:
        return choose_text(rng, remaining)
    return ""
def choose_pair(rng: random.Random, items: list[Any]) -> tuple[str, str]:
    """Make a weighted pick from *items* and normalize it to ``(slug, text)``."""
    chosen = weighted_choice(rng, items)
    return pair_from(chosen)
def oral_acts_for_position(values: list[Any], position: str) -> list[Any]:
    """Restrict oral-act entries to those compatible with *position*.

    Matching is case-insensitive substring matching: keywords are looked up in
    the position text, and candidate acts are tested via ``entry_text``.
    The first matching position rule wins.

    Returns:
        The compatible entries, or *values* itself when the position is empty,
        no rule applies, or a rule would leave nothing.
    """
    where = str(position or "").lower()
    if not where:
        return values

    penis_terms = ("fellatio", "blowjob", "deepthroat", "penis sucking", "penis in mouth")
    cunnilingus_terms = ("cunnilingus", "pussy licking", "tongue on pussy", "oral sex with tongue and fingers", "mouth on genitals")

    def mentions(text: str, terms: tuple[str, ...]) -> bool:
        return any(term in text for term in terms)

    def keep(predicate: Callable[[str], bool]) -> list[Any]:
        kept = [value for value in values if predicate(entry_text(value).lower())]
        # Never filter down to nothing: an empty result means "no opinion".
        return kept if kept else values

    if "sixty-nine" in where:
        return keep(lambda text: "sixty-nine" in text)
    if "face-sitting" in where:
        return keep(lambda text: "face-sitting" in text or mentions(text, cunnilingus_terms))
    if "kneeling oral" in where:
        return keep(lambda text: mentions(text, penis_terms))
    if "straddled oral" in where or "reclining cunnilingus" in where:
        return keep(lambda text: not ("sixty-nine" in text or mentions(text, penis_terms)))

    # "kneeling oral" can no longer match here (it is handled above); the term
    # is kept so the keyword list stays as written.
    general_positions = (
        "spread-leg oral",
        "standing oral",
        "kneeling oral",
        "edge-of-bed oral",
        "chair oral",
        "side-lying oral",
    )
    if mentions(where, general_positions):
        return keep(lambda text: not ("sixty-nine" in text or "face-sitting" in text))
    return values
def oral_axis_values_for_context(values: list[Any], position: str, oral_act: str, axis_name: str) -> list[Any]:
    """Restrict a detail axis to values that fit the chosen oral act.

    All matching is case-insensitive substring matching. Only the axes
    ``body_contact``, ``hand_detail``, ``mouth_detail``, ``saliva_detail``,
    ``climax_hint`` and ``visibility`` are filtered. The act text decides
    which term table applies. When the act matches both term groups, the
    first table is used. The second table applies only for positions
    containing "kneeling oral" or "standing oral".

    Returns:
        The filtered entries, or *values* itself when the axis is not
        filtered, the act is not recognized, no table applies, or the filter
        would leave nothing.
    """
    axis = str(axis_name or "").lower()
    filtered_axes = {"body_contact", "hand_detail", "mouth_detail", "saliva_detail", "climax_hint", "visibility"}
    if axis not in filtered_axes:
        return values

    where = str(position or "").lower()
    act = str(oral_act or "").lower()

    def mentions(text: str, terms: tuple[str, ...]) -> bool:
        return any(term in text for term in terms)

    woman_gives = mentions(act, ("fellatio", "blowjob", "deepthroat", "penis sucking", "penis in mouth"))
    man_gives = mentions(act, ("cunnilingus", "pussy licking", "tongue on pussy"))
    if not woman_gives and not man_gives:
        return values

    def keep(terms: tuple[str, ...], excluded_terms: tuple[str, ...] = ()) -> list[Any]:
        kept = []
        for value in values:
            text = entry_text(value).lower()
            if mentions(text, terms) and not mentions(text, excluded_terms):
                kept.append(value)
        # An empty result falls back to the unfiltered candidates.
        return kept or values

    if woman_gives:
        wanted = {
            "body_contact": ("hips pushed", "fingers tangled", "bodies stacked", "hands on thighs"),
            "hand_detail": ("hips", "penis", "head", "hair"),
            "mouth_detail": ("lips", "mouth", "deep mouth", "saliva"),
            "saliva_detail": ("saliva", "wet lips", "slick wet mouth", "drool", "mouth"),
            "climax_hint": ("mouth", "lips", "tongue", "breasts", "belly", "sexual fluids"),
            "visibility": ("mouth", "penis", "oral"),
        }
        unwanted = {
            "body_contact": ("legs held open", "spread legs", "ass lifted", "chest pressed to thighs"),
            "hand_detail": ("spreading thighs", "sheets", "cupping breasts", "pressing into thighs", "holding the ass"),
        }
        return keep(wanted.get(axis, ("mouth", "penis")), unwanted.get(axis, ()))

    upright_position = "kneeling oral" in where or "standing oral" in where
    if man_gives and upright_position:
        wanted = {
            "body_contact": ("legs held open", "one body kneeling", "chest pressed", "ass lifted", "hands on thighs"),
            "hand_detail": ("thigh", "hips", "head", "ass"),
            "mouth_detail": ("tongue", "wet lips", "deep mouth", "genitals"),
            "saliva_detail": ("saliva", "wet lips", "tongue", "drool"),
            "climax_hint": ("sexual fluids", "orgasmic tension"),
            "visibility": ("mouth", "pussy", "oral", "genital"),
        }
        return keep(wanted.get(axis, ("mouth", "pussy", "tongue")), ("penis", "breasts"))
    return values
def outercourse_acts_for_position(values: list[Any], position: str) -> list[Any]:
    """Restrict outercourse-act entries to the action kind implied by *position*.

    The kind is obtained from
    ``outercourse_policy.infer_outercourse_action_kind(position)``. Each
    recognized kind has a set of terms, and an entry is kept when its
    lower-cased ``entry_text`` contains any of them.

    Returns:
        The matching entries, or *values* itself when the kind is generic or
        unrecognized, or when nothing matches.
    """
    kind = outercourse_policy.infer_outercourse_action_kind(position)
    if kind == outercourse_policy.OUTERCOURSE_GENERIC:
        return values

    terms: tuple[str, ...]
    if kind == outercourse_policy.OUTERCOURSE_BOOBJOB:
        terms = ("boobjob", "titjob", "breast sex", "breasts")
    elif kind == outercourse_policy.OUTERCOURSE_TESTICLE:
        terms = ("testicle", "balls")
    elif kind == outercourse_policy.OUTERCOURSE_PENIS_LICKING:
        terms = ("licking", "tongue")
    elif kind == outercourse_policy.OUTERCOURSE_HANDJOB:
        terms = ("handjob", "hand job", "hand wrapped", "two-handed")
    elif kind == outercourse_policy.OUTERCOURSE_FOOTJOB:
        terms = ("footjob", "feet", "soles", "toes")
    else:
        return values

    kept = []
    for value in values:
        text = entry_text(value).lower()
        if any(term in text for term in terms):
            kept.append(value)
    # An empty result falls back to the unfiltered candidates.
    return kept or values
def outercourse_axis_values_for_position(values: list[Any], position: str, axis_name: str) -> list[Any]:
    """Narrow one outercourse detail axis to values that fit the inferred action kind.

    The action kind comes from
    ``outercourse_policy.infer_outercourse_action_kind(position)``. Values are
    matched by case-insensitive substring tests on ``entry_text(value)``.

    Args:
        values: Candidate entries for the axis.
        position: Position text passed to the action-kind inference.
        axis_name: Axis being filled. Only ``contact_detail``, ``hand_detail``,
            ``texture_detail``, ``visibility`` and ``body_contact`` are filtered.

    Returns:
        The filtered entries, or ``values`` unchanged when the kind is generic
        or not handled below, the axis is not one of the filtered axes, or
        filtering would leave nothing.
    """
    action_kind = outercourse_policy.infer_outercourse_action_kind(position)
    if action_kind == outercourse_policy.OUTERCOURSE_GENERIC:
        return values
    axis_name = str(axis_name or "").lower()
    if axis_name not in {"contact_detail", "hand_detail", "texture_detail", "visibility", "body_contact"}:
        return values

    def value_text(value: Any) -> str:
        # Lower-cased text used for every substring test below.
        return entry_text(value).lower()

    def filtered(terms: tuple[str, ...], excluded_terms: tuple[str, ...] = ()) -> list[Any]:
        # Tier 1: values that mention a wanted term and no excluded term.
        matches = [
            value
            for value in values
            if any(term in value_text(value) for term in terms)
            and not any(term in value_text(value) for term in excluded_terms)
        ]
        if matches:
            return matches
        # Tier 2: nothing wanted was found, so at least drop the excluded values.
        if excluded_terms:
            non_excluded = [
                value
                for value in values
                if not any(term in value_text(value) for term in excluded_terms)
            ]
            if non_excluded:
                return non_excluded
        # Tier 3: give up filtering rather than return an empty list.
        return values

    # Each branch maps axis name -> wanted terms (and optionally excluded terms).
    # axis_name is always one of the five keys at this point, so the defaults
    # passed to .get() are only reached where an exclusion table omits an axis.
    if action_kind == outercourse_policy.OUTERCOURSE_BOOBJOB:
        by_axis = {
            "contact_detail": ("compressed", "glans", "breast", "breasts", "soft tissue", "skin visibly"),
            "hand_detail": ("breast", "breasts", "fingers"),
            "texture_detail": ("compression", "soft flesh", "skin", "flesh", "asymmetry"),
            "visibility": ("breast", "breasts", "glans", "shaft"),
            "body_contact": ("torso", "body angle", "body angled", "shoulders", "hips"),
        }
        excluded_by_axis = {
            "contact_detail": ("hand wrapped", "fingers and palm", "soles", "toes", "balls", "tongue"),
            "hand_detail": ("base of the penis", "penis shaft", "balls", "thigh", "ankles", "stroking"),
            "texture_detail": ("toes", "soles", "tongue"),
            "visibility": ("balls", "soles", "toes", "hand"),
            "body_contact": ("head tucked", "face directly", "base of the penis"),
        }
        return filtered(
            by_axis.get(axis_name, ("breast", "breasts", "shaft")),
            excluded_by_axis.get(axis_name, ()),
        )
    if action_kind == outercourse_policy.OUTERCOURSE_TESTICLE:
        # No exclusion table for this kind: only the wanted terms apply.
        by_axis = {
            "contact_detail": ("balls", "lips", "tongue", "wet"),
            "hand_detail": ("balls", "base", "thigh"),
            "texture_detail": ("wet", "saliva", "skin"),
            "visibility": ("balls", "mouth"),
            "body_contact": ("torso", "shoulders", "head tucked", "base of the penis", "knees", "thigh"),
        }
        return filtered(by_axis.get(axis_name, ("balls", "mouth", "tongue")))
    if action_kind == outercourse_policy.OUTERCOURSE_PENIS_LICKING:
        # No exclusion table for this kind: only the wanted terms apply.
        by_axis = {
            "contact_detail": ("tongue", "lips", "glans", "shaft", "wet"),
            "hand_detail": ("base", "penis", "thigh"),
            "texture_detail": ("wet", "saliva", "skin"),
            "visibility": ("tongue", "penis"),
            "body_contact": ("head low", "face directly", "torso", "pelvis", "base of the penis", "hips", "body angled"),
        }
        return filtered(by_axis.get(axis_name, ("tongue", "glans", "shaft")))
    if action_kind == outercourse_policy.OUTERCOURSE_HANDJOB:
        by_axis = {
            "contact_detail": ("hand", "fingers", "palm", "shaft", "glans"),
            "hand_detail": ("hand", "hands", "shaft", "penis"),
            "texture_detail": ("fingers", "pressure", "skin", "shaft"),
            "visibility": ("hand", "penis", "shaft", "glans"),
            "body_contact": ("hips", "knees", "body angle"),
        }
        # body_contact has no exclusions for this kind.
        excluded_by_axis = {
            "contact_detail": ("balls", "soles", "toes", "breast", "breasts", "tongue"),
            "hand_detail": ("balls", "thigh", "ankles", "breast", "breasts"),
            "texture_detail": ("toes", "soles", "tongue", "breast", "breasts"),
            "visibility": ("balls", "feet", "soles", "breast", "mouth"),
        }
        return filtered(
            by_axis.get(axis_name, ("hand", "penis", "shaft")),
            excluded_by_axis.get(axis_name, ()),
        )
    if action_kind == outercourse_policy.OUTERCOURSE_FOOTJOB:
        by_axis = {
            "contact_detail": ("soles", "toes"),
            "hand_detail": ("ankles", "thighs"),
            "texture_detail": ("toes", "soles", "pressure"),
            "visibility": ("feet", "soles"),
            "body_contact": ("legs", "knees", "body angled"),
        }
        # hand_detail and body_contact have no exclusions for this kind.
        excluded_by_axis = {
            "contact_detail": ("hand", "finger", "palm", "balls", "tongue", "breast"),
            "texture_detail": ("fingers", "tongue", "breast"),
            "visibility": ("hand", "balls", "breast"),
        }
        return filtered(
            by_axis.get(axis_name, ("feet", "soles", "toes")),
            excluded_by_axis.get(axis_name, ()),
        )
    # Any other action kind is left unfiltered.
    return values
def anal_axis_values_for_position(values: list[Any], position: str, axis_name: str) -> list[Any]:
    """Narrow one detail axis to values that fit the already chosen position.

    Keywords are searched in the lower-cased position text, and candidate
    values are matched by case-insensitive substring tests on
    ``entry_text(value)``. The position branches are tested in the order
    written, so a position that contains several keywords is handled by the
    first branch that matches.

    Args:
        values: Candidate entries for the axis.
        position: Chosen position text; an empty/falsy value disables filtering.
        axis_name: Axis being filled. Only ``body_contact``, ``hand_detail``,
            ``leg_detail``, ``thrust_detail`` and ``visibility`` are filtered.

    Returns:
        The filtered entries, or ``values`` unchanged when the position is
        empty, the axis is not filtered, no branch matches, or filtering would
        leave nothing.
    """
    position_text = str(position or "").lower()
    if not position_text:
        return values
    axis_name = str(axis_name or "").lower()
    if axis_name not in {"body_contact", "hand_detail", "leg_detail", "thrust_detail", "visibility"}:
        return values

    def value_text(value: Any) -> str:
        # Lower-cased text used for every substring test below.
        return entry_text(value).lower()

    def filtered(terms: tuple[str, ...], excluded_terms: tuple[str, ...] = ()) -> list[Any]:
        # Tier 1: values that mention a wanted term and no excluded term.
        matches = [
            value
            for value in values
            if any(term in value_text(value) for term in terms)
            and not any(term in value_text(value) for term in excluded_terms)
        ]
        if matches:
            return matches
        # Tier 2: nothing wanted was found, so at least drop the excluded values.
        if excluded_terms:
            non_excluded = [
                value
                for value in values
                if not any(term in value_text(value) for term in excluded_terms)
            ]
            if non_excluded:
                return non_excluded
        # Tier 3: give up filtering rather than return an empty list.
        return values

    # Every by_axis table below defines all five filtered axes, so the default
    # passed to .get() is not reached for the axis names admitted above.
    if "side-lying" in position_text or "spooning" in position_text:
        by_axis = {
            "body_contact": ("bodies locked", "chests pressed", "sweaty", "hips pressed"),
            "hand_detail": ("hips", "waist", "cheeks", "shoulders"),
            "leg_detail": ("one leg lifted", "thighs held open", "legs spread"),
            "thrust_detail": ("pelvis pressed", "bodies rocking", "wet skin", "hard grinding"),
            "visibility": ("ass and penis", "anal penetration", "spread cheeks", "genital contact"),
        }
        return filtered(
            by_axis.get(axis_name, ("side", "thigh", "hips")),
            ("standing", "kneeling", "draped over shoulders", "knees pressed to chest"),
        )
    if "standing" in position_text:
        by_axis = {
            "body_contact": ("hips pressed", "bodies locked", "one body bent over", "ass lifted", "sweaty"),
            "hand_detail": ("hips", "waist", "cheeks", "shoulders"),
            "leg_detail": ("standing", "one foot planted"),
            "thrust_detail": ("hips", "pelvis", "hard grinding", "bodies rocking"),
            "visibility": ("ass and penis", "anal penetration", "spread cheeks", "genital contact"),
        }
        return filtered(
            by_axis.get(axis_name, ("standing", "hips")),
            ("kneeling", "draped over shoulders", "knees pressed to chest", "side-lying"),
        )
    if "edge-of-bed" in position_text or "bed-edge" in position_text or "edge supported" in position_text:
        by_axis = {
            "body_contact": ("thighs held open", "hips pressed", "bodies locked", "ass lifted"),
            "hand_detail": ("hips", "waist", "cheeks", "thighs"),
            "leg_detail": ("knees pressed", "legs draped", "thighs held open", "one foot planted"),
            "thrust_detail": ("hips", "pelvis", "hard grinding", "bodies rocking"),
            "visibility": ("ass and penis", "anal penetration", "open thighs", "genital contact"),
        }
        return filtered(by_axis.get(axis_name, ("thigh", "hips")), ("standing", "side-lying"))
    if "kneeling" in position_text:
        by_axis = {
            "body_contact": ("ass lifted", "hips pressed", "bodies locked", "one body bent over"),
            "hand_detail": ("hips", "waist", "cheeks", "thighs"),
            "leg_detail": ("kneeling", "thighs held open", "legs spread"),
            "thrust_detail": ("hips", "pelvis", "ass pushed", "hard grinding"),
            "visibility": ("ass and penis", "anal penetration", "spread cheeks", "genital contact"),
        }
        return filtered(
            by_axis.get(axis_name, ("kneeling", "hips")),
            ("standing", "draped over shoulders", "knees pressed to chest", "side-lying"),
        )
    if "doggy" in position_text or "face-down" in position_text or "bent-over" in position_text:
        by_axis = {
            "body_contact": ("ass lifted", "one body bent over", "hips pressed", "bodies locked"),
            "hand_detail": ("hips", "waist", "cheeks", "thighs"),
            "leg_detail": ("legs spread", "kneeling", "one foot planted", "standing"),
            "thrust_detail": ("ass pushed", "hips", "pelvis", "hard grinding"),
            "visibility": ("ass and penis", "anal penetration", "spread cheeks", "genital contact"),
        }
        excluded = ("side-lying", "draped over shoulders", "knees pressed to chest")
        # Face-down and doggy positions additionally exclude standing wording;
        # a position that only says "bent-over" keeps it.
        if "face-down" in position_text or "doggy" in position_text:
            excluded = (*excluded, "standing")
        return filtered(by_axis.get(axis_name, ("ass", "hips")), excluded)
    # No position keyword recognized: leave the candidates untouched.
    return values
def _format(template: str, context: dict[str, Any]) -> str:
    """Fill *template* from *context*, rendering unfilled fields as empty text.

    Every field name found in the template is pre-seeded with ``""`` and then
    overridden by the entries of *context*, so a field that *context* does not
    supply disappears from the output.
    """
    placeholders = SafeFormatDict()
    for _literal, field, _spec, _conversion in Formatter().parse(template):
        if field:
            placeholders[field] = ""
    placeholders.update(context)
    return template.format_map(placeholders)
def compose_item(
    rng: random.Random,
    category: dict[str, Any],
    subcategory: dict[str, Any],
    item: Any,
    women_count: int = 1,
    men_count: int = 1,
) -> tuple[str, str, dict[str, str], dict[str, Any]]:
    """Build the prompt text for one item.

    When both item templates and axes are available, a template is drawn by
    weight, each of its fields that has axis values is filled with a weighted
    pick, and the template is formatted. Otherwise the item's own text is used.

    Args:
        rng: Random source for every weighted pick.
        category: Category definition passed to the policy helpers.
        subcategory: Subcategory definition; its ``"slug"`` selects the
            position-aware filters and its ``"name"`` is the fallback name.
        item: The item being composed.
        women_count: Forwarded to ``category_policy.compatible_entries``.
        men_count: Forwarded to ``category_policy.compatible_entries``.

    Returns:
        ``(prompt_text, name, axis_values, metadata)`` where ``axis_values``
        maps each filled template field to its chosen text (empty on the
        no-template path) and ``metadata`` is the inherited template metadata
        merged with that of the chosen template (or of the item).
    """
    templates = category_policy.template_list(category, subcategory, item, "item_templates")
    axes = category_policy.merged_axes(category, subcategory, item)
    inherited_metadata = template_policy.inherited_template_metadata(category, subcategory, item)

    if not (templates and axes):
        # No templating possible: the item stands for itself.
        return (
            item_text(item),
            item_name(item),
            {},
            template_policy.merge_template_metadata(
                inherited_metadata,
                template_policy.template_metadata(item),
            ),
        )

    usable_templates = category_policy.compatible_entries(templates, women_count, men_count)
    template_entry = weighted_choice(rng, usable_templates)
    template = entry_text(template_entry)

    # Template fields in first-appearance order, without duplicates.
    ordered_fields: list[str] = []
    for _literal, field, _spec, _conversion in Formatter().parse(template):
        if field and field not in ordered_fields:
            ordered_fields.append(field)

    chosen: dict[str, str] = {}
    subcategory_slug = str(subcategory.get("slug") or "").lower()

    def narrow(axis: str, candidates: list[Any]) -> list[Any]:
        # Apply the filter belonging to this subcategory, if any.
        position = chosen.get("position", "")
        if subcategory_slug == "oral_sex":
            if axis == "oral_act":
                return oral_acts_for_position(candidates, position)
            return oral_axis_values_for_context(candidates, position, chosen.get("oral_act", ""), axis)
        if subcategory_slug == "outercourse_sex":
            if axis == "outer_act":
                candidates = outercourse_acts_for_position(candidates, position)
            return outercourse_axis_values_for_position(candidates, position, axis)
        if subcategory_slug == "anal_double_penetration":
            return anal_axis_values_for_position(candidates, position, axis)
        return candidates

    # For these subcategories the position is drawn first, wherever it sits
    # in the template, because the other axes are filtered against it.
    position_driven = subcategory_slug in ("oral_sex", "outercourse_sex", "anal_double_penetration")
    if position_driven and "position" in ordered_fields and axes.get("position"):
        position_values = category_policy.compatible_entries(axes["position"], women_count, men_count)
        chosen["position"] = entry_text(weighted_choice(rng, position_values))

    # NOTE(review): unlike "position", "oral_act" is drawn in template order,
    # so a field that precedes {oral_act} is filtered with an empty act, and
    # oral_axis_values_for_context then returns its candidates unfiltered.
    for field in ordered_fields:
        already_filled = field in chosen
        if already_filled or field not in axes or not axes[field]:
            continue
        candidates = category_policy.compatible_entries(axes[field], women_count, men_count)
        candidates = narrow(field, candidates)
        chosen[field] = entry_text(weighted_choice(rng, candidates))

    item_prompt = _format(template, chosen).strip()
    display_name = item_name(item) or subcategory["name"]
    metadata = template_policy.merge_template_metadata(
        inherited_metadata,
        template_policy.template_metadata(template_entry),
    )
    return item_prompt, display_name, chosen, metadata