Add configurable location pools
This commit is contained in:
+240
-2
@@ -1310,6 +1310,36 @@ def load_scene_pool_library() -> dict[str, list[Any]]:
|
||||
return _load_named_pool_library("scene_pools")
|
||||
|
||||
|
||||
LOCATION_POOL_PRESETS = {
|
||||
"custom_only": (),
|
||||
"all_json_locations": ("*",),
|
||||
"casual_all": ("casual_",),
|
||||
"casual_urban": ("casual_urban_scenes",),
|
||||
"casual_summer": ("casual_summer_scenes",),
|
||||
"casual_home": ("casual_lounge_scenes",),
|
||||
"casual_smart": ("casual_smart_scenes",),
|
||||
"creator_softcore": ("softcore_creator_scenes", "mirror_scenes", "boudoir_bedroom_scenes"),
|
||||
"mirror_rooms": ("mirror_scenes", "hardcore_mirror_scenes"),
|
||||
"boudoir_bedroom": ("boudoir_bedroom_scenes", "hardcore_bed_scenes"),
|
||||
"fetish_studio": ("fetish_studio_scenes",),
|
||||
"costume_backstage": ("costume_backstage_scenes",),
|
||||
"hardcore_all": ("hardcore_",),
|
||||
"hardcore_private": ("hardcore_private_scenes",),
|
||||
"hardcore_bed": ("hardcore_bed_scenes",),
|
||||
"hardcore_penetrative": ("hardcore_penetrative_scenes",),
|
||||
"hardcore_oral": ("hardcore_oral_scenes",),
|
||||
"hardcore_anal": ("hardcore_anal_scenes",),
|
||||
"hardcore_threesome": ("hardcore_threesome_scenes",),
|
||||
"hardcore_group": ("hardcore_group_scenes",),
|
||||
"hardcore_climax": ("hardcore_climax_scenes",),
|
||||
}
|
||||
|
||||
|
||||
def location_pool_preset_choices() -> list[str]:
|
||||
pool_choices = [f"pool:{key}" for key in sorted(load_scene_pool_library())]
|
||||
return list(LOCATION_POOL_PRESETS) + pool_choices
|
||||
|
||||
|
||||
def load_expression_pool_library() -> dict[str, list[Any]]:
|
||||
return _load_named_pool_library("expression_pools")
|
||||
|
||||
@@ -1676,6 +1706,124 @@ def build_filter_config_json(
|
||||
)
|
||||
|
||||
|
||||
def _location_pool_names_for_preset(preset: str) -> list[str]:
|
||||
scene_pools = load_scene_pool_library()
|
||||
preset = str(preset or "custom_only")
|
||||
if preset.startswith("pool:"):
|
||||
pool_name = preset.split(":", 1)[1].strip()
|
||||
return [pool_name] if pool_name in scene_pools else []
|
||||
selectors = LOCATION_POOL_PRESETS.get(preset, ())
|
||||
names: list[str] = []
|
||||
for selector in selectors:
|
||||
if selector == "*":
|
||||
_unique_extend(names, sorted(scene_pools))
|
||||
elif selector.endswith("_"):
|
||||
_unique_extend(names, sorted(name for name in scene_pools if name.startswith(selector)))
|
||||
elif selector in scene_pools:
|
||||
_unique_extend(names, [selector])
|
||||
return names
|
||||
|
||||
|
||||
def _custom_location_entries(custom_locations: str) -> list[dict[str, str]]:
|
||||
entries: list[dict[str, str]] = []
|
||||
for raw_line in str(custom_locations or "").splitlines():
|
||||
line = raw_line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
slug = ""
|
||||
prompt = line
|
||||
if ":" in line:
|
||||
maybe_slug, maybe_prompt = line.split(":", 1)
|
||||
if maybe_slug.strip() and maybe_prompt.strip():
|
||||
slug = _slug(maybe_slug)
|
||||
prompt = maybe_prompt.strip()
|
||||
prompt = prompt.strip()
|
||||
if prompt:
|
||||
entries.append({"slug": slug or _slug(prompt), "prompt": prompt})
|
||||
return entries
|
||||
|
||||
|
||||
def _scene_entries_for_pool_names(pool_names: list[str]) -> list[Any]:
|
||||
scene_pools = load_scene_pool_library()
|
||||
entries: list[Any] = []
|
||||
for pool_name in pool_names:
|
||||
if pool_name not in scene_pools:
|
||||
continue
|
||||
_unique_extend(entries, scene_pools[pool_name])
|
||||
return entries
|
||||
|
||||
|
||||
def build_location_pool_json(
|
||||
enabled: bool = True,
|
||||
combine_mode: str = "replace",
|
||||
preset: str = "custom_only",
|
||||
custom_locations: str = "",
|
||||
location_config: str | dict[str, Any] | None = "",
|
||||
) -> str:
|
||||
incoming = _parse_location_config(location_config)
|
||||
combine_mode = combine_mode if combine_mode in ("replace", "add") else "replace"
|
||||
pool_names = _location_pool_names_for_preset(preset)
|
||||
entries = _scene_entries_for_pool_names(pool_names)
|
||||
_unique_extend(entries, _custom_location_entries(custom_locations))
|
||||
|
||||
if combine_mode == "add" and incoming.get("enabled"):
|
||||
apply_mode = str(incoming.get("apply_mode") or "replace")
|
||||
merged_pool_names = _list_from(incoming.get("pool_names"))
|
||||
_unique_extend(merged_pool_names, pool_names)
|
||||
merged_entries = _list_from(incoming.get("scene_entries"))
|
||||
_unique_extend(merged_entries, entries)
|
||||
else:
|
||||
apply_mode = "replace" if combine_mode == "replace" else "add"
|
||||
merged_pool_names = pool_names
|
||||
merged_entries = entries
|
||||
|
||||
active = bool(enabled) and bool(merged_entries)
|
||||
summary = (
|
||||
f"{apply_mode}; pools={len(merged_pool_names)}; locations={len(merged_entries)}"
|
||||
if active
|
||||
else "disabled or empty"
|
||||
)
|
||||
return json.dumps(
|
||||
{
|
||||
"enabled": active,
|
||||
"apply_mode": apply_mode,
|
||||
"pool_names": merged_pool_names,
|
||||
"scene_entries": merged_entries,
|
||||
"summary": summary,
|
||||
},
|
||||
ensure_ascii=True,
|
||||
sort_keys=True,
|
||||
)
|
||||
|
||||
|
||||
def _parse_location_config(location_config: str | dict[str, Any] | None) -> dict[str, Any]:
|
||||
if not location_config:
|
||||
return {"enabled": False, "apply_mode": "replace", "pool_names": [], "scene_entries": []}
|
||||
if isinstance(location_config, dict):
|
||||
raw = dict(location_config)
|
||||
else:
|
||||
try:
|
||||
raw = json.loads(str(location_config))
|
||||
except json.JSONDecodeError as exc:
|
||||
raise ValueError(f"Invalid location_config JSON: {exc}") from exc
|
||||
if not isinstance(raw, dict):
|
||||
raise ValueError("location_config must be a JSON object")
|
||||
entries = _list_from(raw.get("scene_entries"))
|
||||
if not entries and raw.get("pool_names"):
|
||||
entries = _scene_entries_for_pool_names([str(name) for name in _list_from(raw.get("pool_names"))])
|
||||
return {
|
||||
"enabled": bool(raw.get("enabled")) and bool(entries),
|
||||
"apply_mode": str(raw.get("apply_mode") or "replace") if str(raw.get("apply_mode") or "replace") in ("replace", "add") else "replace",
|
||||
"pool_names": [str(name) for name in _list_from(raw.get("pool_names")) if str(name).strip()],
|
||||
"scene_entries": entries,
|
||||
"summary": str(raw.get("summary") or ""),
|
||||
}
|
||||
|
||||
|
||||
def _location_config_active(location_config: dict[str, Any]) -> bool:
|
||||
return bool(location_config.get("enabled")) and bool(location_config.get("scene_entries"))
|
||||
|
||||
|
||||
def _ethnicity_text_from_value(value: Any) -> str:
|
||||
if isinstance(value, dict):
|
||||
return str(value.get("ethnicity") or "").strip()
|
||||
@@ -5620,7 +5768,17 @@ def _subject_context(
|
||||
}
|
||||
|
||||
|
||||
def _scene_pool(category: dict[str, Any], subcategory: dict[str, Any], item: Any, subject_type: str) -> list[Any]:
|
||||
def _scene_pool(
|
||||
category: dict[str, Any],
|
||||
subcategory: dict[str, Any],
|
||||
item: Any,
|
||||
subject_type: str,
|
||||
location_config: dict[str, Any] | None = None,
|
||||
) -> list[Any]:
|
||||
location_config = location_config or {}
|
||||
location_entries = _list_from(location_config.get("scene_entries"))
|
||||
if _location_config_active(location_config) and location_config.get("apply_mode") == "replace":
|
||||
return location_entries
|
||||
fallback = g.GROUP_SCENES if subject_type in ("group", "configured_cast") else g.SCENES
|
||||
scene_entries: list[Any] = []
|
||||
scene_pools = load_scene_pool_library()
|
||||
@@ -5642,9 +5800,63 @@ def _scene_pool(category: dict[str, Any], subcategory: dict[str, Any], item: Any
|
||||
if ref_name not in scene_pools:
|
||||
raise ValueError(f"Unknown scene pool '{ref_name}'")
|
||||
_unique_extend(scene_entries, scene_pools[ref_name])
|
||||
if _location_config_active(location_config) and location_config.get("apply_mode") == "add":
|
||||
_unique_extend(scene_entries, location_entries)
|
||||
return scene_entries or fallback
|
||||
|
||||
|
||||
def _legacy_scene_entries_for_row(row: dict[str, Any]) -> list[Any]:
|
||||
subject = str(row.get("primary_subject") or "").lower()
|
||||
if "group" in subject or "layout" in subject:
|
||||
return list(g.GROUP_SCENES)
|
||||
return list(g.SCENES)
|
||||
|
||||
|
||||
def _legacy_scene_text_for_slug(slug: str) -> str:
|
||||
for entry in list(g.SCENES) + list(g.GROUP_SCENES):
|
||||
entry_slug, entry_text = _pair_from(entry)
|
||||
if entry_slug == slug:
|
||||
return entry_text
|
||||
return ""
|
||||
|
||||
|
||||
def _apply_location_config_to_legacy_row(
|
||||
row: dict[str, Any],
|
||||
location_config: dict[str, Any],
|
||||
seed_config: dict[str, int],
|
||||
seed: int,
|
||||
row_number: int,
|
||||
) -> dict[str, Any]:
|
||||
if not _location_config_active(location_config):
|
||||
return row
|
||||
location_entries = _list_from(location_config.get("scene_entries"))
|
||||
if location_config.get("apply_mode") == "add":
|
||||
choices = _legacy_scene_entries_for_row(row)
|
||||
_unique_extend(choices, location_entries)
|
||||
else:
|
||||
choices = location_entries
|
||||
scene_rng = _axis_rng(seed_config, "scene", seed, row_number)
|
||||
scene_slug, scene_text = _choose_pair(scene_rng, choices)
|
||||
old_slug = str(row.get("scene") or "")
|
||||
old_text = _legacy_scene_text_for_slug(old_slug)
|
||||
row["source_scene"] = old_slug
|
||||
row["source_scene_text"] = old_text
|
||||
row["scene"] = scene_slug
|
||||
row["scene_text"] = scene_text
|
||||
row["location_config"] = location_config
|
||||
if old_text:
|
||||
row["prompt"] = str(row.get("prompt") or "").replace(f"Scene: {old_text}.", f"Scene: {scene_text}.")
|
||||
row["caption"] = str(row.get("caption") or "").replace(f", {old_text},", f", {scene_text},")
|
||||
else:
|
||||
row["prompt"] = re.sub(
|
||||
r"Scene:\s*.*?\.\s*Pose:",
|
||||
f"Scene: {scene_text}. Pose:",
|
||||
str(row.get("prompt") or ""),
|
||||
count=1,
|
||||
)
|
||||
return row
|
||||
|
||||
|
||||
def _sources_with_inheritance(
|
||||
category: dict[str, Any],
|
||||
subcategory: dict[str, Any],
|
||||
@@ -5870,6 +6082,7 @@ def _build_custom_row(
|
||||
character_cast: str | dict[str, Any] | list[Any] | None = None,
|
||||
expression_phase: str = "",
|
||||
hardcore_position_config: str | dict[str, Any] | None = None,
|
||||
location_config: str | dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
categories = load_category_library()
|
||||
category_rng = _axis_rng(seed_config, "category", seed, row_number)
|
||||
@@ -5881,6 +6094,7 @@ def _build_custom_row(
|
||||
expression_rng = _axis_rng(seed_config, "expression", seed, row_number)
|
||||
composition_rng = _axis_rng(seed_config, "composition", seed, row_number)
|
||||
parsed_hardcore_position_config = _parse_hardcore_position_config(hardcore_position_config)
|
||||
parsed_location_config = _parse_location_config(location_config)
|
||||
|
||||
requested_women_count = women_count
|
||||
requested_men_count = men_count
|
||||
@@ -5993,7 +6207,14 @@ def _build_custom_row(
|
||||
)
|
||||
cast_descriptor_text = _insta_of_prompt_cast_descriptors("; ".join(cast_descriptors))
|
||||
|
||||
scene_slug, scene = _choose_pair(scene_rng, _compatible_entries(_scene_pool(category, subcategory, item, subject_type), women_count, men_count))
|
||||
scene_slug, scene = _choose_pair(
|
||||
scene_rng,
|
||||
_compatible_entries(
|
||||
_scene_pool(category, subcategory, item, subject_type, parsed_location_config),
|
||||
women_count,
|
||||
men_count,
|
||||
),
|
||||
)
|
||||
pose = str(_merged_field(category, subcategory, item, "pose", "") or context.get("fallback_pose") or _choose_text(
|
||||
pose_rng, _compatible_entries(_pose_pool(category, subcategory, item, subject_type, poses), women_count, men_count)
|
||||
))
|
||||
@@ -6146,6 +6367,7 @@ def _build_custom_row(
|
||||
"custom_item": item_name,
|
||||
"item_axis_values": item_axis_values,
|
||||
"scene_text": scene,
|
||||
"location_config": parsed_location_config if _location_config_active(parsed_location_config) else {},
|
||||
"pose": pose,
|
||||
"seed_config": seed_config,
|
||||
"hardcore_position_config": (
|
||||
@@ -6218,6 +6440,7 @@ def build_prompt(
|
||||
expression_enabled: bool = True,
|
||||
expression_phase: str = "",
|
||||
hardcore_position_config: str | dict[str, Any] | None = None,
|
||||
location_config: str | dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
apply_pool_extensions()
|
||||
row_number = max(1, int(row_number))
|
||||
@@ -6228,6 +6451,7 @@ def build_prompt(
|
||||
minimal_ratio = _ratio_or_none(minimal_clothing_ratio)
|
||||
pose_ratio = _ratio_or_none(standard_pose_ratio)
|
||||
parsed_seed_config = _parse_seed_config(seed_config)
|
||||
parsed_location_config = _parse_location_config(location_config)
|
||||
content_rng = _axis_rng(parsed_seed_config, "content", seed, row_number)
|
||||
pose_axis_rng = _axis_rng(parsed_seed_config, "pose", seed, row_number)
|
||||
person_rng = _axis_rng(parsed_seed_config, "person", seed, row_number)
|
||||
@@ -6300,8 +6524,17 @@ def build_prompt(
|
||||
character_cast,
|
||||
expression_phase,
|
||||
hardcore_position_config,
|
||||
parsed_location_config,
|
||||
)
|
||||
|
||||
if row.get("source") == "built_in_generator":
|
||||
row = _apply_location_config_to_legacy_row(
|
||||
row,
|
||||
parsed_location_config,
|
||||
parsed_seed_config,
|
||||
seed,
|
||||
row_number,
|
||||
)
|
||||
if not expression_enabled:
|
||||
row = _disable_row_expression(row, "disabled")
|
||||
if extra_positive.strip():
|
||||
@@ -6329,6 +6562,7 @@ def build_prompt_from_configs(
|
||||
character_profile: str | dict[str, Any] | None = "",
|
||||
character_cast: str | dict[str, Any] | list[Any] | None = "",
|
||||
hardcore_position_config: str | dict[str, Any] | None = "",
|
||||
location_config: str | dict[str, Any] | None = "",
|
||||
extra_positive: str = "",
|
||||
extra_negative: str = "",
|
||||
) -> dict[str, Any]:
|
||||
@@ -6364,6 +6598,7 @@ def build_prompt_from_configs(
|
||||
character_profile=character_profile or "",
|
||||
character_cast=character_cast or "",
|
||||
hardcore_position_config=hardcore_position_config or "",
|
||||
location_config=location_config or "",
|
||||
)
|
||||
|
||||
|
||||
@@ -7146,6 +7381,7 @@ def build_insta_of_pair(
|
||||
character_profile: str | dict[str, Any] | None = "",
|
||||
character_cast: str | dict[str, Any] | list[Any] | None = "",
|
||||
hardcore_position_config: str | dict[str, Any] | None = "",
|
||||
location_config: str | dict[str, Any] | None = "",
|
||||
extra_positive: str = "",
|
||||
extra_negative: str = "",
|
||||
) -> dict[str, Any]:
|
||||
@@ -7223,6 +7459,7 @@ def build_insta_of_pair(
|
||||
expression_intensity=soft_expression_intensity,
|
||||
character_profile="" if primary_slot else character_profile or "",
|
||||
character_cast="",
|
||||
location_config=location_config or "",
|
||||
)
|
||||
soft_row["expression_intensity_source"] = soft_expression_intensity_source
|
||||
if primary_slot_context:
|
||||
@@ -7280,6 +7517,7 @@ def build_insta_of_pair(
|
||||
character_cast=character_cast or "",
|
||||
expression_phase="hardcore",
|
||||
hardcore_position_config=hardcore_position_config or "",
|
||||
location_config=location_config or "",
|
||||
)
|
||||
hard_row["hardcore_detail_density"] = options["hardcore_detail_density"]
|
||||
hard_row["pov_character_labels"] = pov_character_labels
|
||||
|
||||
Reference in New Issue
Block a user