From 8c45016c00d1bfbe1d7bc6bdeba188099f6adceb Mon Sep 17 00:00:00 2001 From: Ethanfel Date: Thu, 2 Jul 2026 11:29:54 +0200 Subject: [PATCH] Add generated signature loader --- tests/__init__.py | 1 + tests/test_utfcn_core_generated.py | 119 +++++++++++++++++++++++++++++ utfcn_core.py | 82 ++++++++++++++++++++ 3 files changed, 202 insertions(+) create mode 100644 tests/__init__.py create mode 100644 tests/test_utfcn_core_generated.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ + diff --git a/tests/test_utfcn_core_generated.py b/tests/test_utfcn_core_generated.py new file mode 100644 index 0000000..fb1788c --- /dev/null +++ b/tests/test_utfcn_core_generated.py @@ -0,0 +1,119 @@ +import json +import tempfile +import unittest +from pathlib import Path + +import utfcn_core + + +class GeneratedSignatureLoaderTests(unittest.TestCase): + def test_missing_generated_file_returns_empty_indexes(self): + with tempfile.TemporaryDirectory() as tmp: + generated = utfcn_core.load_generated_signatures(tmp) + + self.assertEqual({}, generated["sigs"]) + self.assertEqual({}, generated["meta"]) + self.assertEqual({}, dict(generated["by_out"])) + + def test_loads_usable_static_signature(self): + payload = { + "schema_version": 1, + "generated_at": "2026-07-02T00:00:00Z", + "sources": {"limit": 1}, + "packs": { + "sample-pack": { + "title": "Sample Pack", + "repository": "https://github.com/example/sample-pack", + } + }, + "nodes": { + "SampleImageSize": { + "type": "SampleImageSize", + "display": "Sample Image Size", + "pack": "sample-pack", + "repository": "https://github.com/example/sample-pack", + "inputs": {"image": "IMAGE"}, + "required": ["image"], + "outputs": ["INT", "INT"], + "output_names": ["width", "height"], + "confidence": "static_exact", + } + }, + } + with tempfile.TemporaryDirectory() as tmp: + Path(tmp, "popular_node_signatures.json").write_text( + json.dumps(payload), + encoding="utf-8", + ) + generated = utfcn_core.load_generated_signatures(tmp) + + self.assertEqual({"image": "IMAGE"}, generated["sigs"]["SampleImageSize"]["inputs"]) + self.assertEqual({"image"}, generated["sigs"]["SampleImageSize"]["required"]) + self.assertEqual(["INT", "INT"], generated["sigs"]["SampleImageSize"]["outputs"]) + self.assertEqual(["width", "height"], generated["sigs"]["SampleImageSize"]["output_names"]) + self.assertEqual("sample-pack", generated["meta"]["SampleImageSize"]["pack"]) + self.assertEqual("Sample Image Size", generated["meta"]["SampleImageSize"]["display"]) + self.assertEqual(["SampleImageSize"], generated["by_out"]["INT"]) + + def test_rejects_metadata_only_entries_for_matching(self): + payload = { + "schema_version": 1, + "generated_at": "2026-07-02T00:00:00Z", + "sources": {}, + "packs": {}, + "nodes": { + "NameOnlyNode": { + "type": "NameOnlyNode", + "display": "Name Only", + "pack": "name-only", + "repository": "https://github.com/example/name-only", + "inputs": {}, + "required": [], + "outputs": [], + "output_names": [], + "confidence": "metadata_only", + } + }, + } + with tempfile.TemporaryDirectory() as tmp: + Path(tmp, "popular_node_signatures.json").write_text( + json.dumps(payload), + encoding="utf-8", + ) + generated = utfcn_core.load_generated_signatures(tmp) + + self.assertNotIn("NameOnlyNode", generated["sigs"]) + self.assertNotIn("NameOnlyNode", generated["meta"]) + self.assertEqual({}, dict(generated["by_out"])) + + def test_malformed_generated_file_returns_empty_indexes(self): + with tempfile.TemporaryDirectory() as tmp: + Path(tmp, "popular_node_signatures.json").write_text("{broken", encoding="utf-8") + generated = utfcn_core.load_generated_signatures(tmp) + + self.assertEqual({}, generated["sigs"]) + self.assertEqual({}, generated["meta"]) + self.assertEqual({}, dict(generated["by_out"])) + + def test_unsupported_schema_returns_empty_indexes(self): + payload = { + "schema_version": 99, + "generated_at": "2026-07-02T00:00:00Z", + "sources": {}, + "packs": {}, + "nodes": {}, + } + with tempfile.TemporaryDirectory() as tmp: + Path(tmp, "popular_node_signatures.json").write_text( + json.dumps(payload), + encoding="utf-8", + ) + generated = utfcn_core.load_generated_signatures(tmp) + + self.assertEqual({}, generated["sigs"]) + self.assertEqual({}, generated["meta"]) + self.assertEqual({}, dict(generated["by_out"])) + + +if __name__ == "__main__": + unittest.main() diff --git a/utfcn_core.py b/utfcn_core.py index ae07d6d..6e551a9 100644 --- a/utfcn_core.py +++ b/utfcn_core.py @@ -124,6 +124,88 @@ _PARTIAL_THRESHOLD = 0.5 # max candidates returned per source node _MAX_CANDIDATES = 6 +_GENERATED_SCHEMA_VERSION = 1 +_GENERATED_SIGNATURES_FILE = "popular_node_signatures.json" + + +def _empty_generated_signatures(): + return {"sigs": {}, "meta": {}, "by_out": defaultdict(list)} + + +def _normalise_generated_signature(node_type, entry): + if not isinstance(entry, dict): + return None + if str(entry.get("confidence") or "") == "metadata_only": + return None + + inputs_raw = entry.get("inputs") or {} + if not isinstance(inputs_raw, dict): + return None + outputs_raw = entry.get("outputs") or [] + if not isinstance(outputs_raw, list): + return None + + inputs = {str(k): str(v) for k, v in inputs_raw.items() if k is not None} + outputs = [str(v) for v in outputs_raw if v is not None] + if not inputs and not outputs: + return None + + required_raw = entry.get("required") or [] + if not isinstance(required_raw, list): + required_raw = [] + output_names_raw = entry.get("output_names") or [] + if not isinstance(output_names_raw, list): + output_names_raw = [] + + sig = { + "inputs": inputs, + "required": {str(v) for v in required_raw if str(v) in inputs}, + "outputs": outputs, + "output_names": [str(v) for v in output_names_raw], + } + meta = { + "source": "generated", + "pack": str(entry.get("pack") or ""), + "display": str(entry.get("display") or entry.get("type") or node_type), + "repository": str(entry.get("repository") or ""), + "confidence": str(entry.get("confidence") or ""), + } + return sig, meta + + +def load_generated_signatures(base_dir): + path = os.path.join(base_dir, _GENERATED_SIGNATURES_FILE) + generated = _empty_generated_signatures() + if not os.path.isfile(path): + return generated + + try: + with open(path, "r", encoding="utf-8") as f: + raw = json.load(f) + except Exception as e: + print(f"[UTFCN] failed to read {_GENERATED_SIGNATURES_FILE}: {e}") + return generated + + if not isinstance(raw, dict) or raw.get("schema_version") != _GENERATED_SCHEMA_VERSION: + print(f"[UTFCN] ignored {_GENERATED_SIGNATURES_FILE}: unsupported schema") + return generated + + nodes = raw.get("nodes") or {} + if not isinstance(nodes, dict): + print(f"[UTFCN] ignored {_GENERATED_SIGNATURES_FILE}: nodes must be an object") + return generated + + for node_type, entry in nodes.items(): + normalised = _normalise_generated_signature(str(node_type), entry) + if normalised is None: + continue + sig, meta = normalised + generated["sigs"][str(node_type)] = sig + generated["meta"][str(node_type)] = meta + generated["by_out"][_first_output_type(sig)].append(str(node_type)) + + return generated + def _normalise_rules(raw): """Accept both {source: {...single...}} and {source: [ {...}, {...} ]} shapes."""