Add popular node metadata build CLI

This commit is contained in:
2026-07-02 21:57:31 +02:00
parent dddb136b16
commit 1895a0e677
2 changed files with 683 additions and 0 deletions
+413
View File
@@ -2,15 +2,24 @@
"""Generate UTFCN's popular_node_signatures.json artifact."""
import ast
import argparse
import hashlib
import json
import os
import re
import subprocess
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
# Schema version number. NOTE(review): presumably written into the generated
# artifact; the consumer is outside this excerpt — confirm.
SCHEMA_VERSION = 1
# Default source of the custom-node pack list (raw JSON in the ComfyUI-Manager repo).
MANAGER_LIST_URL = "https://raw.githubusercontent.com/ltdrdata/ComfyUI-Manager/main/custom-node-list.json"
# Comfy registry endpoint. NOTE(review): not referenced by the code visible here — confirm usage.
REGISTRY_NODES_URL = "https://api.comfy.org/nodes"
# Fixed epoch timestamp. NOTE(review): appears to be write_artifact's default
# `generated_at`; presumably keeps output reproducible — confirm.
DEFAULT_GENERATED_AT = "1970-01-01T00:00:00Z"
# Default directory under which cloned repositories are cached.
DEFAULT_CACHE_DIR = Path(".cache/utfcn-popular-node-repos")
# Default path of the generated artifact.
DEFAULT_OUTPUT = Path("popular_node_signatures.json")
# User-Agent header value sent with HTTP requests made by fetch_json.
USER_AGENT = "ComfyUI-UTFCN popular node signature generator"
class UnsupportedStaticExpression(Exception):
@@ -41,6 +50,293 @@ _CLASS_SIGNATURE_ATTRS = {"INPUT_TYPES", "RETURN_NAMES", "RETURN_TYPES"}
# Unique sentinel object. NOTE(review): its consumers are outside this excerpt;
# presumably marks a namespace mutation that cannot be resolved statically — confirm.
_DYNAMIC_NAMESPACE_MUTATION = object()
# Names of the builtins that return a namespace mapping.
_NAMESPACE_FUNCTIONS = {"globals", "locals", "vars"}
# Dunder method names that delete or assign an item of a mapping.
_NAMESPACE_DUNDER_MUTATORS = {"__delitem__", "__setitem__"}
# Keys that _entry_metrics looks up as integer popularity counters, both on a
# manager entry itself and on its "stats" / "statistics" / "metadata" sub-dicts.
# Several spellings of the same idea are listed so differently shaped entries match.
_METRIC_FIELDS = (
    "downloads",
    "download_count",
    "stars",
    "github_stars",
    "stargazers_count",
    "favorites",
    "favourites",
    "installed",
    "installs",
    "install_count",
    "count",
)
def fetch_json(url):
    """Download *url* and return its body parsed as UTF-8 encoded JSON.

    The request carries the module's ``USER_AGENT`` header and times out
    after 30 seconds.

    Raises:
        RuntimeError: if opening the URL, reading, decoding or parsing fails.
            The original exception is chained as the cause.
    """
    headers = {"User-Agent": USER_AGENT}
    req = urllib.request.Request(url, headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            body = resp.read()
            return json.loads(body.decode("utf-8"))
    except Exception as error:
        raise RuntimeError(f"failed to fetch JSON from {url}: {error}") from error
def _manager_entries(raw):
if isinstance(raw, list):
return raw
if not isinstance(raw, dict):
return []
for key in ("custom_nodes", "customNodes", "nodes", "items"):
value = raw.get(key)
if isinstance(value, list):
return value
return []
def _coerce_int(value):
if isinstance(value, bool):
return 0
if isinstance(value, int):
return value
if isinstance(value, float):
return int(value)
if isinstance(value, str):
text = value.strip().replace(",", "")
if text.isdigit() or (text.startswith("-") and text[1:].isdigit()):
return int(text)
return 0
def _slug(value, default="unnamed-pack"):
text = str(value or "").strip().lower()
text = re.sub(r"[^a-z0-9]+", "-", text).strip("-")
return text or default
def github_repo_url(value):
    """Return the canonical ``https://github.com/<owner>/<repo>`` URL, or None.

    Accepts http(s) URLs whose host is exactly ``github.com`` (case
    insensitive). Only the first two path segments are kept, so links to
    sub-pages such as ``/tree/main`` collapse to the repository itself. A
    trailing ``.git`` on the repository segment is removed, so the clone URL
    and the browser URL of one repository map to the same string.

    Returns None for non-strings, blank strings, malformed URLs, other
    hosts/schemes, and paths that do not name both an owner and a repository.
    """
    if not isinstance(value, str):
        return None
    text = value.strip()
    if not text:
        return None
    try:
        parsed = urlparse(text)
    except ValueError:
        # urlparse rejects e.g. unbalanced "[" in the host part.
        return None
    if parsed.scheme not in {"http", "https"} or parsed.netloc.lower() != "github.com":
        return None
    parts = [part for part in parsed.path.split("/") if part]
    if len(parts) < 2:
        return None
    owner = parts[0]
    repo = parts[1].removesuffix(".git")
    if not repo:
        return None
    return f"https://github.com/{owner}/{repo}"
def _normalise_repository_url(value):
if not isinstance(value, str):
return None
text = value.strip()
if not text:
return None
if re.match(r"^[A-Za-z0-9_.-]+@[A-Za-z0-9_.-]+:.+/.+(\.git)?$", text):
return text
parsed = urlparse(text)
if parsed.netloc.lower() == "github.com":
return github_repo_url(text)
if parsed.netloc.lower() == "raw.githubusercontent.com":
return None
if parsed.scheme not in {"http", "https", "git", "ssh"}:
return None
host = parsed.netloc.lower()
if not host:
return None
path_parts = [part for part in parsed.path.split("/") if part]
if len(path_parts) < 2:
return None
last = path_parts[-1].lower()
if not last.endswith(".git") and "." in last:
return None
return text
def _is_cloneable_repo_url(value):
    """Return True when ``_normalise_repository_url`` accepts *value*."""
    normalised = _normalise_repository_url(value)
    return normalised is not None
def _repository_candidates(item):
for key in ("repository", "repo", "git", "git_url", "url", "reference"):
value = item.get(key)
if isinstance(value, str):
yield value
elif isinstance(value, list):
for candidate in value:
yield candidate
files = item.get("files")
if isinstance(files, str):
yield files
elif isinstance(files, list):
for candidate in files:
yield candidate
def _manager_entry_repository(item):
    """Return the first normalised repository URL found in *item*, or None.

    Candidates come from ``_repository_candidates`` in its order; the first
    one that ``_normalise_repository_url`` accepts wins. The entry's install
    type has no influence on the result.
    """
    for candidate in _repository_candidates(item):
        normalised = _normalise_repository_url(candidate)
        if normalised:
            return normalised
    return None
def _entry_metrics(item):
    """Collect the non-zero integer metrics of a manager entry.

    Looks at *item* itself and then at its ``stats``, ``statistics`` and
    ``metadata`` values when those are dicts. For each field in
    ``_METRIC_FIELDS`` the value is coerced with ``_coerce_int``; zero results
    are dropped, and a later source overrides an earlier one for the same
    field.
    """
    nested_values = (item.get(name) for name in ("stats", "statistics", "metadata"))
    sources = [item] + [nested for nested in nested_values if isinstance(nested, dict)]
    return {
        field: count
        for source in sources
        for field in _METRIC_FIELDS
        if (count := _coerce_int(source.get(field)))
    }
def _pack_id_from_repository(repository):
    """Derive a slug identifier from a repository address.

    For URLs with a host, the last path segment (minus a ``.git`` suffix) is
    slugged, falling back to the host when the path is empty. For host-less
    addresses containing ``:`` (scp-like), the text after the last ``/`` is
    used. Otherwise the whole string is slugged.
    """
    parsed = urlparse(repository)
    if parsed.netloc:
        segments = [segment for segment in parsed.path.split("/") if segment]
        name = segments[-1].removesuffix(".git") if segments else parsed.netloc
        return _slug(name)
    if ":" in repository:
        tail = repository.rsplit("/", 1)[-1]
        return _slug(tail.removesuffix(".git"))
    return _slug(repository)
def normalise_manager_entries(raw):
    """Convert a decoded manager payload into a list of pack dicts.

    Non-dict entries and entries without a usable repository URL are skipped.
    Each result has ``id``, ``title``, ``author``, ``repository``,
    ``manager_order`` (the entry's position in the payload) and ``metrics``,
    plus ``description`` when the entry has a non-blank one.

    The id is the entry's own ``id`` when non-blank, otherwise a slug of the
    title, otherwise a slug derived from the repository URL.
    """
    normalised = []
    for position, item in enumerate(_manager_entries(raw)):
        if not isinstance(item, dict):
            continue
        repository = _manager_entry_repository(item)
        if repository is None:
            continue

        raw_title = item.get("title")
        pack_id = str(item.get("id") or "").strip()
        if not pack_id:
            pack_id = _slug(raw_title or _pack_id_from_repository(repository))
        title = str(raw_title or pack_id).strip() or pack_id

        entry = {
            "id": pack_id,
            "title": title,
            "author": str(item.get("author") or "").strip(),
            "repository": repository,
            "manager_order": position,
            "metrics": _entry_metrics(item),
        }
        summary = str(item.get("description") or "").strip()
        if summary:
            entry["description"] = summary
        normalised.append(entry)
    return normalised
def _popularity_score(pack):
return sum(_coerce_int(value) for value in pack.get("metrics", {}).values())
def rank_packs(packs, limit=None):
    """Deduplicate packs by repository and return them ordered by popularity.

    Packs without a repository are dropped. When several packs share a
    repository, the one with the higher popularity score is kept; ties go to
    the lower ``manager_order`` and then to the greater ``id`` string.

    Survivors are sorted by descending score, then case-insensitive title,
    id and repository. If *limit* is not None the list is sliced to
    ``[:limit]``. Each returned dict is a copy with a 1-based ``rank`` added.
    """

    def preference(pack):
        # Larger tuple wins when two packs point at the same repository.
        return (
            _popularity_score(pack),
            -int(pack.get("manager_order", 0)),
            str(pack.get("id", "")),
        )

    def sort_key(pack):
        return (
            -_popularity_score(pack),
            str(pack.get("title", "")).lower(),
            str(pack.get("id", "")),
            str(pack.get("repository", "")),
        )

    winners = {}
    for pack in packs:
        repository = pack.get("repository")
        if not repository:
            continue
        challenger = dict(pack)
        incumbent = winners.get(repository)
        if incumbent is None or preference(challenger) > preference(incumbent):
            winners[repository] = challenger

    ordered = sorted(winners.values(), key=sort_key)
    if limit is not None:
        ordered = ordered[:limit]
    return [{**pack, "rank": position} for position, pack in enumerate(ordered, start=1)]
def rank_entries(entries, limit=None):
    """Alias of ``rank_packs``: rank *entries*, optionally keeping *limit*."""
    return rank_packs(entries, limit=limit)
def _repo_cache_slug(url):
text = str(url).strip()
parsed = urlparse(text)
if parsed.netloc:
parts = [parsed.netloc, *[part for part in parsed.path.split("/") if part]]
elif ":" in text:
host, path = text.split(":", 1)
host = host.split("@")[-1]
parts = [host, *[part for part in path.split("/") if part]]
else:
parts = [text]
if parts and parts[-1].endswith(".git"):
parts[-1] = parts[-1][:-4]
slug = "-".join(parts).lower()
slug = re.sub(r"[^a-z0-9]+", "-", slug).strip("-")
return slug[:80].strip("-") or "repo"
def repo_cache_path(url, cache_dir):
    """Return the cache directory for *url* below ``<cache_dir>/repos``.

    The directory name is the readable slug of the URL followed by the first
    12 hex characters of the URL's SHA-256, so distinct URLs with the same
    slug get distinct directories.
    """
    fingerprint = hashlib.sha256(str(url).encode("utf-8")).hexdigest()[:12]
    directory_name = f"{_repo_cache_slug(url)}-{fingerprint}"
    return Path(cache_dir) / "repos" / directory_name
def _run_git(command):
try:
subprocess.run(
command,
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
text=True,
)
except subprocess.CalledProcessError as exc:
stderr = (exc.stderr or "").strip()
detail = f": {stderr}" if stderr else ""
raise RuntimeError(f"git command failed ({' '.join(command)}){detail}") from exc
def clone_or_update_repo(url, cache_dir, *, refresh=False):
    """Ensure a local checkout of *url* exists in the cache and return its path.

    If the cache directory for the URL already exists it is reused; with
    ``refresh=True`` a ``git pull --ff-only`` is run in it first. Otherwise
    the repository is cloned with ``--depth 1``.

    Raises:
        RuntimeError: propagated from ``_run_git`` when a git command fails.
    """
    target = repo_cache_path(url, cache_dir)
    target.parent.mkdir(parents=True, exist_ok=True)
    if target.exists():
        if refresh:
            _run_git(["git", "-C", str(target), "pull", "--ff-only"])
        return target
    # The URL originates from a downloaded list; "--" ends option parsing so
    # a value starting with "-" can never be read as a git option.
    _run_git(["git", "clone", "--depth", "1", "--", url, str(target)])
    return target
def _literal(node, env, allow_mutable_env=True):
@@ -2630,3 +2926,120 @@ def write_artifact(path, sources, packs, nodes, *, generated_at=DEFAULT_GENERATE
}
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2, sort_keys=False) + "\n", encoding="utf-8")
def _pack_record_from_meta(pack, status, *, node_count=0, error=None):
record = {
"id": pack["id"],
"title": pack.get("title", pack["id"]),
"repository": pack.get("repository", ""),
"rank": pack.get("rank", 0),
"status": status,
"node_count": node_count,
}
if pack.get("author"):
record["author"] = pack["author"]
if pack.get("description"):
record["description"] = pack["description"]
if pack.get("metrics"):
record["metrics"] = dict(pack["metrics"])
if error is not None:
record["error"] = str(error)
return record
def _merge_pack_metadata(extracted_pack, pack):
merged = dict(extracted_pack)
if pack.get("author"):
merged["author"] = pack["author"]
if pack.get("description"):
merged["description"] = pack["description"]
if pack.get("metrics"):
merged["metrics"] = dict(pack["metrics"])
return merged
def build_artifact(
    *,
    manager_url=MANAGER_LIST_URL,
    cache_dir=DEFAULT_CACHE_DIR,
    output=DEFAULT_OUTPUT,
    limit=1000,
    refresh=False,
    generated_at=None,
):
    """Fetch the manager list, analyse the top packs and write the artifact.

    Steps: download and normalise the manager list, rank it and keep at most
    *limit* packs, clone (or reuse) each repository in *cache_dir*, extract
    its node signatures, then write everything to *output*.

    A pack whose clone or extraction raises is recorded with status
    ``"error"`` and counted; processing continues with the next pack. When
    two packs define the same node type, the one processed first is kept.

    *generated_at* defaults to the current UTC time, taken after all packs
    have been processed.

    Returns:
        A dict with ``processed``, ``pack_count``, ``node_count``, ``errors``
        and ``output`` (a ``Path``).
    """
    normalised = normalise_manager_entries(fetch_json(manager_url))
    ranked = rank_packs(normalised, limit)

    packs, nodes = {}, {}
    errors = 0
    for pack in ranked:
        try:
            checkout = clone_or_update_repo(pack["repository"], cache_dir, refresh=refresh)
            extracted = extract_repo_signatures(checkout, pack)
        except Exception as failure:
            # Best effort: one broken repository must not abort the build.
            errors += 1
            packs[pack["id"]] = _pack_record_from_meta(pack, "error", error=failure)
            continue
        packs[pack["id"]] = _merge_pack_metadata(extracted["pack"], pack)
        for node_type, node in sorted(extracted["nodes"].items()):
            # setdefault keeps the definition from the pack processed first.
            nodes.setdefault(node_type, node)

    if generated_at is None:
        generated_at = datetime.now(timezone.utc)

    output_path = Path(output)
    source_info = {
        "manager_url": manager_url,
        "limit": limit,
        "normalised_packs": len(normalised),
        "processed_packs": len(ranked),
    }
    write_artifact(
        output_path,
        sources=source_info,
        packs=packs,
        nodes=nodes,
        generated_at=generated_at,
    )
    return {
        "processed": len(ranked),
        "pack_count": len(packs),
        "node_count": len(nodes),
        "errors": errors,
        "output": output_path,
    }
def main(argv=None):
    """Command-line entry point; returns the process exit status (always 0).

    Parses *argv* (``sys.argv[1:]`` when None), runs ``build_artifact`` with
    the current UTC time as the generation timestamp, and prints a one-line
    summary unless ``--quiet`` is given.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--manager-url", default=MANAGER_LIST_URL)
    cli.add_argument("--cache-dir", type=Path, default=DEFAULT_CACHE_DIR)
    cli.add_argument("--output", type=Path, default=DEFAULT_OUTPUT)
    cli.add_argument("--limit", type=int, default=1000)
    cli.add_argument("--refresh", action="store_true")
    cli.add_argument("--quiet", action="store_true")
    options = cli.parse_args(argv)

    summary = build_artifact(
        manager_url=options.manager_url,
        cache_dir=options.cache_dir,
        output=options.output,
        limit=options.limit,
        refresh=options.refresh,
        generated_at=datetime.now(timezone.utc),
    )
    if options.quiet:
        return 0

    template = "wrote {output} processed={processed} packs={packs} nodes={nodes} errors={errors}"
    message = template.format(
        output=summary["output"],
        processed=summary["processed"],
        packs=summary["pack_count"],
        nodes=summary["node_count"],
        errors=summary["errors"],
    )
    print(message)
    return 0
if __name__ == "__main__":
    # Run the CLI and hand main()'s return value to the interpreter as the exit status.
    exit_status = main()
    raise SystemExit(exit_status)