diff --git a/docs/prompt-architecture-improvement-plan.md b/docs/prompt-architecture-improvement-plan.md index 73e34d5..c0dc50c 100644 --- a/docs/prompt-architecture-improvement-plan.md +++ b/docs/prompt-architecture-improvement-plan.md @@ -181,8 +181,8 @@ Improve later: - introduce optional `family` and `action_type` fields on item templates so Python filters do less keyword guessing; - add `formatter_hint` fields only where needed, not globally; -- add a JSON audit that checks every referenced expression/composition/scene pool - exists. +- keep `tools/prompt_map_audit.py` passing; it now checks referenced + expression/composition/scene pools and item-template axes. ### Node / UI Path @@ -305,7 +305,6 @@ Medium-term: 1. Expand `tools/prompt_smoke.py` with close foreplay, POV penetration, and location-theme fixtures. 2. Split Krea action/POV/clothing helpers into separate modules. -3. Add category JSON pool reference validation to `tools/prompt_map_audit.py`. -4. Extract scene-camera adapters from `prompt_builder.py`. -5. Split `__init__.py` node classes by family after behavior is covered by smoke +3. Extract scene-camera adapters from `prompt_builder.py`. +4. Split `__init__.py` node classes by family after behavior is covered by smoke checks. diff --git a/docs/prompt-pool-routing-map.md b/docs/prompt-pool-routing-map.md index 8e2c735..65106a9 100644 --- a/docs/prompt-pool-routing-map.md +++ b/docs/prompt-pool-routing-map.md @@ -651,9 +651,14 @@ The script does not import ComfyUI. It parses the repo and prints: - registered display node names and known return names; - per-JSON category counts; - named scene/expression/composition pool inventory. +- JSON reference validation for every `scene_pools`, `expression_pools`, and + `composition_pools` reference; +- item template validation so `{placeholder}` names resolve to `item_axes`. Use its output to spot doc drift after adding a new node or pool. If a new node -or pool appears there but not in this map, update the relevant route table. +or pool appears there but not in this map, update the relevant route table. The +script exits nonzero when JSON pool references or item template axes do not +resolve. ## Behavioral Smoke Helper diff --git a/tools/prompt_map_audit.py b/tools/prompt_map_audit.py index 3bc0f52..74ba989 100644 --- a/tools/prompt_map_audit.py +++ b/tools/prompt_map_audit.py @@ -9,11 +9,22 @@ from __future__ import annotations import ast import json +import re from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] +POOL_DEFINITION_KEYS = ("scene_pools", "expression_pools", "composition_pools") +POOL_REFERENCE_KEYS = { + "scene_pool": "scene_pools", + "scene_pools": "scene_pools", + "expression_pool": "expression_pools", + "expression_pools": "expression_pools", + "composition_pool": "composition_pools", + "composition_pools": "composition_pools", +} +TEMPLATE_TOKEN_RE = re.compile(r"{([a-zA-Z_][a-zA-Z0-9_]*)}") def _literal_or_none(node: ast.AST) -> Any: @@ -82,6 +93,147 @@ def _pool_names(path: Path, key: str) -> list[str]: return sorted(pools) if isinstance(pools, dict) else [] +def _category_json_paths() -> list[Path]: + return sorted((ROOT / "categories").glob("*.json")) + + +def _load_category_json(path: Path) -> dict[str, Any]: + data = json.loads(path.read_text(encoding="utf-8")) + return data if isinstance(data, dict) else {} + + +def _all_pool_names(paths: list[Path]) -> dict[str, set[str]]: + names = {key: set() for key in POOL_DEFINITION_KEYS} + for path in paths: + data = _load_category_json(path) + for key in POOL_DEFINITION_KEYS: + pools = data.get(key) + if isinstance(pools, dict): + names[key].update(str(name) for name in pools if str(name).strip()) + return names + + +def _pool_reference_values(value: Any) -> list[str]: + if isinstance(value, str): + return [value] if value.strip() else [] + if isinstance(value, list): + return [str(item) for item in value if str(item).strip()] + return [] + + +def _path_child(path: str, key: str, value: Any) -> str: + label = key + if isinstance(value, dict): + name = str(value.get("name") or value.get("slug") or "").strip() + if name: + label = f"{key}({name})" + return f"{path}.{label}" if path else label + + +def _path_index(path: str, index: int, value: Any) -> str: + label = f"[{index}]" + if isinstance(value, dict): + name = str(value.get("name") or value.get("slug") or "").strip() + if name: + label = f"[{index}:{name}]" + return f"{path}{label}" + + +def _template_axis_errors(path: str, node: dict[str, Any]) -> list[tuple[str, str]]: + templates = node.get("item_templates") + if not isinstance(templates, list): + return [] + axes = node.get("item_axes") + axis_names = set(axes) if isinstance(axes, dict) else set() + errors: list[tuple[str, str]] = [] + for index, template in enumerate(templates): + if not isinstance(template, str): + errors.append((f"{path}.item_templates[{index}]", "template is not a string")) + continue + tokens = set(TEMPLATE_TOKEN_RE.findall(template)) + missing = sorted(token for token in tokens if token not in axis_names) + if missing: + errors.append( + ( + f"{path}.item_templates[{index}]", + "missing item_axes for placeholders: " + ", ".join(missing), + ) + ) + if isinstance(axes, dict): + for axis_name, values in axes.items(): + if not isinstance(values, list) or not values: + errors.append((f"{path}.item_axes.{axis_name}", "axis must be a non-empty list")) + return errors + + +def _walk_json_references( + value: Any, + *, + file_name: str, + path: str, + defined_pools: dict[str, set[str]], + errors: list[tuple[str, str, str]], + at_root: bool = False, +) -> None: + if isinstance(value, dict): + errors.extend((file_name, item_path, issue) for item_path, issue in _template_axis_errors(path, value)) + for key, child in value.items(): + if at_root and key in POOL_DEFINITION_KEYS and isinstance(child, dict): + for pool_name, pool_values in child.items(): + if not isinstance(pool_values, list) or not pool_values: + errors.append((file_name, f"{key}.{pool_name}", "pool must be a non-empty list")) + continue + pool_type = POOL_REFERENCE_KEYS.get(key) + if pool_type: + refs = _pool_reference_values(child) + if child and not refs: + errors.append((file_name, _path_child(path, key, child), "pool reference must be a string or list")) + for ref in refs: + if ref not in defined_pools[pool_type]: + errors.append( + ( + file_name, + _path_child(path, key, child), + f"unknown {pool_type[:-1]} reference: {ref}", + ) + ) + _walk_json_references( + child, + file_name=file_name, + path=_path_child(path, key, child), + defined_pools=defined_pools, + errors=errors, + ) + elif isinstance(value, list): + for index, child in enumerate(value): + _walk_json_references( + child, + file_name=file_name, + path=_path_index(path, index, child), + defined_pools=defined_pools, + errors=errors, + ) + + +def _json_reference_errors(paths: list[Path]) -> list[tuple[str, str, str]]: + defined_pools = _all_pool_names(paths) + errors: list[tuple[str, str, str]] = [] + for pool_type, names in defined_pools.items(): + if not names: + errors.append(("(all)", pool_type, "no pools defined")) + for path in paths: + data = _load_category_json(path) + _walk_json_references( + data, + file_name=path.name, + path="$", + defined_pools=defined_pools, + errors=errors, + at_root=True, + ) + return errors + + def print_table(headers: tuple[str, ...], rows: list[tuple[Any, ...]]) -> None: widths = [len(header) for header in headers] for row in rows: @@ -96,6 +248,7 @@ def print_table(headers: tuple[str, ...], rows: list[tuple[Any, ...]]) -> None: def main() -> int: init_path = ROOT / "__init__.py" loop_path = ROOT / "loop_nodes.py" + category_paths = _category_json_paths() display = _assignment_dict(init_path, "NODE_DISPLAY_NAME_MAPPINGS") loop_display = _assignment_dict(loop_path, "LOOP_NODE_DISPLAY_NAME_MAPPINGS") display.update(loop_display) @@ -111,7 +264,7 @@ def main() -> int: print("\n# Category JSON Summary") category_rows = [] - for path in sorted((ROOT / "categories").glob("*.json")): + for path in category_paths: summary = _category_summary(path) category_rows.append( ( @@ -141,12 +294,19 @@ def main() -> int: print("\n# Named Pool Inventory") pool_rows = [] - for path in sorted((ROOT / "categories").glob("*.json")): + for path in category_paths: for key in ("scene_pools", "expression_pools", "composition_pools"): names = _pool_names(path, key) if names: pool_rows.append((path.name, key, len(names), ", ".join(names[:8]) + (" ..." if len(names) > 8 else ""))) print_table(("File", "Pool type", "Count", "First names"), pool_rows) + + print("\n# JSON Reference Validation") + reference_errors = _json_reference_errors(category_paths) + if reference_errors: + print_table(("File", "Path", "Issue"), reference_errors) + return 1 + print("OK: all JSON pool references and item template axes resolve.") return 0