620 lines
23 KiB
Python
620 lines
23 KiB
Python
"""
|
|
UTFCN — Use The F***ing Core Nodes. Backend analysis engine.
|
|
|
|
This module runs inside the ComfyUI server process, so it can see the live node
|
|
registry (``nodes.NODE_CLASS_MAPPINGS``) with every node's real INPUT_TYPES /
|
|
RETURN_TYPES and its source module. That's exactly the ground truth needed to
|
|
answer the only interesting question here:
|
|
|
|
"This custom node — is there a CORE node (or, failing that, a node from a
|
|
DIFFERENT installed pack) that does the same job, and could I swap it in
|
|
without breaking the graph?"
|
|
|
|
We answer it in three tiers, from most to least trustworthy:
|
|
|
|
curated a hand-written rule in mappings.json / user_mappings.json.
|
|
Carries explicit input/widget/output name remaps. Verified.
|
|
exact the candidate's signature (input name→type map + ordered output
|
|
types) is IDENTICAL to the source's. Safe to remap by name.
|
|
Verified.
|
|
partial the candidate can structurally accept every input the source has,
|
|
provides every output type the source has, and matches the same
|
|
feature intent. A *suggestion* only — never auto-applied.
|
|
|
|
The frontend consumes the result: `verified` candidates power auto-replace,
|
|
`partial` ones are shown for the user to confirm.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
from collections import Counter, defaultdict
|
|
|
|
# Top-level Python packages that ship with ComfyUI itself and so count as "core".
# server.py exposes each class's origin as RELATIVE_PYTHON_MODULE (default "nodes").
CORE_TOPLEVEL = ("nodes", "comfy_extras", "comfy_api_nodes", "comfy_api")

# Widget-ish primitive types. These are values the user types, not graph links,
# so they matter for widget-value transfer but not for link compatibility.
WIDGET_TYPES = frozenset(("INT", "FLOAT", "STRING", "BOOLEAN", "COMBO"))

# Type strings treated as "text" by the feature gate.
_TEXT_TYPES = frozenset(("STRING", "STRING_LIST"))

# Name tokens that describe a plain text holder rather than an operation on text.
_TEXT_NEUTRAL_TOKENS = frozenset(
    "any box constant input literal multi multiline note primitive prompt "
    "string text textarea value widget".split()
)

# (group label, vocabulary) pairs: a node whose name tokens intersect a
# vocabulary is considered to perform that group's action.
_ACTION_GROUPS = tuple(
    (group, frozenset(words.split()))
    for group, words in (
        ("blur", "blur smooth"),
        ("crop", "crop"),
        ("geometry", "downscale resize rescale scale upscale"),
        ("invert", "invert inversion"),
        ("passthrough", "identity pass passthrough reroute"),
        ("preview", "display preview show view"),
        ("size", "dimension dimensions height resolution size width"),
        ("concat", "append combine concat concatenate join merge"),
        ("convert", "cast convert float int number"),
        ("encode", "clip conditioning encode encoder tokenize tokenizer"),
        ("extract", "extract find parse regex regexp select"),
        ("format", "format template"),
        ("io", "file load path read save url write"),
        ("replace", "remove replace substitute"),
        ("split", "separate split splitter"),
        ("strip", "clean lstrip rstrip sanitize strip trim"),
        ("translate", "translate translator"),
        ("truncate", "chop slice substring truncate"),
        ("case", "case lower upper"),
    )
)
|
|
|
|
|
|
def _module_of(cls):
    """Return the module path recorded on a node class.

    Reads ``RELATIVE_PYTHON_MODULE``; a missing or falsy value yields ``"nodes"``.
    """
    module = getattr(cls, "RELATIVE_PYTHON_MODULE", None)
    if not module:
        return "nodes"
    return module
|
|
|
|
|
|
def _source_kind(module):
    """Classify a dotted module path as ``"custom"`` or ``"core"``.

    Only modules whose first component is ``custom_nodes`` are custom. Known
    core packages and anything unexpected are both treated as first-party.
    """
    top, _, _ = module.partition(".")
    if top == "custom_nodes":
        return "custom"
    return "core"
|
|
|
|
|
|
def _pack_of(module):
    """Name the pack a module belongs to.

    For ``custom_nodes.<pack>...`` this is ``<pack>``; for every other path it
    is the first dotted component.
    """
    head, *rest = module.split(".")
    if head == "custom_nodes" and rest:
        return rest[0]
    return head
|
|
|
|
|
|
def _spec_type(spec):
    """Reduce an INPUT_TYPES spec (``("IMAGE",)`` / ``(["a","b"], {...})``) to a type string.

    The first element of a non-empty list/tuple spec is the type; any other
    value is used as-is. A list in that position is a set of choices and is
    reported as ``"COMBO"``; everything else is stringified.
    """
    head = spec
    if isinstance(spec, (list, tuple)) and spec:
        head = spec[0]
    # a list of choices == a combo/dropdown widget
    return "COMBO" if isinstance(head, list) else str(head)
|
|
|
|
|
|
def _signature(cls):
    """Extract a comparable signature from a node class.

    Returns a dict with:
        inputs        {input name: type string} over the "required" and
                      "optional" sections of ``cls.INPUT_TYPES()``
        required      set of the names declared under "required"
        outputs       ``RETURN_TYPES`` as an ordered list of strings
        output_names  ``RETURN_NAMES`` as an ordered list of strings

    This is called for every registered node, so one badly written class must
    not abort the walk: an ``INPUT_TYPES()`` that raises or returns something
    that is not mapping-like counts as "no inputs", a section that is not
    mapping-like is skipped, and a spec that cannot be reduced becomes ``"*"``.
    """
    try:
        declared = cls.INPUT_TYPES()
    except Exception:
        declared = {}

    inputs, required = {}, set()
    for section in ("required", "optional"):
        try:
            entries = list((declared.get(section) or {}).items())
        except (AttributeError, TypeError):
            # INPUT_TYPES() returned e.g. None, or the section is a list/str.
            continue
        for name, spec in entries:
            try:
                inputs[name] = _spec_type(spec)
            except Exception:
                inputs[name] = "*"
            if section == "required":
                required.add(name)

    outputs = [str(t) for t in (getattr(cls, "RETURN_TYPES", ()) or ())]
    out_names = [str(n) for n in (getattr(cls, "RETURN_NAMES", ()) or ())]
    return {"inputs": inputs, "required": required, "outputs": outputs, "output_names": out_names}
|
|
|
|
|
|
def _first_output_type(sig):
    """Return the first entry of ``sig["outputs"]``, or ``""`` when there are no outputs."""
    outputs = sig["outputs"]
    if outputs:
        return outputs[0]
    return ""
|
|
|
|
|
|
def _is_exact(a, b):
    """True when both signatures have the same input name->type map and the same ordered outputs.

    Such a pair can be remapped purely by name.
    """
    if a["inputs"] != b["inputs"]:
        return False
    return a["outputs"] == b["outputs"]
|
|
|
|
|
|
def _feasible(src, cand):
    """Can `cand` structurally stand in for `src`?

    Compares type multisets only (names are ignored): every input type the
    source has must be available on the candidate at least as many times, and
    the candidate must provide every output type the source has.
    """
    unmet_inputs = Counter(src["inputs"].values()) - Counter(cand["inputs"].values())
    unmet_outputs = Counter(src["outputs"]) - Counter(cand["outputs"])
    return not unmet_inputs and not unmet_outputs
|
|
|
|
|
|
def _score(src, cand):
    """Signature-overlap score in [0,1]; higher = more alike.

    The base is the share of the source's input and output types (counted as
    multisets) that the candidate also has. Shared input *names* add up to
    0.15 on top — a strong signal of a deliberate re-implementation — and the
    total is capped at 1.0.
    """
    src_inputs = src["inputs"]
    in_src, in_cand = Counter(src_inputs.values()), Counter(cand["inputs"].values())
    out_src, out_cand = Counter(src["outputs"]), Counter(cand["outputs"])

    matched = sum((in_src & in_cand).values()) + sum((out_src & out_cand).values())
    possible = sum(in_src.values()) + sum(out_src.values())
    score = matched / possible if possible else 0.0

    if src_inputs:
        common_names = set(src_inputs).intersection(cand["inputs"])
        score += 0.15 * (len(common_names) / len(src_inputs))
    else:
        score += 0.0
    return min(1.0, score)
|
|
|
|
|
|
def _semantic_tokens(*parts):
    """Split names into a set of lowercase word tokens.

    Falsy parts are ignored. Words are separated at camelCase boundaries, at
    the end of an acronym run ("CLIPText" -> "CLIP Text") and at any
    non-alphanumeric character.
    """
    joined = " ".join(str(part or "") for part in parts)
    spaced = re.sub(r"([a-z0-9])([A-Z])", r"\1 \2", joined)
    spaced = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1 \2", spaced)
    tokens = set(re.split(r"[^A-Za-z0-9]+", spaced.lower()))
    tokens.discard("")
    return tokens
|
|
|
|
|
|
def _identity_tokens(name, meta, sig):
    """Collect the word tokens that describe what a node is.

    Sources are the node type name, its display name (when `meta` is a dict),
    its input names and its output names.
    """
    display = meta.get("display") if isinstance(meta, dict) else None
    terms = [name, display]
    terms += list(sig.get("inputs", {}).keys())
    terms += list(sig.get("output_names") or [])
    return _semantic_tokens(*terms)
|
|
|
|
|
|
def _action_groups(tokens):
    """Return the labels of every action group whose vocabulary intersects `tokens`."""
    matched = set()
    for label, vocabulary in _ACTION_GROUPS:
        if tokens & vocabulary:
            matched.add(label)
    # "<something> to int/float/bool..." names a conversion even without a convert verb
    if "to" in tokens and tokens & {"bool", "boolean", "float", "int", "number"}:
        matched.add("convert")
    return matched
|
|
|
|
|
|
def _text_signature_kind(sig):
    """True when any input or output type of the signature is a text type."""
    seen_types = set(sig.get("inputs", {}).values())
    seen_types.update(sig.get("outputs", []))
    return not seen_types.isdisjoint(_TEXT_TYPES)
|
|
|
|
|
|
def _text_value_like(tokens, sig):
    """Does this node look like a plain text value holder (a text box / string primitive)?

    All of the following must hold:
      - it has outputs and every one is a text type;
      - its tokens trigger no action group;
      - it has at most one input, and that input (if any) is a text type or a
        COMBO whose name contains a text-neutral token;
      - its own tokens contain a text-neutral token.
    """
    outputs = sig.get("outputs", [])
    inputs = sig.get("inputs", {})
    if not outputs or not set(outputs) <= _TEXT_TYPES:
        return False
    if _action_groups(tokens) or len(inputs) > 1:
        return False
    # at most one entry is left at this point
    for input_name, input_type in inputs.items():
        if input_type != "COMBO" and input_type not in _TEXT_TYPES:
            return False
        if not (_semantic_tokens(input_name) & _TEXT_NEUTRAL_TOKENS):
            return False
    return bool(tokens & _TEXT_NEUTRAL_TOKENS)
|
|
|
|
|
|
def _features_compatible(src_name, src_sig, src_meta, cand_name, cand_sig, cand_meta):
    """
    Gate a structurally feasible candidate by what the two nodes claim to do.

    Structural compatibility is too weak for primitive text nodes: a missing
    text box serializes as STRING output only, which otherwise matches every
    STRING utility. So when both signatures involve text and the source looks
    like a plain text value, the candidate must also be a plain text value
    with no action — text-entry sources never suggest transforms such as
    truncate/split/replace.

    Otherwise, if either side names an action, the two must share at least one
    action group; nodes with no recognisable action on either side pass.
    """
    src_tokens = _identity_tokens(src_name, src_meta, src_sig)
    cand_tokens = _identity_tokens(cand_name, cand_meta, cand_sig)
    src_actions = _action_groups(src_tokens)
    cand_actions = _action_groups(cand_tokens)

    both_textual = _text_signature_kind(src_sig) and _text_signature_kind(cand_sig)
    if both_textual and _text_value_like(src_tokens, src_sig):
        if cand_actions:
            return False
        return _text_value_like(cand_tokens, cand_sig)

    if not src_actions and not cand_actions:
        return True
    return bool(src_actions & cand_actions)
|
|
|
|
|
|
# Partial matches scoring below this are not worth surfacing.
_PARTIAL_THRESHOLD = 0.5
# Upper bound on the number of candidates returned for one source node.
_MAX_CANDIDATES = 6

# Only a bundled signature file declaring this schema_version is accepted.
_GENERATED_SCHEMA_VERSION = 1
# File name of the bundled signatures, resolved against the caller's base_dir.
_GENERATED_SIGNATURES_FILE = "popular_node_signatures.json"
|
|
|
|
|
|
def _empty_generated_signatures():
    """Return a fresh, empty generated-signature store (``sigs``, ``meta``, ``by_out``)."""
    store = {"sigs": {}, "meta": {}}
    store["by_out"] = defaultdict(list)
    return store
|
|
|
|
|
|
def _normalise_generated_signature(node_type, entry):
    """Turn one raw entry of the generated-signatures file into ``(sig, meta)``.

    Returns None when the entry is not a dict, is marked ``metadata_only``,
    has inputs that are not a dict or outputs that are not a list, or carries
    neither inputs nor outputs. Keys and values are stringified; None input
    names and None output types are dropped. ``required`` keeps only names
    that are actual inputs.
    """
    if not isinstance(entry, dict):
        return None
    confidence = str(entry.get("confidence") or "")
    if confidence == "metadata_only":
        return None

    raw_inputs = entry.get("inputs") or {}
    if not isinstance(raw_inputs, dict):
        return None
    raw_outputs = entry.get("outputs") or []
    if not isinstance(raw_outputs, list):
        return None

    inputs = {}
    for key, value in raw_inputs.items():
        if key is not None:
            inputs[str(key)] = str(value)
    outputs = [str(value) for value in raw_outputs if value is not None]
    if not (inputs or outputs):
        return None

    def optional_list(key):
        # a missing or wrongly typed list field degrades to "empty"
        value = entry.get(key) or []
        return value if isinstance(value, list) else []

    sig = {
        "inputs": inputs,
        "required": {name for name in map(str, optional_list("required")) if name in inputs},
        "outputs": outputs,
        "output_names": [str(name) for name in optional_list("output_names")],
    }
    meta = {
        "source": "generated",
        "pack": str(entry.get("pack") or ""),
        "display": str(entry.get("display") or entry.get("type") or node_type),
        "repository": str(entry.get("repository") or ""),
        "confidence": confidence,
    }
    return sig, meta
|
|
|
|
|
|
def load_generated_signatures(base_dir):
    """Load the bundled generated-signature file from `base_dir`.

    Always returns a store with ``sigs``, ``meta`` and ``by_out``. The store is
    empty when the file is missing, unreadable, has an unsupported schema or
    has a ``nodes`` field that is not an object; every case except a missing
    file prints a diagnostic. Entries that fail normalisation are skipped.
    """
    path = os.path.join(base_dir, _GENERATED_SIGNATURES_FILE)
    generated = _empty_generated_signatures()
    if not os.path.isfile(path):
        return generated

    try:
        with open(path, "r", encoding="utf-8") as handle:
            payload = json.load(handle)
    except Exception as e:
        print(f"[UTFCN] failed to read {_GENERATED_SIGNATURES_FILE}: {e}")
        return generated

    schema_ok = isinstance(payload, dict) and payload.get("schema_version") == _GENERATED_SCHEMA_VERSION
    if not schema_ok:
        print(f"[UTFCN] ignored {_GENERATED_SIGNATURES_FILE}: unsupported schema")
        return generated

    nodes = payload.get("nodes") or {}
    if not isinstance(nodes, dict):
        print(f"[UTFCN] ignored {_GENERATED_SIGNATURES_FILE}: nodes must be an object")
        return generated

    for raw_type, entry in nodes.items():
        node_type = str(raw_type)
        normalised = _normalise_generated_signature(node_type, entry)
        if normalised is None:
            continue
        sig, meta = normalised
        generated["sigs"][node_type] = sig
        generated["meta"][node_type] = meta
        # same bucketing key the live registry uses: the first output type
        generated["by_out"][_first_output_type(sig)].append(node_type)

    return generated
|
|
|
|
|
|
def _normalise_rules(raw):
    """Normalise a parsed mappings file into ``{source: [rule, ...]}``.

    Accepts both ``{source: {...single...}}`` and ``{source: [ {...}, {...} ]}``
    shapes under the top-level ``"rules"`` key. A target is kept only when it
    is a dict whose ``"to"`` is a non-empty string: node types are looked up
    as dict keys later on, so a non-string target (for example a JSON list)
    could never match and would raise on an unhashable value instead.

    A source whose targets are all rejected still appears, with an empty list.
    """

    def usable(target):
        if not isinstance(target, dict):
            return False
        to = target.get("to")
        return isinstance(to, str) and bool(to)

    out = {}
    for src, val in (raw.get("rules") or {}).items():
        targets = val if isinstance(val, list) else [val]
        out[src] = [t for t in targets if usable(t)]
    return out
|
|
|
|
|
|
def load_rules(base_dir):
    """Load builtin mappings.json, then layer user_mappings.json on top.

    The merge is per source type: when both files define rules for the same
    source, the user file's list replaces the builtin one (user wins per
    source). Missing files are skipped; a file that fails to load is reported
    and skipped.
    """
    merged = {}
    for fname in ("mappings.json", "user_mappings.json"):
        path = os.path.join(base_dir, fname)
        if os.path.isfile(path):
            try:
                with open(path, "r", encoding="utf-8") as handle:
                    loaded = json.load(handle)
                merged.update(_normalise_rules(loaded))
            except Exception as e:  # a broken user file must never take the server down
                print(f"[UTFCN] failed to read {fname}: {e}")
    return merged
|
|
|
|
|
|
def build_context(rules, generated=None):
    """
    Snapshot the live node registry once (signatures + source of every node).

    Returned context is reused by build_index() (the /utfcn/scan payload) and by
    match() (per-workflow matching of UNINSTALLED nodes), so the expensive walk
    only happens on refresh.

    `rules` is the merged curated mapping: {sourceType: [ {to, note, inputs, widgets, outputs}, ... ]}.
    `generated` is an optional generated-signature store; an empty one is used
    when it is missing or falsy.

    The context holds ``sources`` (per-node source kind, pack and display name),
    ``sigs`` (per-node signature), ``by_out`` (node names bucketed by first
    output type), ``rules`` and ``generated``.
    """
    import nodes  # imported here so the module stays importable outside ComfyUI

    registry = nodes.NODE_CLASS_MAPPINGS
    display_names = getattr(nodes, "NODE_DISPLAY_NAME_MAPPINGS", {})

    sources, sigs = {}, {}
    # Bucket every potential *target* by its first output type so a source only
    # gets compared against nodes that could plausibly feed the same downstream.
    by_out = defaultdict(list)
    for name, cls in registry.items():
        module = _module_of(cls)
        sources[name] = {
            "source": _source_kind(module),
            "pack": _pack_of(module),
            "display": display_names.get(name, name),
        }
        sig = _signature(cls)
        sigs[name] = sig
        by_out[_first_output_type(sig)].append(name)

    return {
        "sources": sources,
        "sigs": sigs,
        "by_out": by_out,
        "rules": rules,
        "generated": generated or _empty_generated_signatures(),
    }
|
|
|
|
|
|
def _candidates_for(src_name, src_sig, src_pack, ctx, src_meta=None):
    """
    Rank replacement candidates for one source node.

    `src_sig` may be None (an uninstalled node we know only by name) — then only
    curated rules apply. If a signature is given (installed node, or a missing
    node's serialized signature), exact/partial tiers are added too.
    `src_pack` is None for uninstalled/unknown sources (skips same-pack exclusion).
    `src_meta` defaults to the registry entry for `src_name` when it is not a dict.

    Curated candidates come first, in rule order. Signature candidates follow,
    ordered core before pack, exact before partial, then by descending score.
    The list is cut to the configured maximum.
    """
    sources, sigs, by_out, rules = ctx["sources"], ctx["sigs"], ctx["by_out"], ctx["rules"]
    if not isinstance(src_meta, dict):
        src_meta = sources.get(src_name, {})
    picked, taken = [], set()

    # --- tier 1: curated rules (ordered preference; core-first is the author's job) ---
    for rule in rules.get(src_name, []):
        target = rule.get("to")
        usable = bool(target) and target != src_name and target in sources and target not in taken
        if usable:
            taken.add(target)
            picked.append(_candidate(target, sources, "curated", 1.0, rule))

    # --- tiers 2 & 3: signature matching within the same output bucket ---
    if src_sig is not None:
        scored = []
        for name in by_out.get(_first_output_type(src_sig), []):
            if name in taken or name == src_name:
                continue
            meta = sources[name]
            # target must be core, or a DIFFERENT installed pack (fallback-to-available)
            same_pack = meta["source"] == "custom" and src_pack is not None and meta["pack"] == src_pack
            if same_pack:
                continue
            sig = sigs[name]
            if not _feasible(src_sig, sig):
                continue
            if not _features_compatible(src_name, src_sig, src_meta, name, sig, meta):
                continue
            if _is_exact(src_sig, sig):
                scored.append((name, "exact", 1.0))
                continue
            value = _score(src_sig, sig)
            if value >= _PARTIAL_THRESHOLD:
                scored.append((name, "partial", value))

        def preference(entry):
            name, tier, value = entry
            # False sorts before True: core first, exact first, then best score
            return (sources[name]["source"] != "core", tier != "exact", -value)

        scored.sort(key=preference)
        for name, tier, value in scored:
            if name in taken:
                continue
            taken.add(name)
            picked.append(_candidate(name, sources, tier, value, None))

    return picked[:_MAX_CANDIDATES]
|
|
|
|
|
|
def build_index(ctx):
    """
    Build the /utfcn/scan payload from a context.

    Covers INSTALLED custom nodes (curated + signature tiers) AND uninstalled
    source types that a curated rule targets an installed node for — so a rule
    still fires on a node whose pack you never installed.

    Returns { "sources": {...}, "candidates": {srcType: [candidate,...]}, "stats": {...} }.
    Only sources with at least one candidate appear under "candidates".
    """
    sources = ctx["sources"]
    candidates = {}

    for src_name, meta in sources.items():
        if meta["source"] == "custom":
            found = _candidates_for(src_name, ctx["sigs"][src_name], meta["pack"], ctx)
            if found:
                candidates[src_name] = found

    # curated rules whose SOURCE isn't installed (the "replace a missing node
    # without installing its pack" case) — no signature, so curated-only.
    uninstalled = 0
    for src_name in ctx["rules"]:
        if src_name in sources or src_name in candidates:
            continue
        found = _candidates_for(src_name, None, None, ctx)
        if found:
            candidates[src_name] = found
            uninstalled += 1

    custom_total = sum(1 for m in sources.values() if m["source"] == "custom")
    verified_total = sum(
        1 for options in candidates.values() if any(option["verified"] for option in options)
    )
    stats = {
        "nodes": len(sources),
        "custom": custom_total,
        "replaceable": len(candidates),
        "verified": verified_total,
        "uninstalled": uninstalled,
    }
    return {"sources": sources, "candidates": candidates, "stats": stats}
|
|
|
|
|
|
def _signature_from_item(it):
    """Build a signature dict from one serialized workflow item.

    Fields of the wrong type are treated as empty. Input names and types are
    stringified (None names are dropped), outputs and output names are
    stringified, and every input is marked as required.
    """

    def pick(key, kind):
        value = it.get(key)
        if value and isinstance(value, kind):
            return value
        return kind()

    inputs = {
        str(name): str(typ)
        for name, typ in pick("inputs", dict).items()
        if name is not None
    }
    outputs = [str(typ) for typ in pick("outputs", list)]
    output_names = [str(name) for name in pick("output_names", list)]
    return {
        "inputs": inputs,
        "required": set(inputs),
        "outputs": outputs,
        "output_names": output_names,
    }
|
|
|
|
|
|
def _generated_signature_usable(sig):
    """True when `sig` is a dict whose ``inputs`` is a dict and whose ``outputs`` is a list."""
    if not isinstance(sig, dict):
        return False
    return isinstance(sig.get("inputs"), dict) and isinstance(sig.get("outputs"), list)
|
|
|
|
|
|
def _normalised_generated_signature(sig):
    """Re-validate a stored generated signature before it is used for matching.

    Returns a clean signature dict with stringified inputs, outputs and output
    names, and with ``required`` limited to real input names. Returns None when
    `sig` is not usable or anything goes wrong while normalising it.
    """
    if not _generated_signature_usable(sig):
        return None

    try:
        inputs = {}
        for key, value in sig["inputs"].items():
            if key is not None:
                inputs[str(key)] = str(value)
        outputs = [str(item) for item in sig["outputs"]]

        required_raw = sig.get("required") or []
        if not isinstance(required_raw, (list, set, tuple)):
            required_raw = []
        names_raw = sig.get("output_names") or []
        if not isinstance(names_raw, list):
            names_raw = []

        required = {name for name in map(str, required_raw) if name in inputs}
        return {
            "inputs": inputs,
            "required": required,
            "outputs": outputs,
            "output_names": [str(item) for item in names_raw],
        }
    except Exception:
        return None
|
|
|
|
|
|
def _generated_signature_conflicts(serialized_sig, generated_sig):
    """Does the signature seen in the workflow contradict the generated one?

    An empty serialized signature never conflicts. Otherwise there is a
    conflict when a serialized input is unknown to the generated signature or
    has a different type, or when the serialized outputs contain a type more
    often than the generated outputs do.
    """
    seen_inputs = serialized_sig["inputs"]
    seen_outputs = serialized_sig["outputs"]
    if not (seen_inputs or seen_outputs):
        return False

    known_inputs = generated_sig["inputs"]
    if any(
        name not in known_inputs or known_inputs[name] != typ
        for name, typ in seen_inputs.items()
    ):
        return True

    surplus_outputs = Counter(seen_outputs) - Counter(generated_sig["outputs"])
    return bool(surplus_outputs)
|
|
|
|
|
|
def match(ctx, items):
    """
    Match a batch of nodes given only their (possibly serialized) signature —
    used for UNINSTALLED / missing nodes in an open workflow.

    `items`: [ {"type": str, "inputs": {name: TYPE}, "outputs": [TYPE], "output_names": [..]} ].
    Serialized nodes only carry link slots (not widget values), so 'exact' rarely
    fires; curated rules (by type name), bundled generated signatures, and
    feature-gated partial link-type matches do.

    The batch is treated as untrusted: ``None`` counts as an empty batch, and
    entries that are not dicts, or whose "type" is not a non-empty string, are
    skipped rather than raising. Each type is matched once (first entry wins).

    For each type the bundled generated signature is tried first, provided it
    exists and does not conflict with the serialized one. If that yields
    nothing, the serialized signature itself is used.

    Returns a mapping from source node type to candidate list; types with no
    candidates are omitted.
    """

    def as_dict(value):
        # anything that is not a dict (including None) degrades to "empty"
        return value if isinstance(value, dict) else {}

    out = {}
    generated = as_dict(ctx.get("generated"))
    generated_sigs = as_dict(generated.get("sigs"))
    generated_meta = as_dict(generated.get("meta"))

    if items is None:
        return out

    for it in items:
        if not isinstance(it, dict):
            continue
        t = it.get("type")
        # Node types are registry keys: a non-string (possibly unhashable)
        # value can never match anything and must not reach the dict lookups.
        if not isinstance(t, str) or not t or t in out:
            continue

        sig = _signature_from_item(it)
        gen_sig = _normalised_generated_signature(generated_sigs.get(t))
        if gen_sig is not None and not _generated_signature_conflicts(sig, gen_sig):
            gen_meta = as_dict(generated_meta.get(t))
            found = _candidates_for(t, gen_sig, gen_meta.get("pack"), ctx, gen_meta)
            if found:
                out[t] = found
                continue

        item_meta = {"display": it.get("display") or t}
        found = _candidates_for(t, sig, None, ctx, item_meta)
        if found:
            out[t] = found
    return out
|
|
|
|
|
|
def _candidate(to, sources, tier, score, rule):
    """Build the candidate record sent to the frontend for target node `to`.

    `tier` is "curated" | "exact" | "partial"; only the first two are marked
    verified. The score is rounded to three decimals. When a curated `rule` is
    given, its dict-valued "inputs"/"widgets"/"outputs" remaps and its
    non-empty "note" are copied onto the record.
    """
    meta = sources[to]
    cand = dict(
        to=to,
        to_display=meta["display"],
        source=meta["source"],  # "core" | "custom"
        pack=meta["pack"],
        tier=tier,
        verified=tier in ("curated", "exact"),
        score=round(float(score), 3),
    )
    if not rule:
        return cand

    # explicit name remaps travel to the frontend so the swap is exact
    for key in ("inputs", "widgets", "outputs"):
        remap = rule.get(key)
        if isinstance(remap, dict):
            cand[key] = remap
    note = rule.get("note")
    if note:
        cand["note"] = note
    return cand
|