From 669f209930447f38841c3a997b9c64aff59ed093 Mon Sep 17 00:00:00 2001
From: Ethanfel
Date: Thu, 2 Jul 2026 12:06:19 +0200
Subject: [PATCH] Add popular node signature extractor

---
 .../test_generate_popular_node_signatures.py | 153 ++++++++++
 tools/__init__.py                            |   1 +
 tools/generate_popular_node_signatures.py    | 289 ++++++++++++++++++
 3 files changed, 443 insertions(+)
 create mode 100644 tests/test_generate_popular_node_signatures.py
 create mode 100644 tools/__init__.py
 create mode 100644 tools/generate_popular_node_signatures.py

diff --git a/tests/test_generate_popular_node_signatures.py b/tests/test_generate_popular_node_signatures.py
new file mode 100644
index 0000000..9387c08
--- /dev/null
+++ b/tests/test_generate_popular_node_signatures.py
@@ -0,0 +1,153 @@
+import json
+import tempfile
+import textwrap
+import unittest
+from pathlib import Path
+
+from tools.generate_popular_node_signatures import (
+    extract_repo_signatures,
+    normalise_input_spec,
+    write_artifact,
+)
+
+
+class StaticExtractionTests(unittest.TestCase):
+    def test_normalise_input_spec_reduces_combo_lists(self):
+        self.assertEqual("COMBO", normalise_input_spec((["nearest", "bilinear"],)))
+        self.assertEqual("IMAGE", normalise_input_spec(("IMAGE",)))
+        self.assertEqual("FLOAT", normalise_input_spec(("FLOAT", {"default": 1.0})))
+
+    def test_extracts_static_node_mapping_and_signatures(self):
+        source = '''
+class FancySize:
+    RETURN_TYPES = ("INT", "INT")
+    RETURN_NAMES = ("width", "height")
+
+    @classmethod
+    def INPUT_TYPES(cls):
+        return {
+            "required": {
+                "image": ("IMAGE",),
+            },
+            "optional": {
+                "scale": ("FLOAT", {"default": 1.0}),
+                "mode": (["nearest", "bilinear"],),
+            },
+        }
+
+
+NODE_CLASS_MAPPINGS = {
+    "FancySize": FancySize,
+}
+
+NODE_DISPLAY_NAME_MAPPINGS = {
+    "FancySize": "Fancy Size",
+}
+'''
+        with tempfile.TemporaryDirectory() as tmp:
+            Path(tmp, "__init__.py").write_text(textwrap.dedent(source), encoding="utf-8")
+            result = extract_repo_signatures(
+                Path(tmp),
+                {
+                    "id": "sample-pack",
+                    "title": "Sample Pack",
+                    "repository": "https://github.com/example/sample-pack",
+                    "rank": 1,
+                },
+            )
+
+        self.assertIn("FancySize", result["nodes"])
+        node = result["nodes"]["FancySize"]
+        self.assertEqual("Fancy Size", node["display"])
+        self.assertEqual("sample-pack", node["pack"])
+        self.assertEqual({"image": "IMAGE", "scale": "FLOAT", "mode": "COMBO"}, node["inputs"])
+        self.assertEqual(["image"], node["required"])
+        self.assertEqual(["INT", "INT"], node["outputs"])
+        self.assertEqual(["width", "height"], node["output_names"])
+        self.assertEqual("static_exact", node["confidence"])
+
+    def test_skips_dynamic_input_types_without_failing_repo(self):
+        source = '''
+def build_inputs():
+    return {"required": {"image": ("IMAGE",)}}
+
+
+class DynamicNode:
+    RETURN_TYPES = ("IMAGE",)
+
+    @classmethod
+    def INPUT_TYPES(cls):
+        return build_inputs()
+
+
+NODE_CLASS_MAPPINGS = {
+    "DynamicNode": DynamicNode,
+}
+'''
+        with tempfile.TemporaryDirectory() as tmp:
+            Path(tmp, "__init__.py").write_text(textwrap.dedent(source), encoding="utf-8")
+            result = extract_repo_signatures(
+                Path(tmp),
+                {
+                    "id": "dynamic-pack",
+                    "title": "Dynamic Pack",
+                    "repository": "https://github.com/example/dynamic-pack",
+                    "rank": 1,
+                },
+            )
+
+        self.assertEqual({}, result["nodes"])
+        self.assertEqual("no_static_nodes", result["pack"]["status"])
+
+    def test_skips_unparseable_files_without_failing_repo(self):
+        with tempfile.TemporaryDirectory() as tmp:
+            # Invalid UTF-8 that is still a syntax error once the bad byte is dropped.
+            Path(tmp, "broken.py").write_bytes(b"def broken(:\n    pass  # \xff\n")
+            result = extract_repo_signatures(Path(tmp), {"id": "broken-pack"})
+
+        self.assertEqual({}, result["nodes"])
+        self.assertEqual("no_static_nodes", result["pack"]["status"])
+
+    def test_write_artifact_is_deterministic(self):
+        with tempfile.TemporaryDirectory() as tmp:
+            out = Path(tmp, "popular_node_signatures.json")
+            write_artifact(
+                out,
+                sources={"manager_url": "https://example.invalid/manager.json", "limit": 1},
+                packs={
+                    "b-pack": {"id": "b-pack", "title": "B Pack", "status": "ok"},
+                    "a-pack": {"id": "a-pack", "title": "A Pack", "status": "ok"},
+                },
+                nodes={
+                    "BNode": {
+                        "type": "BNode",
+                        "display": "B Node",
+                        "pack": "b-pack",
+                        "repository": "https://github.com/example/b-pack",
+                        "inputs": {},
+                        "required": [],
+                        "outputs": ["IMAGE"],
+                        "output_names": ["image"],
+                        "confidence": "static_exact",
+                    },
+                    "ANode": {
+                        "type": "ANode",
+                        "display": "A Node",
+                        "pack": "a-pack",
+                        "repository": "https://github.com/example/a-pack",
+                        "inputs": {},
+                        "required": [],
+                        "outputs": ["IMAGE"],
+                        "output_names": ["image"],
+                        "confidence": "static_exact",
+                    },
+                },
+            )
+            parsed = json.loads(out.read_text(encoding="utf-8"))
+
+        self.assertEqual(["a-pack", "b-pack"], list(parsed["packs"]))
+        self.assertEqual(["ANode", "BNode"], list(parsed["nodes"]))
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tools/__init__.py b/tools/__init__.py
new file mode 100644
index 0000000..7eb0de1
--- /dev/null
+++ b/tools/__init__.py
@@ -0,0 +1 @@
+"""Utility scripts for UTFCN development."""
diff --git a/tools/generate_popular_node_signatures.py b/tools/generate_popular_node_signatures.py
new file mode 100644
index 0000000..136a9c2
--- /dev/null
+++ b/tools/generate_popular_node_signatures.py
@@ -0,0 +1,289 @@
+#!/usr/bin/env python3
+"""Generate UTFCN's popular_node_signatures.json artifact."""
+
+import ast
+import json
+import os
+from datetime import datetime, timezone
+from pathlib import Path
+
+SCHEMA_VERSION = 1
+MANAGER_LIST_URL = "https://raw.githubusercontent.com/ltdrdata/ComfyUI-Manager/main/custom-node-list.json"
+REGISTRY_NODES_URL = "https://api.comfy.org/nodes"
+
+
+class UnsupportedStaticExpression(Exception):
+    """Raised when an AST node cannot be evaluated without running code."""
+
+
+def _literal(node, env):
+    """Statically evaluate ``node`` to a plain Python value.
+
+    Supports constants, lists, tuples, dicts and names already present in
+    ``env``. Anything else raises ``UnsupportedStaticExpression``.
+    """
+    if isinstance(node, ast.Constant):
+        return node.value
+    if isinstance(node, ast.List):
+        return [_literal(item, env) for item in node.elts]
+    if isinstance(node, ast.Tuple):
+        return tuple(_literal(item, env) for item in node.elts)
+    if isinstance(node, ast.Dict):
+        result = {}
+        for key, value in zip(node.keys, node.values):
+            if key is None:
+                raise UnsupportedStaticExpression("dict unpacking is not supported")
+            resolved_key = _literal(key, env)
+            resolved_value = _literal(value, env)
+            try:
+                result[resolved_key] = resolved_value
+            except TypeError as exc:
+                # A key that resolved to a list/dict is unhashable; report it
+                # as unsupported so callers can skip it instead of crashing.
+                raise UnsupportedStaticExpression("unhashable dict key") from exc
+        return result
+    if isinstance(node, ast.Name) and node.id in env:
+        return env[node.id]
+    raise UnsupportedStaticExpression(type(node).__name__)
+
+
+def _collect_module_env(tree):
+    """Map module-level ``NAME = <literal>`` assignments to their values.
+
+    Assignments that cannot be evaluated statically are skipped.
+    """
+    env = {}
+    for stmt in tree.body:
+        if not isinstance(stmt, ast.Assign):
+            continue
+        if len(stmt.targets) != 1 or not isinstance(stmt.targets[0], ast.Name):
+            continue
+        try:
+            env[stmt.targets[0].id] = _literal(stmt.value, env)
+        except UnsupportedStaticExpression:
+            continue
+    return env
+
+
+def normalise_input_spec(spec):
+    """Reduce an input spec to a type name string.
+
+    Uses the first element of a non-empty list/tuple spec (else the spec
+    itself); a list there is reported as ``"COMBO"``, anything else as ``str``.
+    """
+    first = spec[0] if isinstance(spec, (list, tuple)) and spec else spec
+    if isinstance(first, list):
+        return "COMBO"
+    return str(first)
+
+
+def _class_defs(tree):
+    """Return every class definition in ``tree`` (nested ones too), by name."""
+    return {node.name: node for node in ast.walk(tree) if isinstance(node, ast.ClassDef)}
+
+
+def _class_attr(cls, name, env):
+    """Return the static value of class attribute ``name``, or ``None``."""
+    for stmt in cls.body:
+        if not isinstance(stmt, ast.Assign):
+            continue
+        for target in stmt.targets:
+            if isinstance(target, ast.Name) and target.id == name:
+                try:
+                    return _literal(stmt.value, env)
+                except UnsupportedStaticExpression:
+                    return None
+    return None
+
+
+def _input_types(cls, env):
+    """Return the dict returned by ``cls.INPUT_TYPES``, or ``None``.
+
+    Only a top-level ``return`` of a statically evaluable dict is supported.
+    """
+    for stmt in cls.body:
+        if not isinstance(stmt, ast.FunctionDef) or stmt.name != "INPUT_TYPES":
+            continue
+        for child in stmt.body:
+            if isinstance(child, ast.Return):
+                try:
+                    value = _literal(child.value, env)
+                except UnsupportedStaticExpression:
+                    return None
+                return value if isinstance(value, dict) else None
+    return None
+
+
+def _mapping_value_name(value):
+    """Return the name held by ``value`` (a ``str`` or ``ast.Name``), or ``None``."""
+    if isinstance(value, str):
+        return value
+    if isinstance(value, ast.Name):
+        return value.id
+    return None
+
+
+def _node_class_mappings(tree, env):
+    """Return ``{node_type: class_name}`` from ``NODE_CLASS_MAPPINGS``.
+
+    Uses the first module-level assignment whose value is a dict literal.
+    """
+    for stmt in tree.body:
+        if not isinstance(stmt, ast.Assign):
+            continue
+        if not any(
+            isinstance(target, ast.Name) and target.id == "NODE_CLASS_MAPPINGS"
+            for target in stmt.targets
+        ):
+            continue
+        if not isinstance(stmt.value, ast.Dict):
+            continue
+        mappings = {}
+        for key, value in zip(stmt.value.keys, stmt.value.values):
+            try:
+                node_type = _literal(key, env)
+            except UnsupportedStaticExpression:
+                continue
+            class_name = _mapping_value_name(value)
+            if node_type and class_name:
+                mappings[str(node_type)] = class_name
+        return mappings
+    return {}
+
+
+def _display_mappings(tree, env):
+    """Return ``NODE_DISPLAY_NAME_MAPPINGS`` as ``{str: str}``, or ``{}``."""
+    for stmt in tree.body:
+        if not isinstance(stmt, ast.Assign):
+            continue
+        if not any(
+            isinstance(target, ast.Name) and target.id == "NODE_DISPLAY_NAME_MAPPINGS"
+            for target in stmt.targets
+        ):
+            continue
+        try:
+            value = _literal(stmt.value, env)
+        except UnsupportedStaticExpression:
+            return {}
+        if isinstance(value, dict):
+            return {str(k): str(v) for k, v in value.items()}
+    return {}
+
+
+def _signature_from_class(node_type, cls, display, pack_meta, env):
+    """Build the signature record for one node class.
+
+    Returns ``None`` unless both ``INPUT_TYPES`` and ``RETURN_TYPES`` can
+    be evaluated statically.
+    """
+    input_types = _input_types(cls, env)
+    return_types = _class_attr(cls, "RETURN_TYPES", env)
+    return_names = _class_attr(cls, "RETURN_NAMES", env)
+    if not isinstance(input_types, dict) or not isinstance(return_types, (list, tuple)):
+        return None
+
+    inputs = {}
+    required = []
+    for section in ("required", "optional"):
+        values = input_types.get(section) or {}
+        if not isinstance(values, dict):
+            return None
+        for name, spec in values.items():
+            inputs[str(name)] = normalise_input_spec(spec)
+            if section == "required":
+                required.append(str(name))
+
+    output_names = []
+    if isinstance(return_names, (list, tuple)):
+        output_names = [str(name) for name in return_names]
+
+    return {
+        "type": node_type,
+        "display": display or node_type,
+        "pack": pack_meta["id"],
+        "repository": pack_meta.get("repository", ""),
+        "inputs": inputs,
+        "required": required,
+        "outputs": [str(value) for value in return_types],
+        "output_names": output_names,
+        "confidence": "static_exact",
+    }
+
+
+def _python_files(repo_dir):
+    """Yield ``.py`` files under ``repo_dir``, skipping VCS, cache and venv dirs."""
+    skipped = {".git", "__pycache__", ".venv", "venv", "env", "site-packages"}
+    for root, dirs, files in os.walk(repo_dir):
+        dirs[:] = [dirname for dirname in dirs if dirname not in skipped]
+        for filename in files:
+            if filename.endswith(".py"):
+                yield Path(root, filename)
+
+
+def _parse_python_file(path):
+    """Parse ``path`` into an AST, or return ``None`` if that is not possible.
+
+    Invalid UTF-8 is retried with the undecodable bytes dropped. Unreadable
+    files and sources that still fail to parse are skipped.
+    """
+    try:
+        try:
+            source = path.read_text(encoding="utf-8")
+        except UnicodeDecodeError:
+            source = path.read_text(encoding="utf-8", errors="ignore")
+        return ast.parse(source, filename=str(path))
+    except (OSError, SyntaxError, ValueError):
+        # ValueError: source containing null bytes on older Python versions.
+        return None
+
+
+def extract_repo_signatures(repo_dir, pack_meta):
+    """Extract statically readable node signatures from one repository.
+
+    Returns ``{"pack": <pack summary>, "nodes": {node_type: signature}}``.
+    Files that cannot be parsed are skipped rather than failing the repo.
+    """
+    nodes = {}
+    for path in sorted(_python_files(repo_dir)):
+        tree = _parse_python_file(path)
+        if tree is None:
+            continue
+        env = _collect_module_env(tree)
+        mappings = _node_class_mappings(tree, env)
+        displays = _display_mappings(tree, env)
+        classes = _class_defs(tree)
+        for node_type, class_name in sorted(mappings.items()):
+            cls = classes.get(class_name)
+            if cls is None:
+                continue
+            sig = _signature_from_class(node_type, cls, displays.get(node_type), pack_meta, env)
+            if sig is not None:
+                nodes[node_type] = sig
+
+    pack = {
+        "id": pack_meta["id"],
+        "title": pack_meta.get("title", pack_meta["id"]),
+        "repository": pack_meta.get("repository", ""),
+        "rank": pack_meta.get("rank", 0),
+        "status": "ok" if nodes else "no_static_nodes",
+        "node_count": len(nodes),
+    }
+    return {"pack": pack, "nodes": nodes}
+
+
+def write_artifact(path, sources, packs, nodes):
+    """Write the signatures artifact to ``path`` as indented JSON.
+
+    ``packs`` and ``nodes`` are emitted in sorted key order; parent
+    directories are created as needed.
+    """
+    path = Path(path)  # also accept plain string paths
+    payload = {
+        "schema_version": SCHEMA_VERSION,
+        "generated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z"),
+        "sources": sources,
+        "packs": {key: packs[key] for key in sorted(packs)},
+        "nodes": {key: nodes[key] for key in sorted(nodes)},
+    }
+    path.parent.mkdir(parents=True, exist_ok=True)
+    path.write_text(json.dumps(payload, indent=2, sort_keys=False) + "\n", encoding="utf-8")