Tracks every node used in every prompt submission via SQLite, maps class_types to source packages, and exposes API endpoints and a frontend dialog for viewing per-package usage stats. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
153 lines
5.1 KiB
Python
153 lines
5.1 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
import sqlite3
|
|
import threading
|
|
from datetime import datetime, timezone
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
DB_PATH = os.path.join(os.path.dirname(__file__), "usage_stats.db")
|
|
|
|
SCHEMA = """
|
|
CREATE TABLE IF NOT EXISTS node_usage (
|
|
class_type TEXT PRIMARY KEY,
|
|
package TEXT NOT NULL,
|
|
count INTEGER NOT NULL DEFAULT 0,
|
|
first_seen TEXT NOT NULL,
|
|
last_seen TEXT NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS prompt_log (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp TEXT NOT NULL,
|
|
class_types TEXT NOT NULL
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_node_usage_package ON node_usage(package);
|
|
CREATE INDEX IF NOT EXISTS idx_prompt_log_timestamp ON prompt_log(timestamp);
|
|
"""
|
|
|
|
|
|
class UsageTracker:
|
|
def __init__(self, db_path=DB_PATH):
|
|
self._db_path = db_path
|
|
self._lock = threading.Lock()
|
|
self._init_db()
|
|
|
|
def _init_db(self):
|
|
with self._lock:
|
|
conn = sqlite3.connect(self._db_path)
|
|
try:
|
|
conn.executescript(SCHEMA)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
def _connect(self):
|
|
return sqlite3.connect(self._db_path)
|
|
|
|
def record_usage(self, class_types, mapper):
|
|
"""Record usage of a set of class_types from a single prompt execution."""
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
with self._lock:
|
|
conn = self._connect()
|
|
try:
|
|
for ct in class_types:
|
|
package = mapper.get_package(ct)
|
|
conn.execute(
|
|
"""INSERT INTO node_usage (class_type, package, count, first_seen, last_seen)
|
|
VALUES (?, ?, 1, ?, ?)
|
|
ON CONFLICT(class_type) DO UPDATE SET
|
|
count = count + 1,
|
|
last_seen = excluded.last_seen""",
|
|
(ct, package, now, now),
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO prompt_log (timestamp, class_types) VALUES (?, ?)",
|
|
(now, json.dumps(list(class_types))),
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
def get_node_stats(self):
|
|
"""Return raw per-node usage data."""
|
|
with self._lock:
|
|
conn = self._connect()
|
|
try:
|
|
conn.row_factory = sqlite3.Row
|
|
rows = conn.execute(
|
|
"SELECT class_type, package, count, first_seen, last_seen FROM node_usage ORDER BY count DESC"
|
|
).fetchall()
|
|
return [dict(r) for r in rows]
|
|
finally:
|
|
conn.close()
|
|
|
|
def get_package_stats(self, mapper):
|
|
"""Aggregate per-package stats combining DB data with known nodes."""
|
|
node_stats = self.get_node_stats()
|
|
|
|
# Build per-package data from DB
|
|
packages = {}
|
|
for row in node_stats:
|
|
pkg = row["package"]
|
|
if pkg not in packages:
|
|
packages[pkg] = {
|
|
"package": pkg,
|
|
"total_executions": 0,
|
|
"used_nodes": 0,
|
|
"nodes": [],
|
|
"last_seen": None,
|
|
}
|
|
entry = packages[pkg]
|
|
entry["total_executions"] += row["count"]
|
|
entry["used_nodes"] += 1
|
|
entry["nodes"].append(row)
|
|
if entry["last_seen"] is None or row["last_seen"] > entry["last_seen"]:
|
|
entry["last_seen"] = row["last_seen"]
|
|
|
|
# Count total registered nodes per package from mapper
|
|
node_counts = {}
|
|
for ct, pkg in mapper.mapping.items():
|
|
node_counts.setdefault(pkg, 0)
|
|
node_counts[pkg] += 1
|
|
|
|
# Also include zero-node packages from LOADED_MODULE_DIRS
|
|
for pkg in mapper.get_all_packages():
|
|
if pkg not in node_counts:
|
|
node_counts[pkg] = 0
|
|
|
|
# Merge: ensure every known package appears
|
|
for pkg, total in node_counts.items():
|
|
if pkg not in packages:
|
|
packages[pkg] = {
|
|
"package": pkg,
|
|
"total_executions": 0,
|
|
"used_nodes": 0,
|
|
"nodes": [],
|
|
"last_seen": None,
|
|
}
|
|
packages[pkg]["total_nodes"] = total
|
|
packages[pkg]["never_used"] = packages[pkg]["total_executions"] == 0
|
|
|
|
# For packages that came only from DB (e.g. uninstalled), fill total_nodes
|
|
for pkg, entry in packages.items():
|
|
if "total_nodes" not in entry:
|
|
entry["total_nodes"] = entry["used_nodes"]
|
|
entry["never_used"] = False
|
|
|
|
result = sorted(packages.values(), key=lambda p: p["total_executions"])
|
|
return result
|
|
|
|
def reset(self):
|
|
"""Clear all tracked data."""
|
|
with self._lock:
|
|
conn = self._connect()
|
|
try:
|
|
conn.execute("DELETE FROM node_usage")
|
|
conn.execute("DELETE FROM prompt_log")
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|