575 lines
20 KiB
Python
575 lines
20 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import random
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
ROOT_DIR = Path(__file__).resolve().parent
|
|
CATEGORY_DIR = ROOT_DIR / "categories"
|
|
RANDOM_SUBCATEGORY = "random"
|
|
|
|
|
|
def category_json_files() -> list[Path]:
|
|
if not CATEGORY_DIR.exists():
|
|
return []
|
|
return sorted(path for path in CATEGORY_DIR.glob("*.json") if path.is_file())
|
|
|
|
|
|
def read_category_json(path: Path) -> dict[str, Any]:
|
|
try:
|
|
data = json.loads(path.read_text(encoding="utf-8"))
|
|
except json.JSONDecodeError as exc:
|
|
raise ValueError(f"Invalid JSON in {path}: {exc}") from exc
|
|
if not isinstance(data, dict):
|
|
raise ValueError(f"{path} must contain a JSON object")
|
|
return data
|
|
|
|
|
|
def _slug(value: str) -> str:
|
|
text = str(value or "").lower()
|
|
text = re.sub(r"[^a-z0-9]+", "_", text)
|
|
return text.strip("_")[:48] or "custom"
|
|
|
|
|
|
def _list_from(value: Any) -> list[Any]:
|
|
if value is None:
|
|
return []
|
|
if isinstance(value, list):
|
|
return value
|
|
return [value]
|
|
|
|
|
|
def _is_false(value: Any) -> bool:
|
|
if isinstance(value, bool):
|
|
return value is False
|
|
if isinstance(value, str):
|
|
return value.strip().lower() in ("false", "0", "no", "off")
|
|
return False
|
|
|
|
|
|
def _entry_text(item: Any) -> str:
|
|
if isinstance(item, dict):
|
|
return str(
|
|
item.get("template")
|
|
or item.get("prompt")
|
|
or item.get("text")
|
|
or item.get("description")
|
|
or item.get("name")
|
|
or ""
|
|
).strip()
|
|
return str(item).strip()
|
|
|
|
|
|
def _unique_extend(target: list[Any], additions: list[Any]) -> None:
|
|
seen = set()
|
|
for item in target:
|
|
try:
|
|
seen.add(json.dumps(item, sort_keys=True))
|
|
except TypeError:
|
|
seen.add(repr(item))
|
|
for item in additions:
|
|
try:
|
|
marker = json.dumps(item, sort_keys=True)
|
|
except TypeError:
|
|
marker = repr(item)
|
|
if marker not in seen:
|
|
target.append(item)
|
|
seen.add(marker)
|
|
|
|
|
|
def _weighted_choice(rng: random.Random, items: list[Any]) -> Any:
|
|
if not items:
|
|
raise ValueError("Cannot choose from an empty list")
|
|
weights: list[float] = []
|
|
for item in items:
|
|
weight = item.get("weight", 1.0) if isinstance(item, dict) else 1.0
|
|
try:
|
|
weights.append(max(0.0, float(weight)))
|
|
except (TypeError, ValueError):
|
|
weights.append(1.0)
|
|
total = sum(weights)
|
|
if total <= 0:
|
|
return items[rng.randrange(len(items))]
|
|
pick = rng.random() * total
|
|
running = 0.0
|
|
for item, weight in zip(items, weights):
|
|
running += weight
|
|
if pick <= running:
|
|
return item
|
|
return items[-1]
|
|
|
|
|
|
def template_list(category: dict[str, Any], subcategory: dict[str, Any], item: Any, key: str) -> list[Any]:
|
|
if isinstance(item, dict) and key in item:
|
|
return _list_from(item[key])
|
|
if key in subcategory:
|
|
return _list_from(subcategory[key])
|
|
if key in category:
|
|
return _list_from(category[key])
|
|
return []
|
|
|
|
|
|
def _constraint_int(entry: dict[str, Any], key: str) -> int | None:
|
|
if key not in entry:
|
|
return None
|
|
try:
|
|
return int(entry[key])
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def _cast_requirement_matches(requirement: str, women_count: int, men_count: int) -> bool:
|
|
total = women_count + men_count
|
|
requirement = requirement.strip().lower()
|
|
if requirement in ("", "any"):
|
|
return True
|
|
if requirement == "women_only":
|
|
return women_count > 0 and men_count == 0
|
|
if requirement == "men_only":
|
|
return men_count > 0 and women_count == 0
|
|
if requirement == "mixed":
|
|
return women_count > 0 and men_count > 0
|
|
if requirement == "has_women":
|
|
return women_count > 0
|
|
if requirement == "has_men":
|
|
return men_count > 0
|
|
if requirement == "solo":
|
|
return total == 1
|
|
if requirement == "couple":
|
|
return total == 2
|
|
if requirement == "threesome":
|
|
return total == 3
|
|
if requirement == "group":
|
|
return total >= 4
|
|
return True
|
|
|
|
|
|
def _is_toy_assisted_double_couple_text(text: str) -> bool:
|
|
text = text.lower()
|
|
if "toy" not in text:
|
|
return False
|
|
return any(
|
|
token in text
|
|
for token in (
|
|
"double penetration",
|
|
"double-penetration",
|
|
"vaginal and anal penetration",
|
|
"second penetration point",
|
|
"second point of contact",
|
|
"second contact",
|
|
)
|
|
)
|
|
|
|
|
|
def _heuristic_cast_compatible(text: str, women_count: int, men_count: int) -> bool:
|
|
text = text.lower()
|
|
if not text:
|
|
return True
|
|
total = women_count + men_count
|
|
if total == 2 and women_count == 1 and men_count == 1:
|
|
if "{double_act}" in text:
|
|
return False
|
|
if _is_toy_assisted_double_couple_text(text):
|
|
return False
|
|
if total == 1:
|
|
solo_blocked_terms = (
|
|
"partner",
|
|
"partners",
|
|
"two bodies",
|
|
"three bodies",
|
|
"bodies still pressed",
|
|
"bodies pressed",
|
|
"bodies tangled",
|
|
"wet bodies",
|
|
"chests heaving together",
|
|
"straddling a partner",
|
|
"shared climax",
|
|
"between two",
|
|
"from both sides",
|
|
"front-and-back",
|
|
"body contact",
|
|
)
|
|
if any(term in text for term in solo_blocked_terms):
|
|
return False
|
|
solo_toy_terms = ("toy", "dildo", "finger", "fingers", "self")
|
|
if "penetration" in text and not any(term in text for term in solo_toy_terms):
|
|
return False
|
|
if total < 3 and "threesome" in text:
|
|
return False
|
|
if total != 3 and ("centered threesome" in text or "three-way" in text):
|
|
return False
|
|
if total < 3 and ("three bodies" in text or "center partner" in text or "center body" in text):
|
|
return False
|
|
if total < 4 and ("orgy" in text or "group sex" in text or "group-sex" in text or "group pile" in text):
|
|
return False
|
|
if total < 3 and (
|
|
"double penetration" in text
|
|
or "two partners penetrating" in text
|
|
or "front-and-back penetration" in text
|
|
or "one penis in pussy and one penis in ass" in text
|
|
or "pussy and ass filled" in text
|
|
or "vaginal and anal penetration at the same time" in text
|
|
or "front-and-back double penetration" in text
|
|
or "hardcore double penetration" in text
|
|
or "kneeling double penetration" in text
|
|
or "standing supported double penetration" in text
|
|
or "deep double penetration" in text
|
|
or "between two partners" in text
|
|
or "from both sides" in text
|
|
):
|
|
toy_terms = ("strap-on", "strap on", "dildo", "toy", "finger")
|
|
if not any(term in text for term in toy_terms):
|
|
return False
|
|
if men_count == 0:
|
|
toy_terms = ("strap-on", "strap on", "dildo", "toy", "finger", "fingers")
|
|
penetration_terms = (
|
|
"vaginal penetration",
|
|
"deep vaginal sex",
|
|
"penetrative sex",
|
|
"pussy penetration",
|
|
"pussy stretched",
|
|
"vaginal thrusting",
|
|
"full-body penetrative",
|
|
"close-contact vaginal",
|
|
"penetration clearly visible",
|
|
"explicit penetrative contact",
|
|
)
|
|
if any(term in text for term in penetration_terms) and not any(term in text for term in toy_terms):
|
|
return False
|
|
male_terms = (
|
|
" penis",
|
|
"penis ",
|
|
"penises",
|
|
"cum",
|
|
"creampie",
|
|
"facial",
|
|
"blowjob",
|
|
"fellatio",
|
|
"deepthroat",
|
|
"ejaculation",
|
|
"semen",
|
|
)
|
|
if any(term in text for term in male_terms) and not any(term in text for term in toy_terms):
|
|
return False
|
|
elif men_count < 2 and "penises" in text:
|
|
return False
|
|
if women_count == 0:
|
|
if "penetrative sex" in text and not any(term in text for term in ("anal", "ass", "male/male", "men")):
|
|
return False
|
|
female_terms = (
|
|
"pussy",
|
|
"vaginal",
|
|
"vagina",
|
|
"cunnilingus",
|
|
"clit",
|
|
"clitoris",
|
|
"breasts",
|
|
"breast ",
|
|
"nipples",
|
|
"nipple",
|
|
"underboob",
|
|
)
|
|
if any(term in text for term in female_terms):
|
|
return False
|
|
return True
|
|
|
|
|
|
def compatible_entry(entry: Any, women_count: int, men_count: int) -> bool:
|
|
if not isinstance(entry, dict):
|
|
return _heuristic_cast_compatible(_entry_text(entry), women_count, men_count)
|
|
total = women_count + men_count
|
|
for key, value in (
|
|
("min_women", women_count),
|
|
("min_men", men_count),
|
|
("min_people", total),
|
|
):
|
|
minimum = _constraint_int(entry, key)
|
|
if minimum is not None and value < minimum:
|
|
return False
|
|
for key, value in (
|
|
("max_women", women_count),
|
|
("max_men", men_count),
|
|
("max_people", total),
|
|
):
|
|
maximum = _constraint_int(entry, key)
|
|
if maximum is not None and value > maximum:
|
|
return False
|
|
requirements = _list_from(entry.get("cast", [])) + _list_from(entry.get("requires", []))
|
|
if requirements and not all(_cast_requirement_matches(str(req), women_count, men_count) for req in requirements):
|
|
return False
|
|
if any(key in entry for key in ("subcategories", "item_templates", "item_axes")):
|
|
return True
|
|
return _heuristic_cast_compatible(_entry_text(entry), women_count, men_count)
|
|
|
|
|
|
def compatible_entries(entries: list[Any], women_count: int, men_count: int) -> list[Any]:
|
|
filtered = [entry for entry in entries if compatible_entry(entry, women_count, men_count)]
|
|
return filtered or entries
|
|
|
|
|
|
def merged_axes(category: dict[str, Any], subcategory: dict[str, Any], item: Any) -> dict[str, list[Any]]:
|
|
axes: dict[str, list[Any]] = {}
|
|
for source in (category, subcategory, item if isinstance(item, dict) else None):
|
|
if not isinstance(source, dict):
|
|
continue
|
|
raw_axes = source.get("item_axes", {})
|
|
if raw_axes is None:
|
|
continue
|
|
if not isinstance(raw_axes, dict):
|
|
raise ValueError("item_axes must be a JSON object")
|
|
for key, values in raw_axes.items():
|
|
axes[str(key)] = _list_from(values)
|
|
return axes
|
|
|
|
|
|
def _normalize_subcategories(category: dict[str, Any]) -> list[dict[str, Any]]:
|
|
raw = category.get("subcategories", [])
|
|
if isinstance(raw, dict):
|
|
raw = [
|
|
{"name": name, **(value if isinstance(value, dict) else {"items": value})}
|
|
for name, value in raw.items()
|
|
]
|
|
subcategories: list[dict[str, Any]] = []
|
|
for entry in _list_from(raw):
|
|
if isinstance(entry, str):
|
|
sub = {"name": entry, "items": [entry]}
|
|
elif isinstance(entry, dict):
|
|
sub = dict(entry)
|
|
else:
|
|
raise ValueError(f"Subcategory must be an object or string: {entry!r}")
|
|
name = str(sub.get("name") or sub.get("slug") or "General").strip()
|
|
sub["name"] = name
|
|
sub["slug"] = str(sub.get("slug") or _slug(name))
|
|
if "items" not in sub and "prompts" in sub:
|
|
sub["items"] = sub["prompts"]
|
|
if "items" not in sub:
|
|
sub["items"] = [name]
|
|
subcategories.append(sub)
|
|
if not subcategories:
|
|
name = str(category.get("name") or "General")
|
|
subcategories.append({"name": "General", "slug": "general", "items": [name]})
|
|
return subcategories
|
|
|
|
|
|
def _normalize_categories(raw_categories: Any) -> list[dict[str, Any]]:
|
|
if isinstance(raw_categories, dict):
|
|
iterable = [
|
|
{"name": name, **(value if isinstance(value, dict) else {"subcategories": value})}
|
|
for name, value in raw_categories.items()
|
|
]
|
|
else:
|
|
iterable = _list_from(raw_categories)
|
|
|
|
categories: list[dict[str, Any]] = []
|
|
for entry in iterable:
|
|
if not isinstance(entry, dict):
|
|
raise ValueError(f"Category must be an object: {entry!r}")
|
|
category = dict(entry)
|
|
name = str(category.get("name") or category.get("slug") or "Custom").strip()
|
|
category["name"] = name
|
|
category["slug"] = str(category.get("slug") or _slug(name))
|
|
category["subcategories"] = _normalize_subcategories(category)
|
|
categories.append(category)
|
|
return categories
|
|
|
|
|
|
def load_category_library() -> list[dict[str, Any]]:
|
|
categories: list[dict[str, Any]] = []
|
|
for path in category_json_files():
|
|
data = read_category_json(path)
|
|
categories.extend(_normalize_categories(data.get("categories", [])))
|
|
return categories
|
|
|
|
|
|
def load_named_pool_library(key: str) -> dict[str, list[Any]]:
|
|
pools: dict[str, list[Any]] = {}
|
|
for path in category_json_files():
|
|
data = read_category_json(path)
|
|
raw_pools = data.get(key, {})
|
|
if not raw_pools:
|
|
continue
|
|
if not isinstance(raw_pools, dict):
|
|
raise ValueError(f"{key} in {path} must be an object")
|
|
for name, entries in raw_pools.items():
|
|
pool_name = str(name).strip()
|
|
if not pool_name:
|
|
continue
|
|
pools.setdefault(pool_name, [])
|
|
_unique_extend(pools[pool_name], _list_from(entries))
|
|
return pools
|
|
|
|
|
|
def load_scene_pool_library() -> dict[str, list[Any]]:
|
|
return load_named_pool_library("scene_pools")
|
|
|
|
|
|
def load_expression_pool_library() -> dict[str, list[Any]]:
|
|
return load_named_pool_library("expression_pools")
|
|
|
|
|
|
def load_composition_pool_library() -> dict[str, list[Any]]:
|
|
return load_named_pool_library("composition_pools")
|
|
|
|
|
|
def find_category(categories: list[dict[str, Any]], name_or_slug: str) -> dict[str, Any] | None:
|
|
wanted = name_or_slug.strip().lower()
|
|
for category in categories:
|
|
if category["name"].lower() == wanted or category["slug"].lower() == wanted:
|
|
return category
|
|
return None
|
|
|
|
|
|
def exact_subcategory_selector(category: dict[str, Any], subcategory: dict[str, Any]) -> str:
|
|
return f"{category.get('name')} / {subcategory.get('name')}"
|
|
|
|
|
|
def split_exact_subcategory_choice(
|
|
categories: list[dict[str, Any]],
|
|
subcategory_choice: str,
|
|
) -> tuple[dict[str, Any], str] | None:
|
|
choice = str(subcategory_choice or "").strip()
|
|
if not choice or " / " not in choice:
|
|
return None
|
|
candidates: list[tuple[int, dict[str, Any], str]] = []
|
|
for category in categories:
|
|
for category_label in (category.get("name", ""), category.get("slug", "")):
|
|
category_label = str(category_label).strip()
|
|
prefix = f"{category_label} / "
|
|
if category_label and choice.lower().startswith(prefix.lower()):
|
|
candidates.append((len(prefix), category, choice[len(prefix) :].strip()))
|
|
if candidates:
|
|
_length, category, subcategory_name = max(candidates, key=lambda candidate: candidate[0])
|
|
return category, subcategory_name
|
|
return None
|
|
|
|
|
|
def _base_cast_counts(women_count: int, men_count: int) -> tuple[int, int]:
|
|
women_count = max(0, int(women_count))
|
|
men_count = max(0, int(men_count))
|
|
if women_count + men_count == 0:
|
|
women_count = 1
|
|
return women_count, men_count
|
|
|
|
|
|
def _counts_for_exact_subcategory(
|
|
subcategory: dict[str, Any],
|
|
women_count: int,
|
|
men_count: int,
|
|
) -> tuple[int, int]:
|
|
women_count, men_count = _base_cast_counts(women_count, men_count)
|
|
|
|
min_women = _constraint_int(subcategory, "min_women")
|
|
if min_women is not None and women_count < min_women:
|
|
women_count = min_women
|
|
min_men = _constraint_int(subcategory, "min_men")
|
|
if min_men is not None and men_count < min_men:
|
|
men_count = min_men
|
|
|
|
min_people = _constraint_int(subcategory, "min_people")
|
|
if min_people is not None:
|
|
missing = min_people - (women_count + men_count)
|
|
if missing > 0:
|
|
if women_count > 0 or men_count == 0:
|
|
women_count += missing
|
|
else:
|
|
men_count += missing
|
|
return women_count, men_count
|
|
|
|
|
|
def find_subcategory(
|
|
categories: list[dict[str, Any]],
|
|
category_choice: str,
|
|
subcategory_choice: str,
|
|
category_rng: random.Random,
|
|
subcategory_rng: random.Random,
|
|
women_count: int = 1,
|
|
men_count: int = 1,
|
|
random_subcategory: str = RANDOM_SUBCATEGORY,
|
|
) -> tuple[dict[str, Any], dict[str, Any], int, int]:
|
|
women_count, men_count = _base_cast_counts(women_count, men_count)
|
|
if subcategory_choice and subcategory_choice != random_subcategory and " / " in subcategory_choice:
|
|
exact_choice = split_exact_subcategory_choice(categories, subcategory_choice)
|
|
if not exact_choice:
|
|
category_name = str(subcategory_choice).split(" / ", 1)[0]
|
|
raise ValueError(f"Unknown category in subcategory picker: {category_name}")
|
|
category, subcategory_name = exact_choice
|
|
wanted = subcategory_name.strip().lower()
|
|
for subcategory in category["subcategories"]:
|
|
if subcategory["name"].lower() == wanted or subcategory["slug"].lower() == wanted:
|
|
adjusted_women_count, adjusted_men_count = _counts_for_exact_subcategory(
|
|
subcategory,
|
|
women_count,
|
|
men_count,
|
|
)
|
|
if not compatible_entry(subcategory, adjusted_women_count, adjusted_men_count):
|
|
raise ValueError(
|
|
f"Subcategory '{subcategory['name']}' is not compatible with "
|
|
f"women_count={women_count}, men_count={men_count}"
|
|
)
|
|
return category, subcategory, adjusted_women_count, adjusted_men_count
|
|
raise ValueError(f"Unknown subcategory '{subcategory_name}' for category '{category['name']}'")
|
|
|
|
if category_choice == "custom_random":
|
|
if not categories:
|
|
raise ValueError("No custom categories found in categories/*.json")
|
|
category = _weighted_choice(category_rng, categories)
|
|
else:
|
|
category = find_category(categories, category_choice)
|
|
if not category:
|
|
raise ValueError(f"Unknown custom category: {category_choice}")
|
|
subcategories = compatible_entries(category["subcategories"], women_count, men_count)
|
|
subcategory = _weighted_choice(subcategory_rng, subcategories)
|
|
return category, subcategory, women_count, men_count
|
|
|
|
|
|
def merged_field(category: dict[str, Any], subcategory: dict[str, Any], item: Any, key: str, default: Any = None) -> Any:
|
|
if isinstance(item, dict) and key in item:
|
|
return item[key]
|
|
if key in subcategory:
|
|
return subcategory[key]
|
|
if key in category:
|
|
return category[key]
|
|
return default
|
|
|
|
|
|
def _sources_with_inheritance(
|
|
category: dict[str, Any],
|
|
subcategory: dict[str, Any],
|
|
item: Any,
|
|
inherit_key: str,
|
|
) -> tuple[Any, ...]:
|
|
item_source = item if isinstance(item, dict) else None
|
|
if item_source is not None and _is_false(item_source.get(inherit_key)):
|
|
return (item_source,)
|
|
if _is_false(subcategory.get(inherit_key)):
|
|
return (subcategory, item_source)
|
|
return (category, subcategory, item_source)
|
|
|
|
|
|
def configured_pool(
|
|
category: dict[str, Any],
|
|
subcategory: dict[str, Any],
|
|
item: Any,
|
|
direct_key: str,
|
|
pool_key: str,
|
|
pool_library: dict[str, list[Any]],
|
|
inherit_key: str,
|
|
) -> list[Any]:
|
|
entries: list[Any] = []
|
|
singular_pool_key = pool_key[:-1] if pool_key.endswith("s") else pool_key
|
|
for source in _sources_with_inheritance(category, subcategory, item, inherit_key):
|
|
if not isinstance(source, dict):
|
|
continue
|
|
if direct_key in source:
|
|
_unique_extend(entries, _list_from(source[direct_key]))
|
|
refs = _list_from(source.get(singular_pool_key)) + _list_from(source.get(pool_key))
|
|
for ref in refs:
|
|
ref_name = str(ref).strip()
|
|
if ref_name not in pool_library:
|
|
raise ValueError(f"Unknown {singular_pool_key} '{ref_name}'")
|
|
_unique_extend(entries, pool_library[ref_name])
|
|
return entries
|