Files
ComfyUI-Ethanfel-Prompt-Bui…/tools/prompt_map_audit.py
T

911 lines
39 KiB
Python

#!/usr/bin/env python3
"""Print a lightweight audit for the prompt routing map.
This intentionally avoids importing the ComfyUI node package. Static checks
parse Python and JSON files directly, while runtime checks import only the pure
generator/formatter modules so the audit can run in a plain shell without
ComfyUI loaded.
"""
from __future__ import annotations
import ast
import json
import re
import sys
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
import category_template_metadata as template_metadata_policy # noqa: E402
import category_library as category_policy # noqa: E402
import caption_naturalizer # noqa: E402
import krea_formatter # noqa: E402
import prompt_builder as pb # noqa: E402
import sdxl_formatter # noqa: E402
# Top-level category-JSON keys whose values define named pools; used by
# _all_pool_names and by the root-level pool checks in _walk_json_references.
POOL_DEFINITION_KEYS = ("scene_pools", "expression_pools", "composition_pools")
# Keys that *reference* pools (singular or plural spelling), mapped to the
# definition key the referenced names are checked against.
POOL_REFERENCE_KEYS = {
    "scene_pool": "scene_pools",
    "scene_pools": "scene_pools",
    "expression_pool": "expression_pools",
    "expression_pools": "expression_pools",
    "composition_pool": "composition_pools",
    "composition_pools": "composition_pools",
}
# Matches `{identifier}` placeholders inside item template text and captures
# the identifier.
TEMPLATE_TOKEN_RE = re.compile(r"{([a-zA-Z_][a-zA-Z0-9_]*)}")
# (module file name relative to ROOT, smoke case name) pairs consumed by
# _routing_doc_errors: the file must exist, its name must appear in the routing
# docs, and the smoke case must be registered in tools/prompt_smoke.py.
# NOTE(review): "route_metadata.py" and "row_route_metadata.py" both map to
# "row_route_metadata_policy" -- confirm this is intentional and not a typo.
CRITICAL_ROUTE_MODULES: tuple[tuple[str, str], ...] = (
    ("builder_prompt_route.py", "builder_prompt_route_policy"),
    ("builder_config_route.py", "builder_config_route_policy"),
    ("krea_format_route.py", "krea_format_route_policy"),
    ("sdxl_format_route.py", "sdxl_format_route_policy"),
    ("caption_format_route.py", "caption_format_route_policy"),
    ("prompt_hygiene.py", "prompt_hygiene_policy"),
    ("row_normalization.py", "row_normalization_policy"),
    ("seed_config.py", "seed_config_policy"),
    ("formatter_detail.py", "formatter_detail_policy"),
    ("formatter_input.py", "formatter_input_policy"),
    ("formatter_target.py", "formatter_target_policy"),
    ("pair_builder.py", "pair_builder_policy"),
    ("row_assembly.py", "row_assembly_policy"),
    ("row_category_route.py", "row_category_route_policy"),
    ("row_prompt_axes.py", "row_prompt_axes_policy"),
    ("route_metadata.py", "row_route_metadata_policy"),
    ("row_route_metadata.py", "row_route_metadata_policy"),
    ("row_subject_route.py", "row_subject_route_policy"),
    ("caption_metadata_routes.py", "caption_metadata_routes"),
    ("sdxl_tag_routes.py", "sdxl_tag_routes"),
)
# Snippets that must appear verbatim in docs/prompt-pool-routing-map.md
# (checked by _routing_doc_errors).
ENTRY_ROUTE_SNIPPETS: tuple[str, ...] = (
    "`build_prompt` -> `builder_prompt_route.py`",
    "`build_prompt_from_configs` -> `builder_config_route.py`",
    "`format_krea2_prompt` -> `krea_format_route.py`",
    "`format_sdxl_prompt` -> `sdxl_format_route.py`",
    "`naturalize_caption` -> `caption_format_route.py`",
)
# Glob patterns, relative to ROOT, of the modules scanned by
# _prompt_row_read_errors for `row.get("prompt")` reads.
PROMPT_ROW_READ_SCAN_GLOBS: tuple[str, ...] = (
    "krea_*.py",
    "sdxl_*.py",
    "caption_*.py",
    "formatter_*.py",
)
# (file name, enclosing function name) pairs that are allowed to read the raw
# row prompt without being reported.
ALLOWED_PROMPT_ROW_READS: set[tuple[str, str]] = {
    # Central row-value fallback. Metadata routes should prefer explicit fields,
    # but any remaining label fallback must pass through this shared helper.
    ("formatter_input.py", "row_value"),
    # Last-resort caption fallback after all metadata branches decline the row.
    ("caption_naturalizer.py", "_metadata_to_prose"),
}
def _literal_or_none(node: ast.AST) -> Any:
try:
return ast.literal_eval(node)
except Exception:
return None
def _assignment_dict(path: Path, name: str) -> dict[str, Any]:
    """Return the literal dict assigned to module-level *name* in *path*.

    The file is parsed, not imported. Both plain assignments
    (``NAME = {...}``) and annotated assignments (``NAME: T = {...}``) are
    recognised, matching how ``_smoke_case_names`` looks up ``SMOKE_CASES``.

    Args:
        path: Python source file to parse.
        name: Module-level variable name to look for.

    Returns:
        The evaluated dict for the first matching assignment, or ``{}`` when
        no assignment matches or its value is not a literal dict.

    Raises:
        OSError: If the file cannot be read.
        SyntaxError: If the file is not valid Python.
    """
    tree = ast.parse(path.read_text(encoding="utf-8"))
    for node in tree.body:
        if isinstance(node, ast.Assign):
            targets = node.targets
        elif isinstance(node, ast.AnnAssign) and node.value is not None:
            # A bare annotation (`NAME: T`) carries no value and is skipped.
            targets = [node.target]
        else:
            continue
        if not any(isinstance(target, ast.Name) and target.id == name for target in targets):
            continue
        value = _literal_or_none(node.value)
        return value if isinstance(value, dict) else {}
    return {}
def _class_return_names(path: Path) -> dict[str, tuple[str, ...]]:
    """Map each ``SxCP``-prefixed class in *path* to its effective RETURN_NAMES.

    The file is parsed statically. A class without its own literal
    ``RETURN_NAMES`` tuple of strings takes the first non-empty value found by
    walking its bases depth-first, in declaration order. Only bases written as
    plain names and defined at module level in the same file are followed.
    Classes that resolve to no names are left out of the result.
    """
    module = ast.parse(path.read_text(encoding="utf-8"))

    # class name -> (directly named bases, RETURN_NAMES declared on the class)
    declared: dict[str, tuple[list[str], tuple[str, ...]]] = {}
    for class_node in module.body:
        if not isinstance(class_node, ast.ClassDef):
            continue
        own_names: tuple[str, ...] = ()
        for statement in class_node.body:
            if not isinstance(statement, ast.Assign):
                continue
            target_ids = [target.id for target in statement.targets if isinstance(target, ast.Name)]
            if "RETURN_NAMES" not in target_ids:
                continue
            literal = _literal_or_none(statement.value)
            # A later valid assignment in the class body overrides an earlier one.
            if isinstance(literal, tuple) and all(isinstance(part, str) for part in literal):
                own_names = literal
        base_ids = [base.id for base in class_node.bases if isinstance(base, ast.Name)]
        declared[class_node.name] = (base_ids, own_names)

    def effective_names(start: str) -> tuple[str, ...]:
        # Explicit-stack pre-order walk; `visited` guards against cycles.
        visited: set[str] = set()
        pending = [start]
        while pending:
            current = pending.pop()
            if current in visited:
                continue
            visited.add(current)
            base_ids, own_names = declared.get(current, ([], ()))
            if own_names:
                return own_names
            # Reversed so the first-declared base is popped (visited) first.
            pending.extend(reversed(base_ids))
        return ()

    result: dict[str, tuple[str, ...]] = {}
    for class_name in declared:
        if not class_name.startswith("SxCP"):
            continue
        names = effective_names(class_name)
        if names:
            result[class_name] = names
    return result
def _category_summary(path: Path) -> dict[str, Any]:
data = json.loads(path.read_text(encoding="utf-8"))
categories = data.get("categories") or []
subcategory_count = 0
item_template_count = 0
for category in categories:
subcategories = category.get("subcategories") or []
subcategory_count += len(subcategories)
for subcategory in subcategories:
item_template_count += len(subcategory.get("item_templates") or [])
for item in subcategory.get("items") or []:
if isinstance(item, dict):
item_template_count += len(item.get("item_templates") or [])
return {
"categories": len(categories),
"subcategories": subcategory_count,
"item_templates": item_template_count,
"scene_pools": len(data.get("scene_pools") or {}),
"expression_pools": len(data.get("expression_pools") or {}),
"composition_pools": len(data.get("composition_pools") or {}),
"pool_extensions": len(data.get("pool_extensions") or {}),
}
def _pool_names(path: Path, key: str) -> list[str]:
data = json.loads(path.read_text(encoding="utf-8"))
pools = data.get(key) or {}
return sorted(pools) if isinstance(pools, dict) else []
def _category_json_paths() -> list[Path]:
    """Return every ``*.json`` file directly under ``ROOT/categories``, sorted."""
    category_dir = ROOT / "categories"
    json_files = list(category_dir.glob("*.json"))
    json_files.sort()
    return json_files
def _node_python_paths() -> list[Path]:
    """Return the node source files under ROOT that exist on disk.

    Candidates are ``__init__.py``, ``loop_nodes.py`` and the sorted
    ``node_*.py`` files, in that order.
    """
    candidates = [
        ROOT / "__init__.py",
        ROOT / "loop_nodes.py",
        *sorted(ROOT.glob("node_*.py")),
    ]
    existing: list[Path] = []
    for candidate in candidates:
        if candidate.exists():
            existing.append(candidate)
    return existing
def _load_category_json(path: Path) -> dict[str, Any]:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
def _all_pool_names(paths: list[Path]) -> dict[str, set[str]]:
    """Collect the pool names defined across all *paths*, grouped by pool type.

    The result has one entry per key in ``POOL_DEFINITION_KEYS``. Names whose
    string form is blank are skipped.
    """
    collected: dict[str, set[str]] = {}
    for pool_key in POOL_DEFINITION_KEYS:
        collected[pool_key] = set()
    for json_path in paths:
        document = _load_category_json(json_path)
        for pool_key, bucket in collected.items():
            section = document.get(pool_key)
            if not isinstance(section, dict):
                continue
            for pool_name in section:
                text = str(pool_name)
                if text.strip():
                    bucket.add(text)
    return collected
def _pool_reference_values(value: Any) -> list[str]:
if isinstance(value, str):
return [value] if value.strip() else []
if isinstance(value, list):
return [str(item) for item in value if str(item).strip()]
return []
def _path_child(path: str, key: str, value: Any) -> str:
label = key
if isinstance(value, dict):
name = str(value.get("name") or value.get("slug") or "").strip()
if name:
label = f"{key}({name})"
return f"{path}.{label}" if path else label
def _path_index(path: str, index: int, value: Any) -> str:
label = f"[{index}]"
if isinstance(value, dict):
name = str(value.get("name") or value.get("slug") or "").strip()
if name:
label = f"[{index}:{name}]"
return f"{path}{label}"
def _template_axis_errors(path: str, node: dict[str, Any]) -> list[tuple[str, str]]:
    """Validate ``item_templates`` and ``item_axes`` on one JSON object.

    Returns ``(path, issue)`` pairs. Nothing is checked unless
    ``item_templates`` is a list. For each template the function reports, in
    order:

    * metadata issues from ``template_metadata_policy`` (object templates only),
    * an unusable template (neither a string nor an object with text), or
    * ``{placeholder}`` tokens with no matching key in ``item_axes``.

    After the templates, every axis that is not a non-empty list is reported.
    """
    templates = node.get("item_templates")
    if not isinstance(templates, list):
        return []

    axes = node.get("item_axes")
    known_axes = set(axes) if isinstance(axes, dict) else set()
    text_keys = ("template", "prompt", "text", "description", "name")
    found: list[tuple[str, str]] = []

    for position, template in enumerate(templates):
        where = f"{path}.item_templates[{position}]"
        if isinstance(template, dict):
            # First truthy text-bearing field wins, in `text_keys` order.
            raw_text = next((template.get(k) for k in text_keys if template.get(k)), "")
            text = str(raw_text).strip()
            metadata = template_metadata_policy.template_metadata(template)
            found.extend(
                (where, issue)
                for issue in template_metadata_policy.template_metadata_errors(metadata)
            )
        elif isinstance(template, str):
            text = template
        else:
            text = ""

        if not text:
            found.append((where, "template must be a string or object with template/text"))
            continue

        unresolved = sorted(set(TEMPLATE_TOKEN_RE.findall(text)) - known_axes)
        if unresolved:
            found.append((where, "missing item_axes for placeholders: " + ", ".join(unresolved)))

    if isinstance(axes, dict):
        for axis_name, values in axes.items():
            if isinstance(values, list) and values:
                continue
            found.append((f"{path}.item_axes.{axis_name}", "axis must be a non-empty list"))
    return found
def _container_template_metadata_errors(path: str, node: dict[str, Any]) -> list[tuple[str, str]]:
    """Validate a container-level ``item_template_metadata`` block, if present.

    Returns ``(path, issue)`` pairs: nothing when the key is absent, a single
    type error when the value is not an object, otherwise whatever
    ``template_metadata_policy.template_metadata_errors`` reports for the
    normalised metadata.
    """
    if "item_template_metadata" not in node:
        return []
    where = f"{path}.item_template_metadata"
    raw_metadata = node.get("item_template_metadata")
    if not isinstance(raw_metadata, dict):
        return [(where, "item_template_metadata must be an object")]
    normalized = template_metadata_policy.template_metadata(raw_metadata)
    issues: list[tuple[str, str]] = []
    for issue in template_metadata_policy.template_metadata_errors(normalized):
        issues.append((where, issue))
    return issues
def _walk_json_references(
    value: Any,
    *,
    file_name: str,
    path: str,
    defined_pools: dict[str, set[str]],
    errors: list[tuple[str, str, str]],
    at_root: bool = False,
) -> None:
    """Recursively validate pool references and templates in a JSON value.

    Appends ``(file_name, path, issue)`` tuples to *errors* in place.

    For every dict: template/axis and container-metadata checks run first.
    Then, per key, root-level pool definitions (only when *at_root*) are
    checked for non-empty lists and not descended into; pool reference keys
    are checked against *defined_pools*; and the child value is visited
    recursively. Lists are visited element by element. Scalars are ignored.
    """
    if isinstance(value, list):
        for position, element in enumerate(value):
            _walk_json_references(
                element,
                file_name=file_name,
                path=_path_index(path, position, element),
                defined_pools=defined_pools,
                errors=errors,
            )
        return
    if not isinstance(value, dict):
        return

    for item_path, issue in _template_axis_errors(path, value):
        errors.append((file_name, item_path, issue))
    for item_path, issue in _container_template_metadata_errors(path, value):
        errors.append((file_name, item_path, issue))

    for key, child in value.items():
        child_path = _path_child(path, key, child)

        if at_root and key in POOL_DEFINITION_KEYS and isinstance(child, dict):
            # Root pool *definitions*: validate their shape, then skip the
            # reference checks below (the plural keys double as reference keys).
            errors.extend(
                (file_name, f"{key}.{pool_name}", "pool must be a non-empty list")
                for pool_name, pool_values in child.items()
                if not (isinstance(pool_values, list) and pool_values)
            )
            continue

        pool_type = POOL_REFERENCE_KEYS.get(key)
        if pool_type:
            refs = _pool_reference_values(child)
            if child and not refs:
                errors.append((file_name, child_path, "pool reference must be a string or list"))
            for ref in refs:
                if ref in defined_pools[pool_type]:
                    continue
                errors.append((file_name, child_path, f"unknown {pool_type[:-1]} reference: {ref}"))

        _walk_json_references(
            child,
            file_name=file_name,
            path=child_path,
            defined_pools=defined_pools,
            errors=errors,
        )
def _json_reference_errors(paths: list[Path]) -> list[tuple[str, str, str]]:
    """Validate pool references and item templates across all category files.

    Pool names are gathered from every file first, so a reference may resolve
    to a pool defined in a different file. A pool type with no definitions at
    all is reported once under the ``(all)`` pseudo-file.
    """
    defined = _all_pool_names(paths)
    errors: list[tuple[str, str, str]] = [
        ("(all)", pool_type, "no pools defined")
        for pool_type, names in defined.items()
        if not names
    ]
    for json_path in paths:
        _walk_json_references(
            _load_category_json(json_path),
            file_name=json_path.name,
            path="$",
            defined_pools=defined,
            errors=errors,
            at_root=True,
        )
    return errors
def _hardcore_template_metadata_errors(paths: list[Path]) -> list[tuple[str, str, str]]:
    """Check metadata defaults on templated ``hardcore_sexual_poses`` subcategories.

    For each subcategory of that category with a non-empty ``item_templates``
    list, ``item_template_metadata`` must be an object whose normalised form
    yields both an action family and a position family according to
    ``template_metadata_policy``. Returns ``(file, path, issue)`` tuples.
    """
    found: list[tuple[str, str, str]] = []
    for json_path in paths:
        document = _load_category_json(json_path)
        for category in document.get("categories") or []:
            if str(category.get("slug") or "") != "hardcore_sexual_poses":
                continue
            for subcategory in category.get("subcategories") or []:
                templates = subcategory.get("item_templates")
                if not (isinstance(templates, list) and templates):
                    continue
                sub_path = f"categories.{category.get('slug')}.subcategories.{subcategory.get('slug')}"
                raw_metadata = subcategory.get("item_template_metadata")
                if not isinstance(raw_metadata, dict):
                    found.append((json_path.name, sub_path, "missing item_template_metadata default block"))
                    continue
                metadata_path = f"{sub_path}.item_template_metadata"
                normalized = template_metadata_policy.template_metadata(raw_metadata)
                family_checks = (
                    (template_metadata_policy.template_action_family, "missing normalized action_family"),
                    (template_metadata_policy.template_position_family, "missing normalized position_family"),
                )
                for family_of, issue in family_checks:
                    if not family_of(normalized):
                        found.append((json_path.name, metadata_path, issue))
    return found
def _item_candidates_for_coverage(subcategory: dict[str, Any]) -> list[Any]:
items = subcategory.get("items")
if isinstance(items, list) and items:
return items
return [subcategory.get("name") or subcategory.get("slug") or ""]
def _configured_pool_errors(
    *,
    category: dict[str, Any],
    subcategory: dict[str, Any],
    item: Any,
    path: str,
    pool_label: str,
    direct_key: str,
    pool_key: str,
    pool_library: dict[str, list[Any]],
    inherit_key: str,
) -> list[tuple[str, str, str]]:
    """Check that ``category_policy.configured_pool`` yields entries for one item.

    Returns a one-element error list when the resolver raises or returns
    nothing truthy, and an empty list otherwise. Errors are attributed to the
    ``(category library)`` source at ``{path}.{pool_label}``.
    """
    where = f"{path}.{pool_label}"
    try:
        entries = category_policy.configured_pool(
            category,
            subcategory,
            item,
            direct_key,
            pool_key,
            pool_library,
            inherit_key,
        )
    except Exception as exc:
        # Any resolver failure becomes an audit finding rather than a crash.
        return [("(category library)", where, f"cannot resolve effective pool: {exc}")]
    if entries:
        return []
    return [("(category library)", where, "missing effective configured entries")]
def _effective_category_coverage_errors(paths: list[Path]) -> list[tuple[str, str, str]]:
    """Validate the category library as loaded through ``category_policy``.

    Per category: a ``subject_type`` and a non-empty ``subcategories`` list.
    Per subcategory: an effective subject type, items or item templates,
    resolvable scene/expression/composition pools for every candidate item,
    and, for templated ``hardcore_sexual_poses`` subcategories, route metadata
    with normalised action and position families.

    Returns ``("(category library)", path, issue)`` tuples.
    """
    # `paths` is accepted to keep this check grouped with the other category
    # validations. The effective route check uses the normalized loader because
    # generation also consumes normalized category objects.
    _ = paths
    source = "(category library)"
    categories = category_policy.load_category_library()
    scene_pools = category_policy.load_scene_pool_library()
    expression_pools = category_policy.load_expression_pool_library()
    composition_pools = category_policy.load_composition_pool_library()
    if not categories:
        return [(source, "categories", "no categories loaded")]

    # (label and direct key, pool reference key, pool library, inherit flag key)
    pool_routes = (
        ("scenes", "scene_pools", scene_pools, "inherit_scenes"),
        ("expressions", "expression_pools", expression_pools, "inherit_expressions"),
        ("compositions", "composition_pools", composition_pools, "inherit_compositions"),
    )
    errors: list[tuple[str, str, str]] = []

    def report(where: str, issue: str) -> None:
        errors.append((source, where, issue))

    for category in categories:
        category_slug = str(category.get("slug") or category.get("name") or "category")
        category_path = f"categories.{category_slug}"
        subject_type = str(category.get("subject_type") or "").strip()
        if not subject_type:
            report(f"{category_path}.subject_type", "missing subject_type")

        subcategories = category.get("subcategories")
        if not (isinstance(subcategories, list) and subcategories):
            report(f"{category_path}.subcategories", "missing subcategories")
            continue

        for subcategory in subcategories:
            if not isinstance(subcategory, dict):
                report(f"{category_path}.subcategories", "subcategory must be an object")
                continue

            sub_slug = str(subcategory.get("slug") or subcategory.get("name") or "subcategory")
            sub_path = f"{category_path}.subcategories.{sub_slug}"
            effective_subject = str(subcategory.get("subject_type") or subject_type).strip()
            if not effective_subject:
                report(f"{sub_path}.subject_type", "missing effective subject_type")

            items = subcategory.get("items")
            templates = subcategory.get("item_templates")
            has_items = isinstance(items, list) and bool(items)
            has_templates = isinstance(templates, list) and bool(templates)
            if not (has_items or has_templates):
                report(f"{sub_path}.items", "missing items or item_templates")

            for item_index, item in enumerate(_item_candidates_for_coverage(subcategory)):
                item_path = f"{sub_path}.items[{item_index}]"
                for label, pool_key, library, inherit_key in pool_routes:
                    errors.extend(
                        _configured_pool_errors(
                            category=category,
                            subcategory=subcategory,
                            item=item,
                            path=item_path,
                            pool_label=label,
                            direct_key=label,
                            pool_key=pool_key,
                            pool_library=library,
                            inherit_key=inherit_key,
                        )
                    )

            if not (category_slug == "hardcore_sexual_poses" and has_templates):
                continue
            metadata_path = f"{sub_path}.item_template_metadata"
            raw_metadata = subcategory.get("item_template_metadata")
            if not isinstance(raw_metadata, dict):
                report(metadata_path, "missing route metadata")
                continue
            normalized = template_metadata_policy.template_metadata(raw_metadata)
            if not template_metadata_policy.template_action_family(normalized):
                report(metadata_path, "missing normalized action_family")
            if not template_metadata_policy.template_position_family(normalized):
                report(metadata_path, "missing normalized position_family")
    return errors
def _smoke_case_names(path: Path) -> set[str]:
if not path.exists():
return set()
tree = ast.parse(path.read_text(encoding="utf-8"))
for node in tree.body:
value: ast.AST | None = None
if isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name) and node.target.id == "SMOKE_CASES":
value = node.value
elif isinstance(node, ast.Assign) and any(
isinstance(target, ast.Name) and target.id == "SMOKE_CASES" for target in node.targets
):
value = node.value
if value is None:
continue
if not isinstance(value, (ast.List, ast.Tuple)):
return set()
names: set[str] = set()
for item in value.elts:
if not isinstance(item, (ast.List, ast.Tuple)) or not item.elts:
continue
first = item.elts[0]
if isinstance(first, ast.Constant) and isinstance(first.value, str):
names.add(first.value)
return names
return set()
def _routing_doc_errors() -> list[tuple[str, str, str]]:
    """Check that critical route modules exist, are documented, and are smoke-tested.

    For every ``(module, smoke_case)`` in ``CRITICAL_ROUTE_MODULES`` the
    module file must exist under ROOT, its file name must appear in both
    routing documents, and the smoke case must be registered in
    ``tools/prompt_smoke.py``. Every snippet in ``ENTRY_ROUTE_SNIPPETS`` must
    also appear in ``docs/prompt-pool-routing-map.md``.

    Each document is read once. A missing document is treated as empty text,
    so every expectation against it is reported instead of the audit failing
    with ``FileNotFoundError``.

    Returns:
        ``(module, location, issue)`` tuples; empty when everything checks out.
    """
    docs = {
        "docs/prompt-pool-routing-map.md": ROOT / "docs" / "prompt-pool-routing-map.md",
        "docs/prompt-architecture-improvement-plan.md": ROOT / "docs" / "prompt-architecture-improvement-plan.md",
    }
    # Hoisted out of the per-module loop: the text does not change between modules.
    doc_texts = {
        doc_name: doc_path.read_text(encoding="utf-8") if doc_path.exists() else ""
        for doc_name, doc_path in docs.items()
    }
    smoke_path = ROOT / "tools" / "prompt_smoke.py"
    smoke_cases = _smoke_case_names(smoke_path)
    errors: list[tuple[str, str, str]] = []
    if not smoke_cases:
        errors.append(("tools/prompt_smoke.py", "SMOKE_CASES", "no registered smoke cases found"))
    for module_name, smoke_case in CRITICAL_ROUTE_MODULES:
        if not (ROOT / module_name).exists():
            errors.append((module_name, "module", "critical route module is missing"))
        for doc_name, doc_text in doc_texts.items():
            if module_name not in doc_text:
                errors.append((module_name, doc_name, "critical route module is not documented"))
        if smoke_case and smoke_case not in smoke_cases:
            errors.append((module_name, "tools/prompt_smoke.py", f"missing registered smoke case: {smoke_case}"))
    route_map_text = doc_texts["docs/prompt-pool-routing-map.md"]
    for snippet in ENTRY_ROUTE_SNIPPETS:
        if snippet not in route_map_text:
            errors.append(("(entry route)", "docs/prompt-pool-routing-map.md", f"missing entry snippet: {snippet}"))
    return errors
class _PromptRowReadVisitor(ast.NodeVisitor):
def __init__(self, path: Path) -> None:
self.path = path
self.source = path.read_text(encoding="utf-8")
self.function_stack: list[str] = []
self.errors: list[tuple[str, str, str]] = []
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
self.function_stack.append(node.name)
self.generic_visit(node)
self.function_stack.pop()
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
self.function_stack.append(node.name)
self.generic_visit(node)
self.function_stack.pop()
def visit_Call(self, node: ast.Call) -> None:
segment = ast.get_source_segment(self.source, node) or ""
if 'row.get("prompt"' in segment or "row.get('prompt'" in segment:
function_name = self.function_stack[-1] if self.function_stack else "<module>"
key = (self.path.name, function_name)
if key not in ALLOWED_PROMPT_ROW_READS:
self.errors.append(
(
self.path.name,
f"{function_name}:{node.lineno}",
"metadata formatter code reads row prompt text; use structured metadata or add an explicit audited exception",
)
)
self.generic_visit(node)
def _prompt_row_read_errors() -> list[tuple[str, str, str]]:
    """Scan the formatter modules under ROOT for unaudited raw prompt reads.

    Files matching any pattern in ``PROMPT_ROW_READ_SCAN_GLOBS`` are each
    visited once, in sorted path order, with ``_PromptRowReadVisitor``.
    """
    unique_paths = {
        match
        for pattern in PROMPT_ROW_READ_SCAN_GLOBS
        for match in ROOT.glob(pattern)
    }
    findings: list[tuple[str, str, str]] = []
    for module_path in sorted(unique_paths):
        visitor = _PromptRowReadVisitor(module_path)
        tree = ast.parse(visitor.source)
        visitor.visit(tree)
        findings.extend(visitor.errors)
    return findings
def _json_dumps(value: Any) -> str:
return json.dumps(value, ensure_ascii=True, sort_keys=True)
def _expect_runtime(condition: bool, location: str, issue: str, errors: list[tuple[str, str, str]]) -> None:
if not condition:
errors.append(("runtime", location, issue))
def _trace_seed_axis(trace: dict[str, Any], axis: str) -> dict[str, Any]:
seed_axes = trace.get("seed_axes")
if not isinstance(seed_axes, dict):
return {}
axis_trace = seed_axes.get(axis)
return axis_trace if isinstance(axis_trace, dict) else {}
def _row_trace_errors(row: dict[str, Any], location: str, errors: list[tuple[str, str, str]]) -> None:
    """Validate the ``generation_trace`` attached to a generated row.

    Findings are appended to *errors*. The trace must be a dict containing the
    required keys, name ``prompt_builder`` as its builder, and carry a
    ``seed_axes`` dict with an entry for every expected axis. Each axis entry
    needs a ``source`` of ``main`` or ``configured`` and integer ``seed`` and
    ``rng_seed`` values.
    """
    trace = row.get("generation_trace")
    has_trace = isinstance(trace, dict)
    _expect_runtime(has_trace, location, "missing generation_trace", errors)
    if not has_trace:
        return

    trace_location = f"{location}.generation_trace"
    required_keys = ("builder", "branch", "source", "category", "subcategory", "seed", "seed_axes")
    for required in required_keys:
        _expect_runtime(required in trace, trace_location, f"missing {required}", errors)
    _expect_runtime(
        trace.get("builder") == "prompt_builder",
        f"{trace_location}.builder",
        "unexpected builder",
        errors,
    )
    _expect_runtime(
        isinstance(trace.get("seed_axes"), dict),
        f"{trace_location}.seed_axes",
        "seed_axes must be an object",
        errors,
    )

    expected_axes = ("content", "person", "scene", "pose", "role", "expression", "composition")
    for axis in expected_axes:
        axis_location = f"{trace_location}.seed_axes.{axis}"
        axis_trace = _trace_seed_axis(trace, axis)
        _expect_runtime(bool(axis_trace), axis_location, "missing axis trace", errors)
        if not axis_trace:
            continue
        _expect_runtime(
            axis_trace.get("source") in {"main", "configured"},
            f"{axis_location}.source",
            "invalid seed source",
            errors,
        )
        _expect_runtime(
            isinstance(axis_trace.get("seed"), int),
            f"{axis_location}.seed",
            "seed must be an integer",
            errors,
        )
        _expect_runtime(
            isinstance(axis_trace.get("rng_seed"), int),
            f"{axis_location}.rng_seed",
            "rng_seed must be an integer",
            errors,
        )
def _formatter_trace_errors(
    metadata_json: str,
    location: str,
    errors: list[tuple[str, str, str]],
    *,
    target: str = "auto",
) -> None:
    """Run the Krea, SDXL and caption formatters on *metadata_json* and audit them.

    Each formatter is called with an empty prompt and
    ``input_hint="metadata_json"``. Its reported method must mention
    ``metadata``, and its route trace must be valid JSON naming the expected
    formatter, a metadata branch, and the ``metadata_json`` input hint. The
    caption output must additionally be non-blank. Findings are appended to
    *errors* in Krea, SDXL, caption order.
    """
    valid_metadata_branches = {"metadata", "metadata(single)", "insta_of_pair"}

    def check_method(method: Any, prefix: str) -> None:
        _expect_runtime(
            "metadata" in str(method or ""),
            f"{prefix}.method",
            f"formatter did not consume metadata: {method}",
            errors,
        )

    def check_trace(trace_text: Any, prefix: str, formatter_name: str) -> None:
        trace_location = f"{prefix}.route_trace_json"
        _expect_runtime(bool(trace_text), trace_location, "missing route trace", errors)
        if not trace_text:
            return
        try:
            parsed = json.loads(trace_text)
        except json.JSONDecodeError as exc:
            errors.append(("runtime", trace_location, f"invalid JSON: {exc}"))
            return
        _expect_runtime(parsed.get("formatter") == formatter_name, f"{trace_location}.formatter", "unexpected formatter", errors)
        _expect_runtime(parsed.get("branch") in valid_metadata_branches, f"{trace_location}.branch", "unexpected branch", errors)
        _expect_runtime(parsed.get("input_hint") == "metadata_json", f"{trace_location}.input_hint", "unexpected input hint", errors)

    # Krea and SDXL return a dict exposing "method" and "route_trace_json".
    dict_formatters = (
        ("krea", krea_formatter.format_krea2_prompt, "krea2"),
        ("sdxl", sdxl_formatter.format_sdxl_prompt, "sdxl"),
    )
    for label, format_prompt, formatter_name in dict_formatters:
        prefix = f"{location}.{label}"
        formatted = format_prompt("", metadata_json=metadata_json, input_hint="metadata_json", target=target)
        check_method(formatted.get("method"), prefix)
        check_trace(str(formatted.get("route_trace_json") or ""), prefix, formatter_name)

    # The caption naturalizer returns a (caption, method, trace text) triple instead.
    caption, caption_method, caption_trace_text = caption_naturalizer.naturalize_caption_with_trace(
        "",
        metadata_json=metadata_json,
        input_hint="metadata_json",
        target=target,
    )
    caption_prefix = f"{location}.caption"
    _expect_runtime(bool(caption.strip()), caption_prefix, "caption output is empty", errors)
    check_method(caption_method, caption_prefix)
    check_trace(caption_trace_text, caption_prefix, "caption")
def _runtime_metadata_errors() -> list[tuple[str, str, str]]:
    """Build one single row and one Insta/OF pair and audit their route traces.

    Both scenarios use a seed-lock config that rerolls a single axis (``scene``
    for the single row, ``pose`` for the pair). The resulting generation
    traces must report that axis as ``configured`` with the reroll seed. Each
    result is then serialised and passed through the formatter trace checks.
    Returns the collected ``("runtime", location, issue)`` findings.
    """
    errors: list[tuple[str, str, str]] = []

    def dict_or_empty(candidate: Any) -> dict[str, Any]:
        return candidate if isinstance(candidate, dict) else {}

    def expect_all(expectations: tuple[tuple[bool, str, str], ...]) -> None:
        for holds, where, issue in expectations:
            _expect_runtime(holds, where, issue, errors)

    # Scenario 1: single row from a JSON category, scene axis rerolled.
    scene_lock = pb.build_seed_lock_config_json(base_seed=4101, reroll_axis="scene", reroll_seed=4102)
    row = pb.build_prompt(
        category="Casual clothes",
        subcategory="Casual clothes / Smart casual",
        row_number=2,
        start_index=5,
        seed=4101,
        clothing="random",
        ethnicity="french_european",
        poses="random",
        backside_bias=0.25,
        figure="random",
        no_plus_women=False,
        no_black=False,
        minimal_clothing_ratio=0.35,
        standard_pose_ratio=0.4,
        trigger="sxcppnl7",
        prepend_trigger_to_prompt=True,
        extra_positive="",
        extra_negative="",
        seed_config=scene_lock,
        women_count=1,
        men_count=0,
    )
    _row_trace_errors(row, "build_prompt.row", errors)
    row_trace = dict_or_empty(row.get("generation_trace"))
    expect_all(
        (
            (row_trace.get("branch") == "custom", "build_prompt.row.generation_trace.branch", "expected custom branch"),
            (row_trace.get("source") == "json_category", "build_prompt.row.generation_trace.source", "expected JSON category source"),
        )
    )
    scene_axis = _trace_seed_axis(row_trace, "scene")
    expect_all(
        (
            (scene_axis.get("source") == "configured", "build_prompt.row.generation_trace.seed_axes.scene.source", "expected configured scene seed"),
            (scene_axis.get("seed") == 4102, "build_prompt.row.generation_trace.seed_axes.scene.seed", "expected scene reroll seed"),
        )
    )
    _formatter_trace_errors(_json_dumps(row), "build_prompt.row", errors)

    # Scenario 2: Insta/OF softcore + hardcore pair, pose axis rerolled.
    pose_lock = pb.build_seed_lock_config_json(base_seed=4201, reroll_axis="pose", reroll_seed=4202)
    pair_options = pb.build_insta_of_options_json(
        softcore_cast="same_as_hardcore",
        hardcore_cast="couple",
        hardcore_women_count=1,
        hardcore_men_count=1,
        hardcore_clothing_continuity="explicit_nude",
        hardcore_camera_mode="standard",
        camera_detail="off",
    )
    position_config = pb.build_hardcore_position_pool_json(family="penetration")
    pair = pb.build_insta_of_pair(
        row_number=1,
        start_index=1,
        seed=4201,
        ethnicity="french_european",
        figure="random",
        no_plus_women=False,
        no_black=False,
        trigger="sxcppnl7",
        prepend_trigger_to_prompt=True,
        seed_config=pose_lock,
        options_json=pair_options,
        hardcore_position_config=position_config,
    )
    _expect_runtime(pair.get("mode") == "Insta/OF", "build_insta_of_pair.mode", "expected Insta/OF pair metadata", errors)
    soft_row = dict_or_empty(pair.get("softcore_row"))
    hard_row = dict_or_empty(pair.get("hardcore_row"))
    _row_trace_errors(soft_row, "build_insta_of_pair.softcore_row", errors)
    _row_trace_errors(hard_row, "build_insta_of_pair.hardcore_row", errors)
    hard_trace = dict_or_empty(hard_row.get("generation_trace"))
    expect_all(
        (
            (hard_trace.get("category_slug") == "hardcore_sexual_poses", "build_insta_of_pair.hardcore_row.generation_trace.category_slug", "expected hardcore pose category"),
            (hard_trace.get("content_seed_axis") == "pose", "build_insta_of_pair.hardcore_row.generation_trace.content_seed_axis", "expected pose-driven hardcore content axis"),
        )
    )
    pose_axis = _trace_seed_axis(hard_trace, "pose")
    expect_all(
        (
            (pose_axis.get("source") == "configured", "build_insta_of_pair.hardcore_row.generation_trace.seed_axes.pose.source", "expected configured pose seed"),
            (pose_axis.get("seed") == 4202, "build_insta_of_pair.hardcore_row.generation_trace.seed_axes.pose.seed", "expected pose reroll seed"),
        )
    )
    _formatter_trace_errors(_json_dumps(pair), "build_insta_of_pair", errors, target="hardcore")
    return errors
def print_table(headers: tuple[str, ...], rows: list[tuple[Any, ...]]) -> None:
    """Print *rows* under *headers* as a pipe-delimited, column-aligned table.

    Each column is as wide as its longest cell (header included). Cells are
    rendered with ``str()`` and left-justified. With no rows, only the header
    and separator lines are printed.
    """
    rendered = [[str(cell) for cell in row] for row in rows]

    widths = [len(header) for header in headers]
    for cells in rendered:
        for column, cell in enumerate(cells):
            if len(cell) > widths[column]:
                widths[column] = len(cell)

    def emit(cells: list[str]) -> None:
        print("| " + " | ".join(cells) + " |")

    emit([header.ljust(width) for header, width in zip(headers, widths)])
    emit(["-" * width for width in widths])
    for cells in rendered:
        emit([cell.ljust(width) for cell, width in zip(cells, widths)])
def main() -> int:
    """Print the audit report and return the process exit code.

    Three inventory tables are printed first (node display map, category JSON
    summary, named pools). The validation sections then run in a fixed order;
    the first section that reports findings prints them as a table and ends
    the run with exit code 1. Exit code 0 means every section passed.
    """
    category_paths = _category_json_paths()

    display: dict[str, Any] = {}
    returns: dict[str, tuple[str, ...]] = {}
    for path in _node_python_paths():
        for mapping_name in ("NODE_DISPLAY_NAME_MAPPINGS", "LOOP_NODE_DISPLAY_NAME_MAPPINGS"):
            display.update(_assignment_dict(path, mapping_name))
        returns.update(_class_return_names(path))

    print("# Node Display Map")
    node_rows = [
        (
            display_name,
            class_name,
            ", ".join(returns.get(class_name, ())) or "(dynamic or unnamed)",
        )
        for class_name, display_name in sorted(display.items(), key=lambda item: str(item[1]))
    ]
    print_table(("Display name", "Class", "Return names"), node_rows)

    print("\n# Category JSON Summary")
    summary_columns = (
        "categories",
        "subcategories",
        "item_templates",
        "scene_pools",
        "expression_pools",
        "composition_pools",
        "pool_extensions",
    )
    category_rows = []
    for path in category_paths:
        summary = _category_summary(path)
        category_rows.append((path.name, *(summary[column] for column in summary_columns)))
    print_table(
        (
            "File",
            "Categories",
            "Subcategories",
            "Item templates",
            "Scene pools",
            "Expression pools",
            "Composition pools",
            "Extensions",
        ),
        category_rows,
    )

    print("\n# Named Pool Inventory")
    pool_rows = []
    for path in category_paths:
        for key in ("scene_pools", "expression_pools", "composition_pools"):
            names = _pool_names(path, key)
            if not names:
                continue
            # Show at most eight names; mark truncation with an ellipsis.
            preview = ", ".join(names[:8])
            if len(names) > 8:
                preview += " ..."
            pool_rows.append((path.name, key, len(names), preview))
    print_table(("File", "Pool type", "Count", "First names"), pool_rows)

    # (heading, check, table headers, success message). Checks are wrapped in
    # callables so that each one runs only if all earlier sections passed.
    sections = (
        (
            "\n# JSON Reference Validation",
            lambda: _json_reference_errors(category_paths),
            ("File", "Path", "Issue"),
            "OK: all JSON pool references and item template axes resolve.",
        ),
        (
            "\n# Hardcore Template Metadata Validation",
            lambda: _hardcore_template_metadata_errors(category_paths),
            ("File", "Path", "Issue"),
            "OK: hardcore template subcategories define explicit route metadata defaults.",
        ),
        (
            "\n# Effective Category Route Coverage Validation",
            lambda: _effective_category_coverage_errors(category_paths),
            ("Source", "Path", "Issue"),
            "OK: category routes define effective item, scene, expression, composition, and route metadata coverage.",
        ),
        (
            "\n# Routing Documentation Validation",
            _routing_doc_errors,
            ("Module", "Location", "Issue"),
            "OK: critical route modules are documented and covered by smoke cases.",
        ),
        (
            "\n# Metadata Prompt Fallback Validation",
            _prompt_row_read_errors,
            ("Module", "Location", "Issue"),
            "OK: metadata formatter modules avoid raw prompt reads outside audited fallback helpers.",
        ),
        (
            "\n# Runtime Metadata Route Validation",
            _runtime_metadata_errors,
            ("Source", "Location", "Issue"),
            "OK: builder rows, pair rows, and formatter traces preserve metadata routes.",
        ),
    )
    for heading, collect_errors, table_headers, ok_message in sections:
        print(heading)
        findings = collect_errors()
        if findings:
            print_table(table_headers, findings)
            return 1
        print(ok_message)
    return 0
# Script entry point: the audit's return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())