feat(search): draw real node box from disabled-pack source
Publish to Comfy registry / Publish Custom Node to registry (push) Has been cancelled
Publish to Comfy registry / Publish Custom Node to registry (push) Has been cancelled
Click a node name (or 'Draw this node') in the mirror-search palette to render an imitation ComfyUI node box with its real input sockets, widget defaults, and output sockets. Since disabled packs aren't loaded (no /object_info entry), a new read-only backend module (node_introspect.py) AST-parses the pack on disk — never importing or executing it — to recover INPUT_TYPES / RETURN_TYPES from a literal NODE_CLASS_MAPPINGS. New GET /nodes-stats/node-schema endpoint resolves the disabled pack dir (handling @version suffixes / case) and returns the schema off the event loop. Frontend lazily fetches + caches per node, renders sockets vs widgets with type-colored dots, and falls back to a placeholder for packs that build their node list dynamically. End-to-end against the live install: 67/68 disabled packs resolve on disk, ~92% of nodes render a real box, the rest fall back cleanly. Adds 7 parser unit tests (36 total green). Bump to 1.6.0.
This commit is contained in:
@@ -0,0 +1,287 @@
|
||||
"""Static (no-execution) introspection of disabled custom-node packages.
|
||||
|
||||
The mirror-search palette previews nodes that belong to *disabled* packs. Those
|
||||
packs aren't imported by ComfyUI, so their INPUT_TYPES / RETURN_TYPES are not in
|
||||
/object_info. To draw a faithful node box we parse the pack's Python source with
|
||||
``ast`` — we never import or execute it (importing a disabled pack could have
|
||||
side effects, pull heavy deps, or fail). This yields real inputs/outputs for the
|
||||
~75% of packs that declare a literal ``NODE_CLASS_MAPPINGS``; packs that build
|
||||
their mappings dynamically simply report ``parseable: False`` and the frontend
|
||||
falls back to a placeholder box.
|
||||
"""
|
||||
|
||||
import ast
|
||||
import logging
|
||||
import os
|
||||
import warnings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Input "types" that ComfyUI renders as in-node widgets rather than sockets.
|
||||
_WIDGET_TYPES = {"INT", "FLOAT", "STRING", "BOOLEAN", "BOOL"}
|
||||
|
||||
# Cache parsed pack indexes for the session, keyed by source path. Disabled
|
||||
# packs don't change while ComfyUI runs, so we never invalidate.
|
||||
_INDEX_CACHE = {}
|
||||
|
||||
|
||||
def _const_str(node):
|
||||
if isinstance(node, ast.Constant) and isinstance(node.value, str):
|
||||
return node.value
|
||||
return None
|
||||
|
||||
|
||||
def _parse_file(path):
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8", errors="ignore") as fh:
|
||||
src = fh.read()
|
||||
# Third-party sources often contain unescaped regex strings; ast.parse
|
||||
# emits SyntaxWarning for those. Suppress to keep server logs clean.
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore")
|
||||
return ast.parse(src)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def _iter_py_files(root, limit=500):
|
||||
"""Yield up to ``limit`` .py files under a pack dir (or the file itself)."""
|
||||
if os.path.isfile(root):
|
||||
if root.endswith(".py"):
|
||||
yield root
|
||||
return
|
||||
count = 0
|
||||
for dirpath, dirnames, filenames in os.walk(root):
|
||||
dirnames[:] = [
|
||||
d for d in dirnames
|
||||
if not d.startswith(".") and d not in ("__pycache__", "node_modules", "js", "web", "dist")
|
||||
]
|
||||
for fn in filenames:
|
||||
if fn.endswith(".py"):
|
||||
yield os.path.join(dirpath, fn)
|
||||
count += 1
|
||||
if count >= limit:
|
||||
return
|
||||
|
||||
|
||||
def _string_tuple(node):
|
||||
"""A RETURN_TYPES / RETURN_NAMES value -> list of strings ("*" for non-literal)."""
|
||||
if isinstance(node, (ast.Tuple, ast.List)):
|
||||
out = []
|
||||
for e in node.elts:
|
||||
s = _const_str(e)
|
||||
out.append(s if s is not None else "*")
|
||||
return out
|
||||
s = _const_str(node)
|
||||
return [s] if s is not None else None
|
||||
|
||||
|
||||
def _extract_input_def(name, val):
|
||||
"""One INPUT_TYPES entry -> {name, type, widget, default, options}."""
|
||||
d = {"name": name, "type": "*", "widget": False, "default": None, "options": None}
|
||||
type_node, opts_node = None, None
|
||||
if isinstance(val, (ast.Tuple, ast.List)) and val.elts:
|
||||
type_node = val.elts[0]
|
||||
if len(val.elts) > 1:
|
||||
opts_node = val.elts[1]
|
||||
else:
|
||||
type_node = val
|
||||
|
||||
s = _const_str(type_node)
|
||||
if s is not None:
|
||||
d["type"] = s
|
||||
if s.upper() in _WIDGET_TYPES:
|
||||
d["widget"] = True
|
||||
elif isinstance(type_node, (ast.List, ast.Tuple)):
|
||||
# Inline combo box: ["a", "b", ...]
|
||||
opts = [_const_str(e) for e in type_node.elts]
|
||||
opts = [o for o in opts if o is not None]
|
||||
d["type"] = "COMBO"
|
||||
d["widget"] = True
|
||||
d["options"] = opts or None
|
||||
elif isinstance(type_node, ast.Call):
|
||||
# Dynamic list, e.g. folder_paths.get_filename_list(...) -> dropdown widget.
|
||||
d["type"] = "COMBO"
|
||||
d["widget"] = True
|
||||
# else: a Name/Attribute (custom or wildcard socket type) -> keep "*", socket.
|
||||
|
||||
if isinstance(opts_node, ast.Dict):
|
||||
for ok, ov in zip(opts_node.keys, opts_node.values):
|
||||
if _const_str(ok) == "default":
|
||||
try:
|
||||
d["default"] = ast.literal_eval(ov)
|
||||
except Exception:
|
||||
d["default"] = _const_str(ov)
|
||||
return d
|
||||
|
||||
|
||||
def _extract_input_types(fn):
|
||||
"""The INPUT_TYPES classmethod -> {"required": [...], "optional": [...]} or None."""
|
||||
ret = None
|
||||
for n in ast.walk(fn):
|
||||
if isinstance(n, ast.Return) and isinstance(n.value, ast.Dict):
|
||||
ret = n.value
|
||||
break
|
||||
if ret is None:
|
||||
return None
|
||||
result = {"required": [], "optional": []}
|
||||
for cat_key, cat_val in zip(ret.keys, ret.values):
|
||||
cat = _const_str(cat_key)
|
||||
if cat not in ("required", "optional") or not isinstance(cat_val, ast.Dict):
|
||||
continue
|
||||
for nk, nv in zip(cat_val.keys, cat_val.values):
|
||||
name = _const_str(nk)
|
||||
if name is not None:
|
||||
result[cat].append(_extract_input_def(name, nv))
|
||||
return result
|
||||
|
||||
|
||||
def _extract_class(cls):
|
||||
info = {"input_types": None, "return_types": None, "return_names": None,
|
||||
"category": None, "output_node": False}
|
||||
for b in cls.body:
|
||||
if isinstance(b, (ast.FunctionDef, ast.AsyncFunctionDef)) and b.name == "INPUT_TYPES":
|
||||
info["input_types"] = _extract_input_types(b)
|
||||
elif isinstance(b, ast.Assign):
|
||||
for t in b.targets:
|
||||
tn = getattr(t, "id", None)
|
||||
if tn == "RETURN_TYPES":
|
||||
info["return_types"] = _string_tuple(b.value)
|
||||
elif tn == "RETURN_NAMES":
|
||||
info["return_names"] = _string_tuple(b.value)
|
||||
elif tn == "CATEGORY":
|
||||
info["category"] = _const_str(b.value)
|
||||
elif tn == "OUTPUT_NODE" and isinstance(b.value, ast.Constant):
|
||||
info["output_node"] = bool(b.value.value)
|
||||
return info
|
||||
|
||||
|
||||
def _merge_mapping(out, dictnode):
|
||||
if not isinstance(dictnode, ast.Dict):
|
||||
return
|
||||
for k, v in zip(dictnode.keys, dictnode.values):
|
||||
key = _const_str(k)
|
||||
if key is None:
|
||||
continue
|
||||
if isinstance(v, ast.Name):
|
||||
out[key] = v.id
|
||||
elif isinstance(v, ast.Call) and isinstance(v.func, ast.Name):
|
||||
out[key] = v.func.id
|
||||
|
||||
|
||||
def _merge_display(out, dictnode):
|
||||
if not isinstance(dictnode, ast.Dict):
|
||||
return
|
||||
for k, v in zip(dictnode.keys, dictnode.values):
|
||||
key, val = _const_str(k), _const_str(v)
|
||||
if key is not None and val is not None:
|
||||
out[key] = val
|
||||
|
||||
|
||||
def build_pack_index(pack_path):
|
||||
"""Parse a pack -> (classes, mappings, display).
|
||||
|
||||
classes: { ClassName: {input_types, return_types, return_names, category, output_node} }
|
||||
mappings: { node_key: ClassName } (from literal NODE_CLASS_MAPPINGS / .update)
|
||||
display: { node_key: "Pretty Name" }
|
||||
"""
|
||||
cached = _INDEX_CACHE.get(pack_path)
|
||||
if cached is not None:
|
||||
return cached
|
||||
|
||||
classes, mappings, display = {}, {}, {}
|
||||
for f in _iter_py_files(pack_path):
|
||||
tree = _parse_file(f)
|
||||
if tree is None:
|
||||
continue
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.ClassDef):
|
||||
classes.setdefault(node.name, _extract_class(node))
|
||||
elif isinstance(node, ast.Assign):
|
||||
for t in node.targets:
|
||||
name = getattr(t, "id", None)
|
||||
if name == "NODE_CLASS_MAPPINGS":
|
||||
_merge_mapping(mappings, node.value)
|
||||
elif name == "NODE_DISPLAY_NAME_MAPPINGS":
|
||||
_merge_display(display, node.value)
|
||||
elif isinstance(node, ast.Call):
|
||||
fn = node.func
|
||||
if (isinstance(fn, ast.Attribute) and fn.attr == "update"
|
||||
and isinstance(fn.value, ast.Name) and node.args
|
||||
and isinstance(node.args[0], ast.Dict)):
|
||||
if fn.value.id == "NODE_CLASS_MAPPINGS":
|
||||
_merge_mapping(mappings, node.args[0])
|
||||
elif fn.value.id == "NODE_DISPLAY_NAME_MAPPINGS":
|
||||
_merge_display(display, node.args[0])
|
||||
|
||||
result = (classes, mappings, display)
|
||||
_INDEX_CACHE[pack_path] = result
|
||||
return result
|
||||
|
||||
|
||||
def get_node_schema(class_type, pack_path):
|
||||
"""Return a render-ready schema for one node, or {parseable: False, reason}."""
|
||||
classes, mappings, display = build_pack_index(pack_path)
|
||||
|
||||
cls_name = mappings.get(class_type)
|
||||
if cls_name is None and class_type in classes:
|
||||
cls_name = class_type # fall back: class_type IS the class name
|
||||
if cls_name is None or cls_name not in classes:
|
||||
return {"parseable": False, "reason": "dynamic_mapping" if mappings else "no_mapping"}
|
||||
|
||||
info = classes[cls_name]
|
||||
inputs = []
|
||||
it = info["input_types"]
|
||||
if it:
|
||||
for cat in ("required", "optional"):
|
||||
for d in it[cat]:
|
||||
inputs.append({**d, "required": cat == "required"})
|
||||
|
||||
rt = info["return_types"] or []
|
||||
rn = info["return_names"] or []
|
||||
outputs = []
|
||||
for i, t in enumerate(rt):
|
||||
nm = rn[i] if i < len(rn) and rn[i] else t
|
||||
outputs.append({"name": nm or t, "type": t})
|
||||
|
||||
return {
|
||||
"parseable": True,
|
||||
"class_type": class_type,
|
||||
"display_name": display.get(class_type) or class_type,
|
||||
"category": info["category"],
|
||||
"output_node": info["output_node"],
|
||||
"inputs": inputs,
|
||||
"outputs": outputs,
|
||||
}
|
||||
|
||||
|
||||
def find_disabled_pack_path(pack_name):
|
||||
"""Locate a disabled pack's source under any custom_nodes/.disabled/ dir.
|
||||
|
||||
Matches case-insensitively and ignores any ``@version`` suffix that ComfyUI
|
||||
Manager appends on disk (e.g. ``ComfyMath@nightly`` for pack ``comfymath``).
|
||||
Returns an absolute path (dir or .py file) or None. Rejects path-y input.
|
||||
"""
|
||||
if not pack_name or any(c in pack_name for c in ("/", "\\")) or ".." in pack_name:
|
||||
return None
|
||||
try:
|
||||
import folder_paths
|
||||
roots = folder_paths.get_folder_paths("custom_nodes")
|
||||
except Exception:
|
||||
roots = []
|
||||
|
||||
target = pack_name.lower()
|
||||
for root in roots:
|
||||
ddir = os.path.join(root, ".disabled")
|
||||
if not os.path.isdir(ddir):
|
||||
continue
|
||||
try:
|
||||
entries = os.listdir(ddir)
|
||||
except Exception:
|
||||
continue
|
||||
for e in entries:
|
||||
base = e.split("@", 1)[0]
|
||||
stem = base[:-3] if base.endswith(".py") else base
|
||||
if target in (stem.lower(), base.lower()):
|
||||
return os.path.join(ddir, e)
|
||||
return None
|
||||
Reference in New Issue
Block a user