Files
Ethanfel acaa9f0168
Publish to Comfy registry / Publish Custom Node to registry (push) Has been cancelled
feat(search): draw real node box from disabled-pack source
Click a node name (or 'Draw this node') in the mirror-search palette to render
an imitation ComfyUI node box with its real input sockets, widget defaults, and
output sockets. Since disabled packs aren't loaded (no /object_info entry), a
new read-only backend module (node_introspect.py) AST-parses the pack on disk —
never importing or executing it — to recover INPUT_TYPES / RETURN_TYPES from a
literal NODE_CLASS_MAPPINGS. New GET /nodes-stats/node-schema endpoint resolves
the disabled pack dir (handling @version suffixes / case) and returns the schema
off the event loop. Frontend lazily fetches + caches per node, renders sockets
vs widgets with type-colored dots, and falls back to a placeholder for packs
that build their node list dynamically.

End-to-end against the live install: 67/68 disabled packs resolve on disk, ~92%
of nodes render a real box, the rest fall back cleanly. Adds 7 parser unit
tests (36 total green). Bump to 1.6.0.
2026-06-21 15:23:58 +02:00

288 lines
10 KiB
Python

"""Static (no-execution) introspection of disabled custom-node packages.
The mirror-search palette previews nodes that belong to *disabled* packs. Those
packs aren't imported by ComfyUI, so their INPUT_TYPES / RETURN_TYPES are not in
/object_info. To draw a faithful node box we parse the pack's Python source with
``ast`` — we never import or execute it (importing a disabled pack could have
side effects, pull heavy deps, or fail). This yields real inputs/outputs for the
~75% of packs that declare a literal ``NODE_CLASS_MAPPINGS``; packs that build
their mappings dynamically simply report ``parseable: False`` and the frontend
falls back to a placeholder box.
"""
import ast
import logging
import os
import warnings
logger = logging.getLogger(__name__)
# Input "types" that ComfyUI renders as in-node widgets rather than sockets.
_WIDGET_TYPES = {"INT", "FLOAT", "STRING", "BOOLEAN", "BOOL"}
# Cache parsed pack indexes for the session, keyed by source path. Disabled
# packs don't change while ComfyUI runs, so we never invalidate.
_INDEX_CACHE = {}
def _const_str(node):
if isinstance(node, ast.Constant) and isinstance(node.value, str):
return node.value
return None
def _parse_file(path):
try:
with open(path, "r", encoding="utf-8", errors="ignore") as fh:
src = fh.read()
# Third-party sources often contain unescaped regex strings; ast.parse
# emits SyntaxWarning for those. Suppress to keep server logs clean.
with warnings.catch_warnings():
warnings.simplefilter("ignore")
return ast.parse(src)
except Exception:
return None
def _iter_py_files(root, limit=500):
"""Yield up to ``limit`` .py files under a pack dir (or the file itself)."""
if os.path.isfile(root):
if root.endswith(".py"):
yield root
return
count = 0
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = [
d for d in dirnames
if not d.startswith(".") and d not in ("__pycache__", "node_modules", "js", "web", "dist")
]
for fn in filenames:
if fn.endswith(".py"):
yield os.path.join(dirpath, fn)
count += 1
if count >= limit:
return
def _string_tuple(node):
"""A RETURN_TYPES / RETURN_NAMES value -> list of strings ("*" for non-literal)."""
if isinstance(node, (ast.Tuple, ast.List)):
out = []
for e in node.elts:
s = _const_str(e)
out.append(s if s is not None else "*")
return out
s = _const_str(node)
return [s] if s is not None else None
def _extract_input_def(name, val):
"""One INPUT_TYPES entry -> {name, type, widget, default, options}."""
d = {"name": name, "type": "*", "widget": False, "default": None, "options": None}
type_node, opts_node = None, None
if isinstance(val, (ast.Tuple, ast.List)) and val.elts:
type_node = val.elts[0]
if len(val.elts) > 1:
opts_node = val.elts[1]
else:
type_node = val
s = _const_str(type_node)
if s is not None:
d["type"] = s
if s.upper() in _WIDGET_TYPES:
d["widget"] = True
elif isinstance(type_node, (ast.List, ast.Tuple)):
# Inline combo box: ["a", "b", ...]
opts = [_const_str(e) for e in type_node.elts]
opts = [o for o in opts if o is not None]
d["type"] = "COMBO"
d["widget"] = True
d["options"] = opts or None
elif isinstance(type_node, ast.Call):
# Dynamic list, e.g. folder_paths.get_filename_list(...) -> dropdown widget.
d["type"] = "COMBO"
d["widget"] = True
# else: a Name/Attribute (custom or wildcard socket type) -> keep "*", socket.
if isinstance(opts_node, ast.Dict):
for ok, ov in zip(opts_node.keys, opts_node.values):
if _const_str(ok) == "default":
try:
d["default"] = ast.literal_eval(ov)
except Exception:
d["default"] = _const_str(ov)
return d
def _extract_input_types(fn):
"""The INPUT_TYPES classmethod -> {"required": [...], "optional": [...]} or None."""
ret = None
for n in ast.walk(fn):
if isinstance(n, ast.Return) and isinstance(n.value, ast.Dict):
ret = n.value
break
if ret is None:
return None
result = {"required": [], "optional": []}
for cat_key, cat_val in zip(ret.keys, ret.values):
cat = _const_str(cat_key)
if cat not in ("required", "optional") or not isinstance(cat_val, ast.Dict):
continue
for nk, nv in zip(cat_val.keys, cat_val.values):
name = _const_str(nk)
if name is not None:
result[cat].append(_extract_input_def(name, nv))
return result
def _extract_class(cls):
info = {"input_types": None, "return_types": None, "return_names": None,
"category": None, "output_node": False}
for b in cls.body:
if isinstance(b, (ast.FunctionDef, ast.AsyncFunctionDef)) and b.name == "INPUT_TYPES":
info["input_types"] = _extract_input_types(b)
elif isinstance(b, ast.Assign):
for t in b.targets:
tn = getattr(t, "id", None)
if tn == "RETURN_TYPES":
info["return_types"] = _string_tuple(b.value)
elif tn == "RETURN_NAMES":
info["return_names"] = _string_tuple(b.value)
elif tn == "CATEGORY":
info["category"] = _const_str(b.value)
elif tn == "OUTPUT_NODE" and isinstance(b.value, ast.Constant):
info["output_node"] = bool(b.value.value)
return info
def _merge_mapping(out, dictnode):
if not isinstance(dictnode, ast.Dict):
return
for k, v in zip(dictnode.keys, dictnode.values):
key = _const_str(k)
if key is None:
continue
if isinstance(v, ast.Name):
out[key] = v.id
elif isinstance(v, ast.Call) and isinstance(v.func, ast.Name):
out[key] = v.func.id
def _merge_display(out, dictnode):
if not isinstance(dictnode, ast.Dict):
return
for k, v in zip(dictnode.keys, dictnode.values):
key, val = _const_str(k), _const_str(v)
if key is not None and val is not None:
out[key] = val
def build_pack_index(pack_path):
"""Parse a pack -> (classes, mappings, display).
classes: { ClassName: {input_types, return_types, return_names, category, output_node} }
mappings: { node_key: ClassName } (from literal NODE_CLASS_MAPPINGS / .update)
display: { node_key: "Pretty Name" }
"""
cached = _INDEX_CACHE.get(pack_path)
if cached is not None:
return cached
classes, mappings, display = {}, {}, {}
for f in _iter_py_files(pack_path):
tree = _parse_file(f)
if tree is None:
continue
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
classes.setdefault(node.name, _extract_class(node))
elif isinstance(node, ast.Assign):
for t in node.targets:
name = getattr(t, "id", None)
if name == "NODE_CLASS_MAPPINGS":
_merge_mapping(mappings, node.value)
elif name == "NODE_DISPLAY_NAME_MAPPINGS":
_merge_display(display, node.value)
elif isinstance(node, ast.Call):
fn = node.func
if (isinstance(fn, ast.Attribute) and fn.attr == "update"
and isinstance(fn.value, ast.Name) and node.args
and isinstance(node.args[0], ast.Dict)):
if fn.value.id == "NODE_CLASS_MAPPINGS":
_merge_mapping(mappings, node.args[0])
elif fn.value.id == "NODE_DISPLAY_NAME_MAPPINGS":
_merge_display(display, node.args[0])
result = (classes, mappings, display)
_INDEX_CACHE[pack_path] = result
return result
def get_node_schema(class_type, pack_path):
"""Return a render-ready schema for one node, or {parseable: False, reason}."""
classes, mappings, display = build_pack_index(pack_path)
cls_name = mappings.get(class_type)
if cls_name is None and class_type in classes:
cls_name = class_type # fall back: class_type IS the class name
if cls_name is None or cls_name not in classes:
return {"parseable": False, "reason": "dynamic_mapping" if mappings else "no_mapping"}
info = classes[cls_name]
inputs = []
it = info["input_types"]
if it:
for cat in ("required", "optional"):
for d in it[cat]:
inputs.append({**d, "required": cat == "required"})
rt = info["return_types"] or []
rn = info["return_names"] or []
outputs = []
for i, t in enumerate(rt):
nm = rn[i] if i < len(rn) and rn[i] else t
outputs.append({"name": nm or t, "type": t})
return {
"parseable": True,
"class_type": class_type,
"display_name": display.get(class_type) or class_type,
"category": info["category"],
"output_node": info["output_node"],
"inputs": inputs,
"outputs": outputs,
}
def find_disabled_pack_path(pack_name):
"""Locate a disabled pack's source under any custom_nodes/.disabled/ dir.
Matches case-insensitively and ignores any ``@version`` suffix that ComfyUI
Manager appends on disk (e.g. ``ComfyMath@nightly`` for pack ``comfymath``).
Returns an absolute path (dir or .py file) or None. Rejects path-y input.
"""
if not pack_name or any(c in pack_name for c in ("/", "\\")) or ".." in pack_name:
return None
try:
import folder_paths
roots = folder_paths.get_folder_paths("custom_nodes")
except Exception:
roots = []
target = pack_name.lower()
for root in roots:
ddir = os.path.join(root, ".disabled")
if not os.path.isdir(ddir):
continue
try:
entries = os.listdir(ddir)
except Exception:
continue
for e in entries:
base = e.split("@", 1)[0]
stem = base[:-3] if base.endswith(".py") else base
if target in (stem.lower(), base.lower()):
return os.path.join(ddir, e)
return None