Validate category pool references

This commit is contained in:
2026-06-26 15:00:19 +02:00
parent 5c5120a1f9
commit 1a98fdb9f2
3 changed files with 172 additions and 8 deletions
+4 -5
View File
@@ -181,8 +181,8 @@ Improve later:
- introduce optional `family` and `action_type` fields on item templates so - introduce optional `family` and `action_type` fields on item templates so
Python filters do less keyword guessing; Python filters do less keyword guessing;
- add `formatter_hint` fields only where needed, not globally; - add `formatter_hint` fields only where needed, not globally;
- add a JSON audit that checks every referenced expression/composition/scene pool - keep `tools/prompt_map_audit.py` passing; it now checks referenced
exists. expression/composition/scene pools and item-template axes.
### Node / UI Path ### Node / UI Path
@@ -305,7 +305,6 @@ Medium-term:
1. Expand `tools/prompt_smoke.py` with close foreplay, POV penetration, and 1. Expand `tools/prompt_smoke.py` with close foreplay, POV penetration, and
location-theme fixtures. location-theme fixtures.
2. Split Krea action/POV/clothing helpers into separate modules. 2. Split Krea action/POV/clothing helpers into separate modules.
3. Add category JSON pool reference validation to `tools/prompt_map_audit.py`. 3. Extract scene-camera adapters from `prompt_builder.py`.
4. Extract scene-camera adapters from `prompt_builder.py`. 4. Split `__init__.py` node classes by family after behavior is covered by smoke
5. Split `__init__.py` node classes by family after behavior is covered by smoke
checks. checks.
+6 -1
View File
@@ -651,9 +651,14 @@ The script does not import ComfyUI. It parses the repo and prints:
- registered display node names and known return names; - registered display node names and known return names;
- per-JSON category counts; - per-JSON category counts;
- named scene/expression/composition pool inventory. - named scene/expression/composition pool inventory.
- JSON reference validation for every `scene_pools`, `expression_pools`, and
`composition_pools` reference;
- item template validation so `{placeholder}` names resolve to `item_axes`.
Use its output to spot doc drift after adding a new node or pool. If a new node Use its output to spot doc drift after adding a new node or pool. If a new node
or pool appears there but not in this map, update the relevant route table. or pool appears there but not in this map, update the relevant route table. The
script exits nonzero when JSON pool references or item template axes do not
resolve.
## Behavioral Smoke Helper ## Behavioral Smoke Helper
+162 -2
View File
@@ -9,11 +9,22 @@ from __future__ import annotations
import ast import ast
import json import json
import re
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
ROOT = Path(__file__).resolve().parents[1] ROOT = Path(__file__).resolve().parents[1]
POOL_DEFINITION_KEYS = ("scene_pools", "expression_pools", "composition_pools")
POOL_REFERENCE_KEYS = {
"scene_pool": "scene_pools",
"scene_pools": "scene_pools",
"expression_pool": "expression_pools",
"expression_pools": "expression_pools",
"composition_pool": "composition_pools",
"composition_pools": "composition_pools",
}
TEMPLATE_TOKEN_RE = re.compile(r"{([a-zA-Z_][a-zA-Z0-9_]*)}")
def _literal_or_none(node: ast.AST) -> Any: def _literal_or_none(node: ast.AST) -> Any:
@@ -82,6 +93,147 @@ def _pool_names(path: Path, key: str) -> list[str]:
return sorted(pools) if isinstance(pools, dict) else [] return sorted(pools) if isinstance(pools, dict) else []
def _category_json_paths() -> list[Path]:
return sorted((ROOT / "categories").glob("*.json"))
def _load_category_json(path: Path) -> dict[str, Any]:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
def _all_pool_names(paths: list[Path]) -> dict[str, set[str]]:
names = {key: set() for key in POOL_DEFINITION_KEYS}
for path in paths:
data = _load_category_json(path)
for key in POOL_DEFINITION_KEYS:
pools = data.get(key)
if isinstance(pools, dict):
names[key].update(str(name) for name in pools if str(name).strip())
return names
def _pool_reference_values(value: Any) -> list[str]:
if isinstance(value, str):
return [value] if value.strip() else []
if isinstance(value, list):
return [str(item) for item in value if str(item).strip()]
return []
def _path_child(path: str, key: str, value: Any) -> str:
label = key
if isinstance(value, dict):
name = str(value.get("name") or value.get("slug") or "").strip()
if name:
label = f"{key}({name})"
return f"{path}.{label}" if path else label
def _path_index(path: str, index: int, value: Any) -> str:
label = f"[{index}]"
if isinstance(value, dict):
name = str(value.get("name") or value.get("slug") or "").strip()
if name:
label = f"[{index}:{name}]"
return f"{path}{label}"
def _template_axis_errors(path: str, node: dict[str, Any]) -> list[tuple[str, str]]:
templates = node.get("item_templates")
if not isinstance(templates, list):
return []
axes = node.get("item_axes")
axis_names = set(axes) if isinstance(axes, dict) else set()
errors: list[tuple[str, str]] = []
for index, template in enumerate(templates):
if not isinstance(template, str):
errors.append((f"{path}.item_templates[{index}]", "template is not a string"))
continue
tokens = set(TEMPLATE_TOKEN_RE.findall(template))
missing = sorted(token for token in tokens if token not in axis_names)
if missing:
errors.append(
(
f"{path}.item_templates[{index}]",
"missing item_axes for placeholders: " + ", ".join(missing),
)
)
if isinstance(axes, dict):
for axis_name, values in axes.items():
if not isinstance(values, list) or not values:
errors.append((f"{path}.item_axes.{axis_name}", "axis must be a non-empty list"))
return errors
def _walk_json_references(
value: Any,
*,
file_name: str,
path: str,
defined_pools: dict[str, set[str]],
errors: list[tuple[str, str, str]],
at_root: bool = False,
) -> None:
if isinstance(value, dict):
errors.extend((file_name, item_path, issue) for item_path, issue in _template_axis_errors(path, value))
for key, child in value.items():
if at_root and key in POOL_DEFINITION_KEYS and isinstance(child, dict):
for pool_name, pool_values in child.items():
if not isinstance(pool_values, list) or not pool_values:
errors.append((file_name, f"{key}.{pool_name}", "pool must be a non-empty list"))
continue
pool_type = POOL_REFERENCE_KEYS.get(key)
if pool_type:
refs = _pool_reference_values(child)
if child and not refs:
errors.append((file_name, _path_child(path, key, child), "pool reference must be a string or list"))
for ref in refs:
if ref not in defined_pools[pool_type]:
errors.append(
(
file_name,
_path_child(path, key, child),
f"unknown {pool_type[:-1]} reference: {ref}",
)
)
_walk_json_references(
child,
file_name=file_name,
path=_path_child(path, key, child),
defined_pools=defined_pools,
errors=errors,
)
elif isinstance(value, list):
for index, child in enumerate(value):
_walk_json_references(
child,
file_name=file_name,
path=_path_index(path, index, child),
defined_pools=defined_pools,
errors=errors,
)
def _json_reference_errors(paths: list[Path]) -> list[tuple[str, str, str]]:
defined_pools = _all_pool_names(paths)
errors: list[tuple[str, str, str]] = []
for pool_type, names in defined_pools.items():
if not names:
errors.append(("(all)", pool_type, "no pools defined"))
for path in paths:
data = _load_category_json(path)
_walk_json_references(
data,
file_name=path.name,
path="$",
defined_pools=defined_pools,
errors=errors,
at_root=True,
)
return errors
def print_table(headers: tuple[str, ...], rows: list[tuple[Any, ...]]) -> None: def print_table(headers: tuple[str, ...], rows: list[tuple[Any, ...]]) -> None:
widths = [len(header) for header in headers] widths = [len(header) for header in headers]
for row in rows: for row in rows:
@@ -96,6 +248,7 @@ def print_table(headers: tuple[str, ...], rows: list[tuple[Any, ...]]) -> None:
def main() -> int: def main() -> int:
init_path = ROOT / "__init__.py" init_path = ROOT / "__init__.py"
loop_path = ROOT / "loop_nodes.py" loop_path = ROOT / "loop_nodes.py"
category_paths = _category_json_paths()
display = _assignment_dict(init_path, "NODE_DISPLAY_NAME_MAPPINGS") display = _assignment_dict(init_path, "NODE_DISPLAY_NAME_MAPPINGS")
loop_display = _assignment_dict(loop_path, "LOOP_NODE_DISPLAY_NAME_MAPPINGS") loop_display = _assignment_dict(loop_path, "LOOP_NODE_DISPLAY_NAME_MAPPINGS")
display.update(loop_display) display.update(loop_display)
@@ -111,7 +264,7 @@ def main() -> int:
print("\n# Category JSON Summary") print("\n# Category JSON Summary")
category_rows = [] category_rows = []
for path in sorted((ROOT / "categories").glob("*.json")): for path in category_paths:
summary = _category_summary(path) summary = _category_summary(path)
category_rows.append( category_rows.append(
( (
@@ -141,12 +294,19 @@ def main() -> int:
print("\n# Named Pool Inventory") print("\n# Named Pool Inventory")
pool_rows = [] pool_rows = []
for path in sorted((ROOT / "categories").glob("*.json")): for path in category_paths:
for key in ("scene_pools", "expression_pools", "composition_pools"): for key in ("scene_pools", "expression_pools", "composition_pools"):
names = _pool_names(path, key) names = _pool_names(path, key)
if names: if names:
pool_rows.append((path.name, key, len(names), ", ".join(names[:8]) + (" ..." if len(names) > 8 else ""))) pool_rows.append((path.name, key, len(names), ", ".join(names[:8]) + (" ..." if len(names) > 8 else "")))
print_table(("File", "Pool type", "Count", "First names"), pool_rows) print_table(("File", "Pool type", "Count", "First names"), pool_rows)
print("\n# JSON Reference Validation")
reference_errors = _json_reference_errors(category_paths)
if reference_errors:
print_table(("File", "Path", "Issue"), reference_errors)
return 1
print("OK: all JSON pool references and item template axes resolve.")
return 0 return 0