From 7f808be99751011f751dc6fca323020129c95432 Mon Sep 17 00:00:00 2001 From: Ethanfel Date: Sat, 27 Jun 2026 03:09:17 +0200 Subject: [PATCH] Extract row location policy --- docs/prompt-architecture-improvement-plan.md | 5 +- docs/prompt-pool-routing-map.md | 5 +- prompt_builder.py | 102 +--------- row_location.py | 199 +++++++++++++++++++ tools/prompt_smoke.py | 47 +++++ 5 files changed, 256 insertions(+), 102 deletions(-) create mode 100644 row_location.py diff --git a/docs/prompt-architecture-improvement-plan.md b/docs/prompt-architecture-improvement-plan.md index 1c07ede..c1a3b80 100644 --- a/docs/prompt-architecture-improvement-plan.md +++ b/docs/prompt-architecture-improvement-plan.md @@ -146,8 +146,9 @@ Already isolated: `prompt_builder.py` keeps public delegate wrappers. - location/composition config presets, themed location packs, custom location/composition entry parsing, merge behavior, and config parsing live - in `location_config.py`; `prompt_builder.py` still applies selected configs - to rows. + in `location_config.py`; built-in row location/composition config + application, source metadata, and prompt/caption rewrites live in + `row_location.py`. - hardcore position/action-filter choices, selected-position normalization, config JSON builders/parsers, focus-policy toggles, subcategory allow-list policy, position-key detection, category filtering, and item-template/axis diff --git a/docs/prompt-pool-routing-map.md b/docs/prompt-pool-routing-map.md index 6bc035e..84d0cec 100644 --- a/docs/prompt-pool-routing-map.md +++ b/docs/prompt-pool-routing-map.md @@ -77,6 +77,7 @@ Core helper ownership: | `generation_profile_config.py` | Generation profile presets, profile option overrides, trigger policy, expression/pose/clothing config normalization, and profile config parsing. | | `seed_config.py` | Seed axis salts/aliases, seed mode choices, global/axis lock JSON builders, seed config parsing, row seed math, and deterministic axis RNG construction. | | `location_config.py` | Location/composition preset schemas, themed location packs, custom location/composition parsing, pool merge behavior, and location/composition config parsing. | +| `row_location.py` | Built-in row location/composition config application, deterministic scene/composition choice, source metadata, and legacy prompt/caption rewrites. | | `hardcore_position_config.py` | Hardcore position/action-filter choices, selected-position normalization, config JSON builders/parsers, focus-policy toggles, subcategory allow-list policy, position-key detection, and category/template/axis filtering. | | `pair_options.py` | Insta/OF option schema/defaults, softcore category/outfit/pose pools, partner outfit pools, clothing-continuity labels, negatives, and hardcore cast count policy. | | `pair_rows.py` | Insta/OF soft/hard row creation, softcore expression override resolution, Woman A slot context application, soft outfit/pose overrides, and POV row fields. | @@ -132,8 +133,8 @@ These recipes identify the intended road before editing prompt text. | Same woman, same room, softcore and hardcore outputs | `Character Slot/Profile` -> `Insta/OF Options` -> `Insta/OF Prompt Pair` | `continuity=same_creator_same_room`; set `softcore_cast` as needed; use pair metadata into formatter | `build_insta_of_pair`, `softcore_row`, `hardcore_row`, pair metadata fields | | Same cast in softcore and hardcore | Character slot chain -> `Insta/OF Options` | `softcore_cast=same_as_hardcore`; configure partner slots/outfits if needed | `_insta_of_partner_styling`, character slot clothing, pair Krea branch | | Change only outfit/clothing | Character clothing or category content route | Keep `person_seed`, `scene_seed`, `pose_seed`; change `content_seed`; slot `softcore_outfit` overrides Insta/OF outfit | `SxCP Character Clothing`, `pair_options.py`, category item templates | -| Force a custom location | `SxCP Location Pool` or `SxCP Location Theme` -> builder/pair | `combine_mode=replace` to force; `add` to mix with category scenes | `_scene_pool`, `_apply_location_config_to_legacy_row`, camera scene adapter | -| Force a custom frame/composition | `SxCP Composition Pool` or `SxCP Location Theme` -> builder/pair | `combine_mode=replace` to force; `add` to mix | `_composition_pool`, `_apply_composition_config_to_legacy_row`, Krea composition phrase | +| Force a custom location | `SxCP Location Pool` or `SxCP Location Theme` -> builder/pair | `combine_mode=replace` to force; `add` to mix with category scenes | `_scene_pool`, `row_location.apply_location_config_to_legacy_row`, camera scene adapter | +| Force a custom frame/composition | `SxCP Composition Pool` or `SxCP Location Theme` -> builder/pair | `combine_mode=replace` to force; `add` to mix | `_composition_pool`, `row_location.apply_composition_config_to_legacy_row`, Krea composition phrase | | Use Qwen/orbit camera geometry | Qwen/orbit node -> camera_config -> builder/pair | For pair, use `softcore_camera_config` and/or `hardcore_camera_config`; set mode from config in options | `_camera_config_with_mode`, `_camera_directive`, `_camera_scene_directive_for_context` | | Use Krea2 for only hard prompt from a pair | Pair `metadata_json` -> Krea2 Formatter | `target=hardcore`, `input_hint=metadata_json` or auto with metadata connected | `_insta_pair_to_krea`, hard row fields | | Convert builder output to SDXL tags | Builder/pair metadata -> SDXL Formatter | Use metadata input; set `target`; select style and quality preset | `_row_core_tags`, `_soft_tags`, `_hard_tags` | diff --git a/prompt_builder.py b/prompt_builder.py index 7870789..87ade41 100644 --- a/prompt_builder.py +++ b/prompt_builder.py @@ -41,6 +41,7 @@ try: from . import pair_options from . import row_normalization as row_policy from . import row_camera as row_camera_policy + from . import row_location as row_location_policy from . import seed_config as seed_policy from .hardcore_text_cleanup import ( sanitize_hardcore_axis_values as _sanitize_hardcore_axis_values, @@ -82,6 +83,7 @@ except ImportError: # Allows local smoke tests with `python -c`. import pair_options import row_normalization as row_policy import row_camera as row_camera_policy + import row_location as row_location_policy import seed_config as seed_policy from hardcore_text_cleanup import ( sanitize_hardcore_axis_values as _sanitize_hardcore_axis_values, @@ -3122,102 +3124,6 @@ def _scene_pool( return scene_entries or fallback -def _legacy_scene_entries_for_row(row: dict[str, Any]) -> list[Any]: - subject = str(row.get("primary_subject") or "").lower() - if "group" in subject or "layout" in subject: - return list(g.GROUP_SCENES) - return list(g.SCENES) - - -def _legacy_scene_text_for_slug(slug: str) -> str: - for entry in list(g.SCENES) + list(g.GROUP_SCENES): - entry_slug, entry_text = _pair_from(entry) - if entry_slug == slug: - return entry_text - return "" - - -def _apply_location_config_to_legacy_row( - row: dict[str, Any], - location_config: dict[str, Any], - seed_config: dict[str, int], - seed: int, - row_number: int, -) -> dict[str, Any]: - if not _location_config_active(location_config): - return row - location_entries = _list_from(location_config.get("scene_entries")) - if location_config.get("apply_mode") == "add": - choices = _legacy_scene_entries_for_row(row) - _unique_extend(choices, location_entries) - else: - choices = location_entries - scene_rng = _axis_rng(seed_config, "scene", seed, row_number) - scene_slug, scene_text = _choose_pair(scene_rng, choices) - old_slug = str(row.get("scene") or "") - old_text = _legacy_scene_text_for_slug(old_slug) - row["source_scene"] = old_slug - row["source_scene_text"] = old_text - row["scene"] = scene_slug - row["scene_text"] = scene_text - row["location_config"] = location_config - if old_text: - row["prompt"] = str(row.get("prompt") or "").replace(f"Scene: {old_text}.", f"Scene: {scene_text}.") - row["caption"] = str(row.get("caption") or "").replace(f", {old_text},", f", {scene_text},") - else: - row["prompt"] = re.sub( - r"Scene:\s*.*?\.\s*Pose:", - f"Scene: {scene_text}. Pose:", - str(row.get("prompt") or ""), - count=1, - ) - return row - - -def _legacy_composition_entries_for_row(row: dict[str, Any]) -> list[Any]: - subject = str(row.get("primary_subject") or "").lower() - if "group" in subject or "layout" in subject: - return list(g.GROUP_COMPOSITIONS) - return list(g.COMPOSITIONS) - - -def _apply_composition_config_to_legacy_row( - row: dict[str, Any], - composition_config: dict[str, Any], - seed_config: dict[str, int], - seed: int, - row_number: int, -) -> dict[str, Any]: - if not _composition_config_active(composition_config): - return row - composition_entries = _list_from(composition_config.get("composition_entries")) - if composition_config.get("apply_mode") == "add": - choices = _legacy_composition_entries_for_row(row) - _unique_extend(choices, composition_entries) - else: - choices = composition_entries - composition_rng = _axis_rng(seed_config, "composition", seed, row_number) - new_composition = _choose_text(composition_rng, choices) - old_composition = str(row.get("composition") or "") - old_prompt_fragment = f"Composition: vertical {old_composition}." - new_prompt_fragment = f"Composition: {_composition_prompt(new_composition)}." - row["source_composition"] = old_composition - row["composition"] = new_composition - row["composition_prompt"] = _composition_prompt(new_composition) - row["composition_config"] = composition_config - if old_composition: - row["prompt"] = str(row.get("prompt") or "").replace(old_prompt_fragment, new_prompt_fragment) - row["caption"] = str(row.get("caption") or "").replace(f", {old_composition},", f", {new_composition},") - else: - row["prompt"] = re.sub( - r"Composition:\s*.*?\.\s*Use", - f"{new_prompt_fragment} Use", - str(row.get("prompt") or ""), - count=1, - ) - return row - - def _expression_pool(category: dict[str, Any], subcategory: dict[str, Any], item: Any) -> list[Any]: return _configured_pool( category, @@ -3919,14 +3825,14 @@ def build_prompt( ) if row.get("source") == "built_in_generator": - row = _apply_location_config_to_legacy_row( + row = row_location_policy.apply_location_config_to_legacy_row( row, parsed_location_config, parsed_seed_config, seed, row_number, ) - row = _apply_composition_config_to_legacy_row( + row = row_location_policy.apply_composition_config_to_legacy_row( row, parsed_composition_config, parsed_seed_config, diff --git a/row_location.py b/row_location.py new file mode 100644 index 0000000..3a4e9a5 --- /dev/null +++ b/row_location.py @@ -0,0 +1,199 @@ +from __future__ import annotations + +import json +import random +import re +from typing import Any + +try: + from . import generate_prompt_batches as g + from . import location_config as location_policy + from . import row_camera + from . import seed_config as seed_policy +except ImportError: # Allows local smoke tests with top-level imports. + import generate_prompt_batches as g + import location_config as location_policy + import row_camera + import seed_config as seed_policy + + +def _list_from(value: Any) -> list[Any]: + if value is None: + return [] + if isinstance(value, list): + return value + return [value] + + +def _unique_extend(target: list[Any], additions: list[Any]) -> None: + seen = set() + for item in target: + try: + seen.add(json.dumps(item, sort_keys=True)) + except TypeError: + seen.add(repr(item)) + for item in additions: + try: + marker = json.dumps(item, sort_keys=True) + except TypeError: + marker = repr(item) + if marker not in seen: + target.append(item) + seen.add(marker) + + +def _pair_from(value: Any) -> tuple[str, str]: + if isinstance(value, dict): + text = str( + value.get("prompt") + or value.get("description") + or value.get("text") + or value.get("name") + or "" + ).strip() + slug = str(value.get("slug") or g.slugify(str(value.get("name") or text)) or "custom").strip() + if not text: + raise ValueError(f"Pair extension is missing prompt text: {value!r}") + return slug, text + if isinstance(value, (list, tuple)) and len(value) == 2: + return str(value[0]), str(value[1]) + text = str(value).strip() + if not text: + raise ValueError("Pair extension cannot be empty") + return g.slugify(text) or "custom", text + + +def _weighted_choice(rng: random.Random, items: list[Any]) -> Any: + if not items: + raise ValueError("Cannot choose from an empty list") + weights: list[float] = [] + for item in items: + weight = item.get("weight", 1.0) if isinstance(item, dict) else 1.0 + try: + weights.append(max(0.0, float(weight))) + except (TypeError, ValueError): + weights.append(1.0) + total = sum(weights) + if total <= 0: + return items[rng.randrange(len(items))] + pick = rng.random() * total + running = 0.0 + for item, weight in zip(items, weights): + running += weight + if pick <= running: + return item + return items[-1] + + +def _choose_pair(rng: random.Random, items: list[Any]) -> tuple[str, str]: + return _pair_from(_weighted_choice(rng, items)) + + +def _choose_text(rng: random.Random, items: list[Any]) -> str: + item = _weighted_choice(rng, items) + if isinstance(item, dict): + return str( + item.get("template") + or item.get("prompt") + or item.get("text") + or item.get("description") + or item.get("name") + or "" + ).strip() + return str(item).strip() + + +def legacy_scene_entries_for_row(row: dict[str, Any]) -> list[Any]: + subject = str(row.get("primary_subject") or "").lower() + if "group" in subject or "layout" in subject: + return list(g.GROUP_SCENES) + return list(g.SCENES) + + +def legacy_scene_text_for_slug(slug: str) -> str: + for entry in list(g.SCENES) + list(g.GROUP_SCENES): + entry_slug, entry_text = _pair_from(entry) + if entry_slug == slug: + return entry_text + return "" + + +def apply_location_config_to_legacy_row( + row: dict[str, Any], + location_config: dict[str, Any], + seed_config: dict[str, int], + seed: int, + row_number: int, +) -> dict[str, Any]: + if not location_policy.location_config_active(location_config): + return row + location_entries = _list_from(location_config.get("scene_entries")) + if location_config.get("apply_mode") == "add": + choices = legacy_scene_entries_for_row(row) + _unique_extend(choices, location_entries) + else: + choices = location_entries + scene_rng = seed_policy.axis_rng(seed_config, "scene", seed, row_number) + scene_slug, scene_text = _choose_pair(scene_rng, choices) + old_slug = str(row.get("scene") or "") + old_text = legacy_scene_text_for_slug(old_slug) + row["source_scene"] = old_slug + row["source_scene_text"] = old_text + row["scene"] = scene_slug + row["scene_text"] = scene_text + row["location_config"] = location_config + if old_text: + row["prompt"] = str(row.get("prompt") or "").replace(f"Scene: {old_text}.", f"Scene: {scene_text}.") + row["caption"] = str(row.get("caption") or "").replace(f", {old_text},", f", {scene_text},") + else: + row["prompt"] = re.sub( + r"Scene:\s*.*?\.\s*Pose:", + f"Scene: {scene_text}. Pose:", + str(row.get("prompt") or ""), + count=1, + ) + return row + + +def legacy_composition_entries_for_row(row: dict[str, Any]) -> list[Any]: + subject = str(row.get("primary_subject") or "").lower() + if "group" in subject or "layout" in subject: + return list(g.GROUP_COMPOSITIONS) + return list(g.COMPOSITIONS) + + +def apply_composition_config_to_legacy_row( + row: dict[str, Any], + composition_config: dict[str, Any], + seed_config: dict[str, int], + seed: int, + row_number: int, +) -> dict[str, Any]: + if not location_policy.composition_config_active(composition_config): + return row + composition_entries = _list_from(composition_config.get("composition_entries")) + if composition_config.get("apply_mode") == "add": + choices = legacy_composition_entries_for_row(row) + _unique_extend(choices, composition_entries) + else: + choices = composition_entries + composition_rng = seed_policy.axis_rng(seed_config, "composition", seed, row_number) + new_composition = _choose_text(composition_rng, choices) + old_composition = str(row.get("composition") or "") + old_prompt_fragment = f"Composition: vertical {old_composition}." + new_prompt_fragment = f"Composition: {row_camera.composition_prompt(new_composition)}." + row["source_composition"] = old_composition + row["composition"] = new_composition + row["composition_prompt"] = row_camera.composition_prompt(new_composition) + row["composition_config"] = composition_config + if old_composition: + row["prompt"] = str(row.get("prompt") or "").replace(old_prompt_fragment, new_prompt_fragment) + row["caption"] = str(row.get("caption") or "").replace(f", {old_composition},", f", {new_composition},") + else: + row["prompt"] = re.sub( + r"Composition:\s*.*?\.\s*Use", + f"{new_prompt_fragment} Use", + str(row.get("prompt") or ""), + count=1, + ) + return row diff --git a/tools/prompt_smoke.py b/tools/prompt_smoke.py index 07f2c00..9304a29 100644 --- a/tools/prompt_smoke.py +++ b/tools/prompt_smoke.py @@ -45,6 +45,7 @@ import prompt_builder as pb # noqa: E402 import row_normalization # noqa: E402 import route_metadata # noqa: E402 import row_camera # noqa: E402 +import row_location # noqa: E402 import server_routes # noqa: E402 import sdxl_formatter # noqa: E402 import sdxl_presets # noqa: E402 @@ -603,6 +604,51 @@ def smoke_location_config_policy() -> None: _expect(json.loads(themed_composition).get("composition_entries"), "Themed location did not output compositions") +def smoke_row_location_policy() -> None: + location = json.loads( + location_config.build_location_pool_json( + combine_mode="replace", + custom_locations="archive_corner: hidden archive corner with repeated shelves and warm table lamps", + ) + ) + composition = json.loads( + location_config.build_composition_pool_json( + combine_mode="replace", + custom_compositions="long archive aisle composition", + ) + ) + row = { + "source": "built_in_generator", + "primary_subject": "adult woman", + "scene": "unknown_old_scene", + "composition": "old frame", + "prompt": "A generated adult prompt. Scene: old room. Pose: standing. Composition: vertical old frame. Avoid: low quality.", + "caption": "sxcppnl7, generated adult prompt, old room, old frame, illustration", + } + updated = row_location.apply_location_config_to_legacy_row(dict(row), location, {}, 123, 1) + updated = row_location.apply_composition_config_to_legacy_row(updated, composition, {}, 123, 1) + _expect(updated.get("scene") == "archive_corner", "Row location policy did not select forced custom scene slug") + _expect( + updated.get("scene_text") == "hidden archive corner with repeated shelves and warm table lamps", + "Row location policy did not apply forced custom scene text", + ) + _expect(updated.get("source_scene") == "unknown_old_scene", "Row location policy lost source scene slug") + _expect( + "Scene: hidden archive corner with repeated shelves and warm table lamps. Pose:" in updated.get("prompt", ""), + "Row location policy did not rewrite prompt scene", + ) + _expect(updated.get("composition") == "long archive aisle composition", "Row location policy did not apply forced composition") + _expect( + updated.get("composition_prompt") == "vertical long archive aisle composition", + "Row location policy did not compute composition prompt", + ) + _expect( + "Composition: vertical long archive aisle composition." in updated.get("prompt", ""), + "Row location policy did not rewrite prompt composition", + ) + _expect(", long archive aisle composition," in updated.get("caption", ""), "Row location policy did not rewrite caption composition") + + def smoke_category_cast_config_policy() -> None: _expect(pb.CATEGORY_PRESETS is category_cast_config.CATEGORY_PRESETS, "Prompt builder category presets are not delegated") _expect(pb.CAST_PRESETS is category_cast_config.CAST_PRESETS, "Prompt builder cast presets are not delegated") @@ -3475,6 +3521,7 @@ SMOKE_CASES: list[tuple[str, Callable[[], None]]] = [ ("row_camera_policy", smoke_row_camera_policy), ("config_route_location_theme", smoke_config_route_location_theme), ("location_config_policy", smoke_location_config_policy), + ("row_location_policy", smoke_row_location_policy), ("category_cast_config_policy", smoke_category_cast_config_policy), ("generation_profile_config_policy", smoke_generation_profile_config_policy), ("filter_config_policy", smoke_filter_config_policy),