Audit node registration coverage
This commit is contained in:
@@ -126,6 +126,10 @@ AUDIT_DOC_SNIPPETS: tuple[tuple[str, str], ...] = (
|
||||
"docs/prompt-pool-routing-map.md",
|
||||
"node documentation validation",
|
||||
),
|
||||
(
|
||||
"docs/prompt-pool-routing-map.md",
|
||||
"node registration validation",
|
||||
),
|
||||
)
|
||||
|
||||
PROMPT_ROW_READ_SCAN_GLOBS: tuple[str, ...] = (
|
||||
@@ -220,6 +224,80 @@ def _class_return_names(path: Path) -> dict[str, tuple[str, ...]]:
|
||||
return result
|
||||
|
||||
|
||||
def _sxcp_class_names(path: Path) -> set[str]:
|
||||
tree = ast.parse(path.read_text(encoding="utf-8"))
|
||||
return {
|
||||
node.name
|
||||
for node in tree.body
|
||||
if isinstance(node, ast.ClassDef) and node.name.startswith("SxCP")
|
||||
}
|
||||
|
||||
|
||||
def _class_mapping_entries(path: Path, mapping_names: tuple[str, ...]) -> dict[str, str]:
|
||||
tree = ast.parse(path.read_text(encoding="utf-8"))
|
||||
entries: dict[str, str] = {}
|
||||
for node in tree.body:
|
||||
if not isinstance(node, ast.Assign):
|
||||
continue
|
||||
if not any(isinstance(target, ast.Name) and target.id in mapping_names for target in node.targets):
|
||||
continue
|
||||
if not isinstance(node.value, ast.Dict):
|
||||
continue
|
||||
for key_node, value_node in zip(node.value.keys, node.value.values):
|
||||
key = _literal_or_none(key_node) if key_node is not None else None
|
||||
if not isinstance(key, str):
|
||||
continue
|
||||
if isinstance(value_node, ast.Name):
|
||||
entries[key] = value_node.id
|
||||
else:
|
||||
entries[key] = ""
|
||||
return entries
|
||||
|
||||
|
||||
def _display_mapping_keys(path: Path, mapping_names: tuple[str, ...]) -> set[str]:
|
||||
tree = ast.parse(path.read_text(encoding="utf-8"))
|
||||
keys: set[str] = set()
|
||||
for node in tree.body:
|
||||
if not isinstance(node, ast.Assign):
|
||||
continue
|
||||
if not any(isinstance(target, ast.Name) and target.id in mapping_names for target in node.targets):
|
||||
continue
|
||||
if not isinstance(node.value, ast.Dict):
|
||||
continue
|
||||
for key_node in node.value.keys:
|
||||
key = _literal_or_none(key_node) if key_node is not None else None
|
||||
if isinstance(key, str):
|
||||
keys.add(key)
|
||||
return keys
|
||||
|
||||
|
||||
def _node_registration_errors() -> list[tuple[str, str, str]]:
|
||||
errors: list[tuple[str, str, str]] = []
|
||||
for path in _node_module_python_paths():
|
||||
class_names = _sxcp_class_names(path)
|
||||
if not class_names:
|
||||
continue
|
||||
class_map = _class_mapping_entries(path, ("NODE_CLASS_MAPPINGS", "LOOP_NODE_CLASS_MAPPINGS"))
|
||||
display_keys = _display_mapping_keys(path, ("NODE_DISPLAY_NAME_MAPPINGS", "LOOP_NODE_DISPLAY_NAME_MAPPINGS"))
|
||||
registered_classes = {class_name for class_name in class_map.values() if class_name}
|
||||
|
||||
for class_name in sorted(class_names - registered_classes):
|
||||
errors.append((path.name, class_name, "SxCP node class is not registered in the module class mapping"))
|
||||
for mapping_key, class_name in sorted(class_map.items()):
|
||||
if not class_name:
|
||||
errors.append((path.name, mapping_key, "node class mapping value should be a class name"))
|
||||
continue
|
||||
if mapping_key != class_name:
|
||||
errors.append((path.name, mapping_key, f"node class mapping key should match class name {class_name}"))
|
||||
if class_name not in class_names:
|
||||
errors.append((path.name, mapping_key, f"node class mapping references unknown class {class_name}"))
|
||||
for mapping_key in sorted(set(class_map) - display_keys):
|
||||
errors.append((path.name, mapping_key, "registered node is missing a display-name mapping"))
|
||||
for mapping_key in sorted(display_keys - set(class_map)):
|
||||
errors.append((path.name, mapping_key, "display-name mapping has no matching registered node"))
|
||||
return errors
|
||||
|
||||
|
||||
def _category_summary(path: Path) -> dict[str, Any]:
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
categories = data.get("categories") or []
|
||||
@@ -260,6 +338,12 @@ def _node_python_paths() -> list[Path]:
|
||||
return [path for path in paths if path.exists()]
|
||||
|
||||
|
||||
def _node_module_python_paths() -> list[Path]:
|
||||
paths = [ROOT / "loop_nodes.py"]
|
||||
paths.extend(sorted(ROOT.glob("node_*.py")))
|
||||
return [path for path in paths if path.exists()]
|
||||
|
||||
|
||||
def _load_category_json(path: Path) -> dict[str, Any]:
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
return data if isinstance(data, dict) else {}
|
||||
@@ -1237,6 +1321,13 @@ def main() -> int:
|
||||
return 1
|
||||
print("OK: registered node display names are documented in the route map or README.")
|
||||
|
||||
print("\n# Node Registration Validation")
|
||||
node_registration_errors = _node_registration_errors()
|
||||
if node_registration_errors:
|
||||
print_table(("Module", "Node", "Issue"), node_registration_errors)
|
||||
return 1
|
||||
print("OK: concrete SxCP node classes are registered with matching display names.")
|
||||
|
||||
print("\n# Metadata Prompt Fallback Validation")
|
||||
prompt_row_read_errors = _prompt_row_read_errors()
|
||||
if prompt_row_read_errors:
|
||||
|
||||
Reference in New Issue
Block a user