Add generated signature loader

This commit is contained in:
2026-07-02 11:29:54 +02:00
parent 55e59f5e1d
commit 8c45016c00
3 changed files with 202 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
+119
View File
@@ -0,0 +1,119 @@
import json
import tempfile
import unittest
from pathlib import Path
import utfcn_core
class GeneratedSignatureLoaderTests(unittest.TestCase):
def test_missing_generated_file_returns_empty_indexes(self):
with tempfile.TemporaryDirectory() as tmp:
generated = utfcn_core.load_generated_signatures(tmp)
self.assertEqual({}, generated["sigs"])
self.assertEqual({}, generated["meta"])
self.assertEqual({}, dict(generated["by_out"]))
def test_loads_usable_static_signature(self):
payload = {
"schema_version": 1,
"generated_at": "2026-07-02T00:00:00Z",
"sources": {"limit": 1},
"packs": {
"sample-pack": {
"title": "Sample Pack",
"repository": "https://github.com/example/sample-pack",
}
},
"nodes": {
"SampleImageSize": {
"type": "SampleImageSize",
"display": "Sample Image Size",
"pack": "sample-pack",
"repository": "https://github.com/example/sample-pack",
"inputs": {"image": "IMAGE"},
"required": ["image"],
"outputs": ["INT", "INT"],
"output_names": ["width", "height"],
"confidence": "static_exact",
}
},
}
with tempfile.TemporaryDirectory() as tmp:
Path(tmp, "popular_node_signatures.json").write_text(
json.dumps(payload),
encoding="utf-8",
)
generated = utfcn_core.load_generated_signatures(tmp)
self.assertEqual({"image": "IMAGE"}, generated["sigs"]["SampleImageSize"]["inputs"])
self.assertEqual({"image"}, generated["sigs"]["SampleImageSize"]["required"])
self.assertEqual(["INT", "INT"], generated["sigs"]["SampleImageSize"]["outputs"])
self.assertEqual(["width", "height"], generated["sigs"]["SampleImageSize"]["output_names"])
self.assertEqual("sample-pack", generated["meta"]["SampleImageSize"]["pack"])
self.assertEqual("Sample Image Size", generated["meta"]["SampleImageSize"]["display"])
self.assertEqual(["SampleImageSize"], generated["by_out"]["INT"])
def test_rejects_metadata_only_entries_for_matching(self):
payload = {
"schema_version": 1,
"generated_at": "2026-07-02T00:00:00Z",
"sources": {},
"packs": {},
"nodes": {
"NameOnlyNode": {
"type": "NameOnlyNode",
"display": "Name Only",
"pack": "name-only",
"repository": "https://github.com/example/name-only",
"inputs": {},
"required": [],
"outputs": [],
"output_names": [],
"confidence": "metadata_only",
}
},
}
with tempfile.TemporaryDirectory() as tmp:
Path(tmp, "popular_node_signatures.json").write_text(
json.dumps(payload),
encoding="utf-8",
)
generated = utfcn_core.load_generated_signatures(tmp)
self.assertNotIn("NameOnlyNode", generated["sigs"])
self.assertNotIn("NameOnlyNode", generated["meta"])
self.assertEqual({}, dict(generated["by_out"]))
def test_malformed_generated_file_returns_empty_indexes(self):
with tempfile.TemporaryDirectory() as tmp:
Path(tmp, "popular_node_signatures.json").write_text("{broken", encoding="utf-8")
generated = utfcn_core.load_generated_signatures(tmp)
self.assertEqual({}, generated["sigs"])
self.assertEqual({}, generated["meta"])
self.assertEqual({}, dict(generated["by_out"]))
def test_unsupported_schema_returns_empty_indexes(self):
payload = {
"schema_version": 99,
"generated_at": "2026-07-02T00:00:00Z",
"sources": {},
"packs": {},
"nodes": {},
}
with tempfile.TemporaryDirectory() as tmp:
Path(tmp, "popular_node_signatures.json").write_text(
json.dumps(payload),
encoding="utf-8",
)
generated = utfcn_core.load_generated_signatures(tmp)
self.assertEqual({}, generated["sigs"])
self.assertEqual({}, generated["meta"])
self.assertEqual({}, dict(generated["by_out"]))
if __name__ == "__main__":
unittest.main()
+82
View File
@@ -124,6 +124,88 @@ _PARTIAL_THRESHOLD = 0.5
# max candidates returned per source node
_MAX_CANDIDATES = 6
_GENERATED_SCHEMA_VERSION = 1
_GENERATED_SIGNATURES_FILE = "popular_node_signatures.json"
def _empty_generated_signatures():
return {"sigs": {}, "meta": {}, "by_out": defaultdict(list)}
def _normalise_generated_signature(node_type, entry):
if not isinstance(entry, dict):
return None
if str(entry.get("confidence") or "") == "metadata_only":
return None
inputs_raw = entry.get("inputs") or {}
if not isinstance(inputs_raw, dict):
return None
outputs_raw = entry.get("outputs") or []
if not isinstance(outputs_raw, list):
return None
inputs = {str(k): str(v) for k, v in inputs_raw.items() if k is not None}
outputs = [str(v) for v in outputs_raw if v is not None]
if not inputs and not outputs:
return None
required_raw = entry.get("required") or []
if not isinstance(required_raw, list):
required_raw = []
output_names_raw = entry.get("output_names") or []
if not isinstance(output_names_raw, list):
output_names_raw = []
sig = {
"inputs": inputs,
"required": {str(v) for v in required_raw if str(v) in inputs},
"outputs": outputs,
"output_names": [str(v) for v in output_names_raw],
}
meta = {
"source": "generated",
"pack": str(entry.get("pack") or ""),
"display": str(entry.get("display") or entry.get("type") or node_type),
"repository": str(entry.get("repository") or ""),
"confidence": str(entry.get("confidence") or ""),
}
return sig, meta
def load_generated_signatures(base_dir):
path = os.path.join(base_dir, _GENERATED_SIGNATURES_FILE)
generated = _empty_generated_signatures()
if not os.path.isfile(path):
return generated
try:
with open(path, "r", encoding="utf-8") as f:
raw = json.load(f)
except Exception as e:
print(f"[UTFCN] failed to read {_GENERATED_SIGNATURES_FILE}: {e}")
return generated
if not isinstance(raw, dict) or raw.get("schema_version") != _GENERATED_SCHEMA_VERSION:
print(f"[UTFCN] ignored {_GENERATED_SIGNATURES_FILE}: unsupported schema")
return generated
nodes = raw.get("nodes") or {}
if not isinstance(nodes, dict):
print(f"[UTFCN] ignored {_GENERATED_SIGNATURES_FILE}: nodes must be an object")
return generated
for node_type, entry in nodes.items():
normalised = _normalise_generated_signature(str(node_type), entry)
if normalised is None:
continue
sig, meta = normalised
generated["sigs"][str(node_type)] = sig
generated["meta"][str(node_type)] = meta
generated["by_out"][_first_output_type(sig)].append(str(node_type))
return generated
def _normalise_rules(raw):
"""Accept both {source: {...single...}} and {source: [ {...}, {...} ]} shapes."""