Files

620 lines
23 KiB
Python

"""
UTFCN — Use The F***ing Core Nodes. Backend analysis engine.
This module runs inside the ComfyUI server process, so it can see the live node
registry (``nodes.NODE_CLASS_MAPPINGS``) with every node's real INPUT_TYPES /
RETURN_TYPES and its source module. That's exactly the ground truth needed to
answer the only interesting question here:
"This custom node — is there a CORE node (or, failing that, a node from a
DIFFERENT installed pack) that does the same job, and could I swap it in
without breaking the graph?"
We answer it in three tiers, from most to least trustworthy:

  curated  a hand-written rule in mappings.json / user_mappings.json.
           Carries explicit input/widget/output name remaps. Verified.
  exact    the candidate's signature (input name→type map + ordered output
           types) is IDENTICAL to the source's. Safe to remap by name.
           Verified.
  partial  the candidate can structurally accept every input the source has,
           provides every output type the source has, and matches the same
           feature intent. A *suggestion* only — never auto-applied.

The frontend consumes the result: `verified` candidates power auto-replace,
`partial` ones are shown for the user to confirm.
"""
import json
import os
import re
from collections import Counter, defaultdict
# Top-level python modules we consider "core" (shipped with ComfyUI itself).
# server.py exposes each class's origin as RELATIVE_PYTHON_MODULE (default "nodes").
CORE_TOPLEVEL = ("nodes", "comfy_extras", "comfy_api_nodes", "comfy_api")
# Widget-ish primitive types. These are values the user types, not graph links,
# so they matter for widget-value transfer but not for link compatibility.
WIDGET_TYPES = frozenset({"INT", "FLOAT", "STRING", "BOOLEAN", "COMBO"})
_TEXT_TYPES = frozenset({"STRING", "STRING_LIST"})
_TEXT_NEUTRAL_TOKENS = frozenset(
{
"any",
"box",
"constant",
"input",
"literal",
"multi",
"multiline",
"note",
"primitive",
"prompt",
"string",
"text",
"textarea",
"value",
"widget",
}
)
_ACTION_GROUPS = (
("blur", frozenset({"blur", "smooth"})),
("crop", frozenset({"crop"})),
("geometry", frozenset({"downscale", "resize", "rescale", "scale", "upscale"})),
("invert", frozenset({"invert", "inversion"})),
("passthrough", frozenset({"identity", "pass", "passthrough", "reroute"})),
("preview", frozenset({"display", "preview", "show", "view"})),
("size", frozenset({"dimension", "dimensions", "height", "resolution", "size", "width"})),
("concat", frozenset({"append", "combine", "concat", "concatenate", "join", "merge"})),
("convert", frozenset({"cast", "convert", "float", "int", "number"})),
("encode", frozenset({"clip", "conditioning", "encode", "encoder", "tokenize", "tokenizer"})),
("extract", frozenset({"extract", "find", "parse", "regex", "regexp", "select"})),
("format", frozenset({"format", "template"})),
("io", frozenset({"file", "load", "path", "read", "save", "url", "write"})),
("replace", frozenset({"remove", "replace", "substitute"})),
("split", frozenset({"separate", "split", "splitter"})),
("strip", frozenset({"clean", "lstrip", "rstrip", "sanitize", "strip", "trim"})),
("translate", frozenset({"translate", "translator"})),
("truncate", frozenset({"chop", "slice", "substring", "truncate"})),
("case", frozenset({"case", "lower", "upper"})),
)
def _module_of(cls):
return getattr(cls, "RELATIVE_PYTHON_MODULE", "nodes") or "nodes"
def _source_kind(module):
top = module.split(".", 1)[0]
if top == "custom_nodes":
return "custom"
if top in CORE_TOPLEVEL:
return "core"
return "core" # anything unexpected is treated as first-party
def _pack_of(module):
parts = module.split(".")
if parts[0] == "custom_nodes" and len(parts) > 1:
return parts[1]
return parts[0]
def _spec_type(spec):
"""Reduce an INPUT_TYPES spec (``("IMAGE",)`` / ``(["a","b"], {...})``) to a type string."""
t = spec[0] if isinstance(spec, (list, tuple)) and spec else spec
if isinstance(t, list): # a list of choices == a combo/dropdown widget
return "COMBO"
return str(t)
def _signature(cls):
"""Extract a comparable signature: inputs {name->type}, required names, ordered output types."""
try:
it = cls.INPUT_TYPES()
except Exception:
it = {}
inputs, required = {}, set()
for section in ("required", "optional"):
for name, spec in (it.get(section) or {}).items():
try:
inputs[name] = _spec_type(spec)
except Exception:
inputs[name] = "*"
if section == "required":
required.add(name)
outputs = [str(t) for t in (getattr(cls, "RETURN_TYPES", ()) or ())]
out_names = [str(n) for n in (getattr(cls, "RETURN_NAMES", ()) or [])]
return {"inputs": inputs, "required": required, "outputs": outputs, "output_names": out_names}
def _first_output_type(sig):
return sig["outputs"][0] if sig["outputs"] else ""
def _is_exact(a, b):
"""Identical enough that a name-based remap is trivially safe."""
return a["inputs"] == b["inputs"] and a["outputs"] == b["outputs"]
def _feasible(src, cand):
"""Can `cand` structurally stand in for `src`? (accepts all its inputs, provides all its outputs)"""
src_in = Counter(src["inputs"].values())
cand_in = Counter(cand["inputs"].values())
in_ok = not (src_in - cand_in) # every source input type available on candidate
src_out = Counter(src["outputs"])
cand_out = Counter(cand["outputs"])
out_ok = not (src_out - cand_out) # candidate provides every source output type
return in_ok and out_ok
def _score(src, cand):
"""Signature-overlap score in [0,1]; higher = more alike. Rewards matching names too."""
src_in, cand_in = Counter(src["inputs"].values()), Counter(cand["inputs"].values())
src_out, cand_out = Counter(src["outputs"]), Counter(cand["outputs"])
overlap = sum((src_in & cand_in).values()) + sum((src_out & cand_out).values())
total = sum(src_in.values()) + sum(src_out.values())
base = overlap / total if total else 0.0
# small bonus for shared input names — a strong signal of a deliberate re-implementation
shared_names = len(set(src["inputs"]) & set(cand["inputs"]))
name_bonus = 0.15 * (shared_names / len(src["inputs"])) if src["inputs"] else 0.0
return min(1.0, base + name_bonus)
def _semantic_tokens(*parts):
text = " ".join(str(part or "") for part in parts)
text = re.sub(r"([a-z0-9])([A-Z])", r"\1 \2", text)
text = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1 \2", text)
return {
token
for token in re.split(r"[^A-Za-z0-9]+", text.lower())
if token
}
def _identity_tokens(name, meta, sig):
    """Collect the word tokens that identify a node.

    Tokens come from the type name, the ``display`` entry of *meta* (ignored
    when *meta* is not a dict), every input name, and every output name.
    """
    display = meta.get("display") if isinstance(meta, dict) else None
    input_names = list(sig.get("inputs", {}).keys())
    output_names = list(sig.get("output_names") or [])
    return _semantic_tokens(name, display, *input_names, *output_names)
def _action_groups(tokens):
    """Return the set of action-group labels that *tokens* trigger.

    A group is triggered when any of its words appears in *tokens*. In
    addition, the word ``to`` next to a numeric/boolean type word ("to bool",
    "to int", ...) is counted as a ``convert`` action.
    """
    matched = set()
    for label, vocabulary in _ACTION_GROUPS:
        if tokens & vocabulary:
            matched.add(label)
    castable = {"bool", "boolean", "float", "int", "number"}
    if "to" in tokens and tokens & castable:
        matched.add("convert")
    return matched
def _text_signature_kind(sig):
    """Report whether any input or output type of *sig* is a text type."""
    seen_types = {*sig.get("inputs", {}).values(), *sig.get("outputs", [])}
    return not seen_types.isdisjoint(_TEXT_TYPES)
def _text_value_like(tokens, sig):
    """Report whether a node looks like a plain text-value holder.

    All of the following must hold:
      * it has outputs and every one of them is a text type;
      * its identity tokens trigger no action group;
      * it has at most one input, and that input (if any) is a text type or a
        COMBO whose name contains a text-neutral token;
      * its identity tokens contain a text-neutral token.
    """
    out_types = sig.get("outputs", [])
    in_map = sig.get("inputs", {})

    emits_only_text = bool(out_types) and _TEXT_TYPES.issuperset(out_types)
    if not emits_only_text:
        return False
    if _action_groups(tokens):
        return False
    if len(in_map) > 1:
        return False

    for in_name, in_type in in_map.items():  # zero or one entry at this point
        if not (in_type in _TEXT_TYPES or in_type == "COMBO"):
            return False
        if _TEXT_NEUTRAL_TOKENS.isdisjoint(_semantic_tokens(in_name)):
            return False
    return not _TEXT_NEUTRAL_TOKENS.isdisjoint(tokens)
def _features_compatible(src_name, src_sig, src_meta, cand_name, cand_sig, cand_meta):
    """
    Gate a structurally feasible candidate by what the two nodes appear to do.

    Structural compatibility is too weak for primitive text nodes: a missing
    text box serializes as STRING output only, which otherwise matches every
    STRING utility. So when both signatures involve text and the source looks
    like a plain text value, the candidate must also be a plain text value
    with no action tokens (no truncate/split/replace style transforms).

    Otherwise, if either side triggers an action group, the two must share at
    least one group. Nodes with no recognised action on either side pass.
    """
    source_tokens = _identity_tokens(src_name, src_meta, src_sig)
    target_tokens = _identity_tokens(cand_name, cand_meta, cand_sig)
    source_actions = _action_groups(source_tokens)
    target_actions = _action_groups(target_tokens)

    both_textual = _text_signature_kind(src_sig) and _text_signature_kind(cand_sig)
    if both_textual and _text_value_like(source_tokens, src_sig):
        if target_actions:
            return False
        return _text_value_like(target_tokens, cand_sig)

    if not (source_actions or target_actions):
        return True
    return not source_actions.isdisjoint(target_actions)
# score below which a partial match isn't worth surfacing
_PARTIAL_THRESHOLD = 0.5
# max candidates returned per source node
_MAX_CANDIDATES = 6
# the only "schema_version" value load_generated_signatures() accepts
_GENERATED_SCHEMA_VERSION = 1
# file name of the bundled signature table, looked up inside base_dir
_GENERATED_SIGNATURES_FILE = "popular_node_signatures.json"
def _empty_generated_signatures():
return {"sigs": {}, "meta": {}, "by_out": defaultdict(list)}
def _normalise_generated_signature(node_type, entry):
if not isinstance(entry, dict):
return None
if str(entry.get("confidence") or "") == "metadata_only":
return None
inputs_raw = entry.get("inputs") or {}
if not isinstance(inputs_raw, dict):
return None
outputs_raw = entry.get("outputs") or []
if not isinstance(outputs_raw, list):
return None
inputs = {str(k): str(v) for k, v in inputs_raw.items() if k is not None}
outputs = [str(v) for v in outputs_raw if v is not None]
if not inputs and not outputs:
return None
required_raw = entry.get("required") or []
if not isinstance(required_raw, list):
required_raw = []
output_names_raw = entry.get("output_names") or []
if not isinstance(output_names_raw, list):
output_names_raw = []
sig = {
"inputs": inputs,
"required": {str(v) for v in required_raw if str(v) in inputs},
"outputs": outputs,
"output_names": [str(v) for v in output_names_raw],
}
meta = {
"source": "generated",
"pack": str(entry.get("pack") or ""),
"display": str(entry.get("display") or entry.get("type") or node_type),
"repository": str(entry.get("repository") or ""),
"confidence": str(entry.get("confidence") or ""),
}
return sig, meta
def load_generated_signatures(base_dir):
    """Load the bundled generated-signature table from *base_dir*.

    Always returns a container with ``sigs``, ``meta`` and ``by_out``. It is
    empty when the file is missing, unreadable, not valid JSON, has the wrong
    ``schema_version``, or has a non-object ``nodes`` value. Apart from the
    missing-file case, each of these prints a ``[UTFCN]`` message. Entries
    that fail normalisation are skipped individually.
    """
    path = os.path.join(base_dir, _GENERATED_SIGNATURES_FILE)
    generated = _empty_generated_signatures()
    if not os.path.isfile(path):
        return generated

    try:
        with open(path, "r", encoding="utf-8") as handle:
            payload = json.load(handle)
    except Exception as e:
        print(f"[UTFCN] failed to read {_GENERATED_SIGNATURES_FILE}: {e}")
        return generated

    schema_ok = isinstance(payload, dict) and payload.get("schema_version") == _GENERATED_SCHEMA_VERSION
    if not schema_ok:
        print(f"[UTFCN] ignored {_GENERATED_SIGNATURES_FILE}: unsupported schema")
        return generated

    entries = payload.get("nodes") or {}
    if not isinstance(entries, dict):
        print(f"[UTFCN] ignored {_GENERATED_SIGNATURES_FILE}: nodes must be an object")
        return generated

    for raw_type, entry in entries.items():
        node_type = str(raw_type)
        normalised = _normalise_generated_signature(node_type, entry)
        if normalised is None:
            continue
        sig, meta = normalised
        generated["sigs"][node_type] = sig
        generated["meta"][node_type] = meta
        # Same bucketing key as the live registry: the first output type.
        generated["by_out"][_first_output_type(sig)].append(node_type)
    return generated
def _normalise_rules(raw):
"""Accept both {source: {...single...}} and {source: [ {...}, {...} ]} shapes."""
out = {}
for src, val in (raw.get("rules") or {}).items():
targets = val if isinstance(val, list) else [val]
out[src] = [t for t in targets if isinstance(t, dict) and t.get("to")]
return out
def load_rules(base_dir):
    """Load curated rules from *base_dir*.

    ``mappings.json`` is read first, then ``user_mappings.json``. The merge is
    per source type: a source present in the user file replaces the builtin
    entry for that source entirely. Missing files are skipped; a file that
    cannot be read or parsed is reported with a ``[UTFCN]`` message and
    otherwise ignored.
    """
    combined = {}
    for filename in ("mappings.json", "user_mappings.json"):
        location = os.path.join(base_dir, filename)
        if os.path.isfile(location):
            try:
                with open(location, "r", encoding="utf-8") as handle:
                    combined.update(_normalise_rules(json.load(handle)))
            except Exception as e:  # a broken user file must never take the server down
                print(f"[UTFCN] failed to read {filename}: {e}")
    return combined
def build_context(rules, generated=None):
    """
    Snapshot the live node registry once (signatures + source of every node).

    The returned context is reused by build_index() (the /utfcn/scan payload)
    and by match() (per-workflow matching of UNINSTALLED nodes), so the
    expensive walk only happens on refresh.

    `rules` is the merged curated mapping:
    {sourceType: [ {to, note, inputs, widgets, outputs}, ... ]}.
    `generated` is an optional generated-signature container; an empty one is
    substituted when it is missing or falsy.

    Context keys: ``sources``, ``sigs``, ``by_out``, ``rules``, ``generated``.
    """
    import nodes  # imported here so the module stays importable outside ComfyUI

    registry = nodes.NODE_CLASS_MAPPINGS
    display_names = getattr(nodes, "NODE_DISPLAY_NAME_MAPPINGS", {})

    sources, sigs = {}, {}
    # Every potential *target* is bucketed by its first output type so a
    # source only gets compared against nodes that could plausibly feed the
    # same downstream.
    by_out = defaultdict(list)
    for node_name, node_cls in registry.items():
        module = _module_of(node_cls)
        sources[node_name] = {
            "source": _source_kind(module),
            "pack": _pack_of(module),
            "display": display_names.get(node_name, node_name),
        }
        signature = _signature(node_cls)
        sigs[node_name] = signature
        by_out[_first_output_type(signature)].append(node_name)

    return {
        "sources": sources,
        "sigs": sigs,
        "by_out": by_out,
        "rules": rules,
        "generated": generated or _empty_generated_signatures(),
    }
def _candidates_for(src_name, src_sig, src_pack, ctx, src_meta=None):
    """
    Rank replacement candidates for one source node.

    `src_sig` may be None (an uninstalled node we know only by name) — then
    only curated rules apply. If a signature is given (installed node, or a
    missing node's serialized signature), exact/partial tiers are added too.
    `src_pack` is None for uninstalled/unknown sources (skips the same-pack
    exclusion). `src_meta` defaults to the source's registry entry, or an
    empty dict when it has none.

    Curated candidates come first in rule order; signature candidates follow,
    sorted core-before-pack, exact-before-partial, then by descending score.
    At most ``_MAX_CANDIDATES`` entries are returned.
    """
    sources = ctx["sources"]
    sigs = ctx["sigs"]
    by_out = ctx["by_out"]
    rules = ctx["rules"]
    if not isinstance(src_meta, dict):
        src_meta = sources.get(src_name, {})

    picked, taken = [], set()

    # --- tier 1: curated rules (ordered preference; core-first is the author's job) ---
    for rule in rules.get(src_name, []):
        target = rule.get("to")
        usable = bool(target) and target != src_name and target in sources and target not in taken
        if usable:
            taken.add(target)
            picked.append(_candidate(target, sources, "curated", 1.0, rule))

    if src_sig is None:
        return picked[:_MAX_CANDIDATES]

    # --- tiers 2 & 3: signature matching within the same output bucket ---
    ranked = []
    for cand_name in by_out.get(_first_output_type(src_sig), []):
        if cand_name in taken or cand_name == src_name:
            continue
        cand_meta = sources[cand_name]
        # target must be core, or a DIFFERENT installed pack (fallback-to-available)
        from_same_pack = (
            cand_meta["source"] == "custom"
            and src_pack is not None
            and cand_meta["pack"] == src_pack
        )
        if from_same_pack:
            continue
        cand_sig = sigs[cand_name]
        if not _feasible(src_sig, cand_sig):
            continue
        if not _features_compatible(src_name, src_sig, src_meta, cand_name, cand_sig, cand_meta):
            continue
        if _is_exact(src_sig, cand_sig):
            ranked.append((cand_name, "exact", 1.0))
            continue
        overlap = _score(src_sig, cand_sig)
        if overlap >= _PARTIAL_THRESHOLD:
            ranked.append((cand_name, "partial", overlap))

    def preference(entry):
        name, tier, overlap = entry
        # False sorts before True: core first, exact first, then high score.
        return (sources[name]["source"] != "core", tier != "exact", -overlap)

    ranked.sort(key=preference)

    for cand_name, tier, overlap in ranked:
        if cand_name in taken:
            continue
        taken.add(cand_name)
        picked.append(_candidate(cand_name, sources, tier, overlap, None))
    return picked[:_MAX_CANDIDATES]
def build_index(ctx):
    """
    Build the /utfcn/scan payload from a context.

    Covers INSTALLED custom nodes (curated + signature tiers) AND uninstalled
    source types that a curated rule targets an installed node for — so a
    rule still fires on a node whose pack you never installed.

    Returns { "sources": {...}, "candidates": {srcType: [candidate,...]}, "stats": {...} }.
    Source types with no candidates are left out of ``candidates``.
    """
    sources = ctx["sources"]
    candidates = {}

    for src_name, meta in sources.items():
        if meta["source"] == "custom":
            ranked = _candidates_for(src_name, ctx["sigs"][src_name], meta["pack"], ctx)
            if ranked:
                candidates[src_name] = ranked

    # curated rules whose SOURCE isn't installed (the "replace a missing node
    # without installing its pack" case) — no signature, so curated-only.
    uninstalled = 0
    for src_name in ctx["rules"]:
        already_covered = src_name in sources or src_name in candidates
        if already_covered:
            continue
        ranked = _candidates_for(src_name, None, None, ctx)
        if ranked:
            candidates[src_name] = ranked
            uninstalled += 1

    custom_total = sum(1 for meta in sources.values() if meta["source"] == "custom")
    verified_total = sum(
        1 for ranked in candidates.values() if any(entry["verified"] for entry in ranked)
    )
    stats = {
        "nodes": len(sources),
        "custom": custom_total,
        "replaceable": len(candidates),
        "verified": verified_total,
        "uninstalled": uninstalled,
    }
    return {"sources": sources, "candidates": candidates, "stats": stats}
def _signature_from_item(it):
inputs_raw = it.get("inputs") or {}
if not isinstance(inputs_raw, dict):
inputs_raw = {}
outputs_raw = it.get("outputs") or []
if not isinstance(outputs_raw, list):
outputs_raw = []
output_names_raw = it.get("output_names") or []
if not isinstance(output_names_raw, list):
output_names_raw = []
inputs = {str(k): str(v) for k, v in inputs_raw.items() if k is not None}
return {
"inputs": inputs,
"required": set(inputs),
"outputs": [str(x) for x in outputs_raw],
"output_names": [str(x) for x in output_names_raw],
}
def _generated_signature_usable(sig):
return isinstance(sig, dict) and isinstance(sig.get("inputs"), dict) and isinstance(sig.get("outputs"), list)
def _normalised_generated_signature(sig):
    """Return a cleaned copy of a stored generated signature, or ``None``.

    ``None`` is returned when *sig* fails the usability check or when any
    conversion raises. In the copy, keys and values are stringified, inputs
    keyed by ``None`` are dropped, and ``required`` keeps only names that are
    actual inputs.
    """
    if not _generated_signature_usable(sig):
        return None
    try:
        inputs = {
            str(name): str(type_name)
            for name, type_name in sig["inputs"].items()
            if name is not None
        }
        outputs = [str(entry) for entry in sig["outputs"]]

        required_names = sig.get("required") or []
        if not isinstance(required_names, (list, set, tuple)):
            required_names = []
        names_of_outputs = sig.get("output_names") or []
        if not isinstance(names_of_outputs, list):
            names_of_outputs = []

        required = {str(name) for name in required_names if str(name) in inputs}
        return {
            "inputs": inputs,
            "required": required,
            "outputs": outputs,
            "output_names": [str(entry) for entry in names_of_outputs],
        }
    except Exception:
        return None
def _generated_signature_conflicts(serialized_sig, generated_sig):
if not serialized_sig["inputs"] and not serialized_sig["outputs"]:
return False
generated_inputs = generated_sig["inputs"]
for name, typ in serialized_sig["inputs"].items():
if name in generated_inputs:
if generated_inputs[name] != typ:
return True
else:
return True
if Counter(serialized_sig["outputs"]) - Counter(generated_sig["outputs"]):
return True
return False
def match(ctx, items):
    """
    Match a batch of nodes given only their (possibly serialized) signature —
    used for UNINSTALLED / missing nodes in an open workflow.

    `items`: [ {"type": str, "inputs": {name: TYPE}, "outputs": [TYPE], "output_names": [..]} ].
    Malformed entries are skipped rather than raising: non-dict items, and
    items whose ``type`` is missing, empty, or not a string (a list or dict
    there would otherwise fail as an unhashable dict key). ``items`` may be
    ``None``.

    Serialized nodes only carry link slots (not widget values), so 'exact'
    rarely fires; curated rules (by type name), bundled generated signatures,
    and feature-gated partial link-type matches do.

    For each type the bundled generated signature is tried first, provided it
    does not conflict with the serialized one; if that yields nothing, the
    serialized signature itself is used. A type that already has a result is
    not recomputed.

    Returns a mapping from source node type to candidate list; types with no
    candidates are omitted.
    """

    def as_dict(value):
        # Anything that is not a dict (None, list, string, ...) counts as empty.
        return value if isinstance(value, dict) else {}

    generated = as_dict(ctx.get("generated"))
    generated_sigs = as_dict(generated.get("sigs"))
    generated_meta = as_dict(generated.get("meta"))

    out = {}
    for it in items or ():
        if not isinstance(it, dict):
            continue
        t = it.get("type")
        if not isinstance(t, str) or not t or t in out:
            continue

        sig = _signature_from_item(it)
        gen_sig = _normalised_generated_signature(generated_sigs.get(t))
        if gen_sig is not None and not _generated_signature_conflicts(sig, gen_sig):
            gen_meta = as_dict(generated_meta.get(t))
            found = _candidates_for(t, gen_sig, gen_meta.get("pack"), ctx, gen_meta)
            if found:
                out[t] = found
                continue

        item_meta = {"display": it.get("display") or t}
        found = _candidates_for(t, sig, None, ctx, item_meta)
        if found:
            out[t] = found
    return out
def _candidate(to, sources, tier, score, rule):
meta = sources[to]
cand = {
"to": to,
"to_display": meta["display"],
"source": meta["source"], # "core" | "custom"
"pack": meta["pack"],
"tier": tier, # "curated" | "exact" | "partial"
"verified": tier in ("curated", "exact"),
"score": round(float(score), 3),
}
if rule:
# explicit name remaps travel to the frontend so the swap is exact
for key in ("inputs", "widgets", "outputs"):
if isinstance(rule.get(key), dict):
cand[key] = rule[key]
if rule.get("note"):
cand["note"] = rule["note"]
return cand