Files
Comfyui-Workflow-Snapshot-M…/snapshot_storage.py
Ethanfel 048171cb81
Some checks failed
Publish to ComfyUI Registry / Publish Custom Node to Registry (push) Has been cancelled
Move snapshot data to ComfyUI user directory
Store snapshots and profiles under <user_dir>/snapshot_manager/ instead
of <extension>/data/ so user data survives extension updates and reinstalls.
Automatically migrates existing data on first load.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 13:48:40 +01:00

430 lines
15 KiB
Python

"""
Filesystem storage layer for workflow snapshots.
Stores each snapshot as an individual JSON file under:
<user_dir>/snapshot_manager/snapshots/<encoded_workflow_key>/<id>.json
Workflow keys are percent-encoded for filesystem safety.
An in-memory metadata cache avoids redundant disk reads for list/prune/delete
operations. Only get_full_record() reads a file from disk after warm-up.
"""
import json
import os
import shutil
import urllib.parse
# ─── Data directory resolution ───────────────────────────────────────
# Prefer ComfyUI's persistent user directory; fall back to extension-local
# paths when running outside ComfyUI (e.g. tests).
_OLD_DATA_DIR = os.path.join(os.path.dirname(__file__), "data")  # legacy pre-migration location
try:
    import folder_paths
    _USER_SM_DIR = os.path.join(folder_paths.get_user_directory(), "snapshot_manager")
except Exception:
    # folder_paths only exists inside a running ComfyUI; outside it (tests,
    # standalone use) keep data next to the extension, same as the old layout.
    _USER_SM_DIR = os.path.join(os.path.dirname(__file__), "data")
_DATA_DIR = os.path.join(_USER_SM_DIR, "snapshots")
# ─── In-memory metadata cache ────────────────────────────────────────
# Maps workflow_key -> list of metadata dicts (sorted by timestamp asc).
# Metadata is everything *except* graphData.
_cache = {}
_cache_warmed = set() # workflow keys already loaded from disk
def _extract_meta(record):
"""Return a lightweight copy of *record* without graphData."""
return {k: v for k, v in record.items() if k != "graphData"}
def _ensure_cached(workflow_key):
    """Load metadata for *workflow_key* from disk on first access; return the cached list."""
    if workflow_key in _cache_warmed:
        return _cache.get(workflow_key, [])
    directory = _workflow_dir(workflow_key)
    metas = []
    if os.path.isdir(directory):
        for name in os.listdir(directory):
            if not name.endswith(".json"):
                continue
            try:
                with open(os.path.join(directory, name), "r", encoding="utf-8") as fh:
                    metas.append(_extract_meta(json.load(fh)))
            except (json.JSONDecodeError, OSError):
                # Skip corrupt or unreadable snapshot files rather than failing the warm-up.
                continue
    metas.sort(key=lambda m: m.get("timestamp", 0))
    _cache[workflow_key] = metas
    _cache_warmed.add(workflow_key)
    return _cache.get(workflow_key, [])
# ─── Helpers ─────────────────────────────────────────────────────────
def _workflow_dir(workflow_key):
    """Directory holding all snapshot files for *workflow_key* (key is percent-encoded)."""
    return os.path.join(_DATA_DIR, urllib.parse.quote(workflow_key, safe=""))
def _validate_id(snapshot_id):
if not snapshot_id or "/" in snapshot_id or "\\" in snapshot_id or ".." in snapshot_id:
raise ValueError(f"Invalid snapshot id: {snapshot_id!r}")
# ─── Public API ──────────────────────────────────────────────────────
def put(record):
"""Write one snapshot record to disk and update the cache."""
snapshot_id = record["id"]
workflow_key = record["workflowKey"]
_validate_id(snapshot_id)
d = _workflow_dir(workflow_key)
os.makedirs(d, exist_ok=True)
path = os.path.join(d, f"{snapshot_id}.json")
with open(path, "w", encoding="utf-8") as f:
json.dump(record, f, separators=(",", ":"))
# Update cache only if already warmed; otherwise _ensure_cached will
# pick up the new file from disk on next read.
if workflow_key in _cache_warmed:
meta = _extract_meta(record)
cached = _cache[workflow_key]
cached[:] = [e for e in cached if e.get("id") != snapshot_id]
cached.append(meta)
cached.sort(key=lambda r: r.get("timestamp", 0))
def get_all_for_workflow(workflow_key):
    """Return copies of all snapshot metadata for a workflow (no graphData), oldest first."""
    return list(map(dict, _ensure_cached(workflow_key)))
def get_full_record(workflow_key, snapshot_id):
    """Load one complete snapshot (including graphData) from disk; return dict or None."""
    _validate_id(snapshot_id)
    path = os.path.join(_workflow_dir(workflow_key), f"{snapshot_id}.json")
    # EAFP: a missing, unreadable, or corrupt file all resolve to None.
    try:
        with open(path, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except (json.JSONDecodeError, OSError):
        return None
def update_meta(workflow_key, snapshot_id, fields):
    """Merge *fields* into an existing snapshot on disk without touching graphData.

    A value of None in *fields* removes that key from the record.
    Returns True on success, False if the file does not exist or cannot be parsed.
    """
    _validate_id(snapshot_id)
    path = os.path.join(_workflow_dir(workflow_key), f"{snapshot_id}.json")
    if not os.path.isfile(path):
        return False
    try:
        with open(path, "r", encoding="utf-8") as f:
            record = json.load(f)
    except (json.JSONDecodeError, OSError):
        # Treat a corrupt/unreadable file as missing, matching every other
        # reader in this module instead of propagating the exception.
        return False
    # Merge fields; None values remove the key
    for k, v in fields.items():
        if v is None:
            record.pop(k, None)
        else:
            record[k] = v
    with open(path, "w", encoding="utf-8") as f:
        json.dump(record, f, separators=(",", ":"))
    # Mirror the change into the cached metadata entry (graphData is never cached).
    for entry in _cache.get(workflow_key, []):
        if entry.get("id") == snapshot_id:
            for k, v in fields.items():
                if k == "graphData":
                    continue
                if v is None:
                    entry.pop(k, None)
                else:
                    entry[k] = v
            break
    return True
def delete(workflow_key, snapshot_id):
    """Delete one snapshot file, drop its cache entry, and remove the dir if now empty."""
    _validate_id(snapshot_id)
    wf_dir = _workflow_dir(workflow_key)
    file_path = os.path.join(wf_dir, f"{snapshot_id}.json")
    if os.path.isfile(file_path):
        os.remove(file_path)
    # Keep the cache consistent with disk.
    cached = _cache.get(workflow_key)
    if cached is not None:
        remaining = [m for m in cached if m.get("id") != snapshot_id]
        if remaining:
            _cache[workflow_key] = remaining
        else:
            del _cache[workflow_key]
            _cache_warmed.discard(workflow_key)
    # Clean up empty directory
    if os.path.isdir(wf_dir) and not os.listdir(wf_dir):
        os.rmdir(wf_dir)
def delete_all_for_workflow(workflow_key):
    """Delete every unlocked snapshot for a workflow; locked ones survive.

    Returns {"lockedCount": <number of snapshots kept>}.
    """
    wf_dir = _workflow_dir(workflow_key)
    survivors = []
    for meta in _ensure_cached(workflow_key):
        if meta.get("locked"):
            survivors.append(meta)
            continue
        _validate_id(meta["id"])
        file_path = os.path.join(wf_dir, f"{meta['id']}.json")
        if os.path.isfile(file_path):
            os.remove(file_path)
    # Cache now holds only the locked entries (or nothing at all).
    if survivors:
        _cache[workflow_key] = survivors
    else:
        _cache.pop(workflow_key, None)
        _cache_warmed.discard(workflow_key)
    # Clean up empty directory
    if os.path.isdir(wf_dir) and not os.listdir(wf_dir):
        os.rmdir(wf_dir)
    return {"lockedCount": len(survivors)}
def get_all_workflow_keys():
    """List every workflow that has snapshots, as [{workflowKey, count}], sorted by key."""
    if not os.path.isdir(_DATA_DIR):
        return []
    summaries = []
    for encoded in os.listdir(_DATA_DIR):
        if not os.path.isdir(os.path.join(_DATA_DIR, encoded)):
            continue
        # Directory names are percent-encoded workflow keys.
        key = urllib.parse.unquote(encoded)
        metas = _ensure_cached(key)
        if metas:
            summaries.append({"workflowKey": key, "count": len(metas)})
    summaries.sort(key=lambda s: s["workflowKey"])
    return summaries
def prune(workflow_key, max_snapshots, source=None, protected_ids=None):
    """Delete the oldest unlocked snapshots beyond *max_snapshots*. Returns count deleted.

    source filtering:
    - "node": only prune records where source == "node"
    - "regular": only prune records where source is absent or not "node"
    - None: prune all unlocked (existing behavior)
    protected_ids: set/list of snapshot IDs that must not be pruned
    (e.g. ancestors of active branch tip, fork-point snapshots).
    """
    protected = set(protected_ids) if protected_ids else set()

    def prunable(rec):
        # Locked or protected snapshots are never candidates.
        if rec.get("locked") or rec.get("id") in protected:
            return False
        if source == "node":
            return rec.get("source") == "node"
        if source == "regular":
            return rec.get("source") != "node"
        return True

    candidates = [r for r in _ensure_cached(workflow_key) if prunable(r)]
    excess = len(candidates) - max_snapshots
    if excess <= 0:
        return 0
    # Cache is timestamp-sorted, so the first *excess* candidates are the oldest.
    wf_dir = _workflow_dir(workflow_key)
    removed_ids = set()
    deleted = 0
    for rec in candidates[:excess]:
        _validate_id(rec["id"])
        file_path = os.path.join(wf_dir, f"{rec['id']}.json")
        if os.path.isfile(file_path):
            os.remove(file_path)
            deleted += 1
        removed_ids.add(rec["id"])
    # Update cache
    if removed_ids and workflow_key in _cache:
        kept = [m for m in _cache[workflow_key] if m.get("id") not in removed_ids]
        if kept:
            _cache[workflow_key] = kept
        else:
            del _cache[workflow_key]
            _cache_warmed.discard(workflow_key)
    # Clean up empty directory
    if os.path.isdir(wf_dir) and not os.listdir(wf_dir):
        os.rmdir(wf_dir)
    return deleted
# ─── Profile Storage ─────────────────────────────────────────────────
# Profiles are stored as individual JSON files under snapshot_manager/profiles/<id>.json
_PROFILES_DIR = os.path.join(_USER_SM_DIR, "profiles")
_profile_cache = None # list of profile dicts, or None if not loaded
def _ensure_profiles_dir():
    # Create the profiles directory on demand (idempotent).
    os.makedirs(_PROFILES_DIR, exist_ok=True)
def _load_profile_cache():
    """Populate the profile cache from disk on first use; return the cached list."""
    global _profile_cache
    if _profile_cache is None:
        _ensure_profiles_dir()
        loaded = []
        for name in os.listdir(_PROFILES_DIR):
            if not name.endswith(".json"):
                continue
            try:
                with open(os.path.join(_PROFILES_DIR, name), "r", encoding="utf-8") as fh:
                    loaded.append(json.load(fh))
            except (json.JSONDecodeError, OSError):
                # Skip unreadable/corrupt profile files.
                continue
        loaded.sort(key=lambda p: p.get("timestamp", 0))
        _profile_cache = loaded
    return _profile_cache
def _invalidate_profile_cache():
    """Drop the in-memory profile cache so the next read reloads from disk."""
    global _profile_cache
    _profile_cache = None
def profile_put(profile):
    """Create or overwrite a profile on disk. *profile* must contain an 'id' key."""
    profile_id = profile["id"]
    _validate_id(profile_id)
    _ensure_profiles_dir()
    target = os.path.join(_PROFILES_DIR, f"{profile_id}.json")
    with open(target, "w", encoding="utf-8") as fh:
        json.dump(profile, fh, separators=(",", ":"))
    _invalidate_profile_cache()
def profile_get_all():
    """Return shallow copies of all profiles, sorted ascending by timestamp."""
    return list(map(dict, _load_profile_cache()))
def profile_get(profile_id):
    """Return the profile stored under *profile_id*, or None if absent or corrupt."""
    _validate_id(profile_id)
    path = os.path.join(_PROFILES_DIR, f"{profile_id}.json")
    # EAFP: a missing, unreadable, or corrupt file all resolve to None.
    try:
        with open(path, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except (json.JSONDecodeError, OSError):
        return None
def profile_delete(profile_id):
    """Remove the profile file for *profile_id* (no-op if missing) and reset the cache."""
    _validate_id(profile_id)
    target = os.path.join(_PROFILES_DIR, f"{profile_id}.json")
    if os.path.isfile(target):
        os.remove(target)
    _invalidate_profile_cache()
def profile_update(profile_id, fields):
    """Merge *fields* into an existing profile. None values remove keys.

    Returns True on success, False if the profile is missing or unreadable.
    """
    _validate_id(profile_id)
    path = os.path.join(_PROFILES_DIR, f"{profile_id}.json")
    if not os.path.isfile(path):
        return False
    try:
        with open(path, "r", encoding="utf-8") as f:
            profile = json.load(f)
    except (json.JSONDecodeError, OSError):
        # Treat a corrupt/unreadable file as missing, matching profile_get()
        # instead of propagating the exception to the caller.
        return False
    for k, v in fields.items():
        if v is None:
            profile.pop(k, None)
        else:
            profile[k] = v
    with open(path, "w", encoding="utf-8") as f:
        json.dump(profile, f, separators=(",", ":"))
    _invalidate_profile_cache()
    return True
# ─── Migration from old extension-local data ─────────────────────────
def _migrate_old_data():
    """Move data from the old <extension>/data/ location to the new user directory.
    Only runs when the old directory exists, has content, and the new location
    differs from the old one (i.e. we're actually inside ComfyUI).
    """
    old_snapshots = os.path.join(_OLD_DATA_DIR, "snapshots")
    old_profiles = os.path.join(_OLD_DATA_DIR, "profiles")
    # Nothing to migrate if old data dir doesn't exist or paths are the same
    if os.path.normpath(_OLD_DATA_DIR) == os.path.normpath(_USER_SM_DIR):
        return
    if not os.path.isdir(_OLD_DATA_DIR):
        return
    migrated_anything = False
    # Migrate snapshot workflow directories
    if os.path.isdir(old_snapshots):
        os.makedirs(_DATA_DIR, exist_ok=True)
        for name in os.listdir(old_snapshots):
            src = os.path.join(old_snapshots, name)
            dst = os.path.join(_DATA_DIR, name)
            if not os.path.isdir(src):
                continue
            if os.path.exists(dst):
                # Merge: move individual files that don't already exist.
                # Existing files at the destination win (never overwritten).
                for fname in os.listdir(src):
                    s = os.path.join(src, fname)
                    d = os.path.join(dst, fname)
                    if not os.path.exists(d):
                        shutil.move(s, d)
                # Remove source dir if now empty
                try:
                    os.rmdir(src)
                except OSError:
                    # Non-empty (some files were kept at dst) — leave it in place.
                    pass
            else:
                shutil.move(src, dst)
            migrated_anything = True
    # Migrate profile files
    if os.path.isdir(old_profiles):
        os.makedirs(_PROFILES_DIR, exist_ok=True)
        for fname in os.listdir(old_profiles):
            if not fname.endswith(".json"):
                continue
            src = os.path.join(old_profiles, fname)
            dst = os.path.join(_PROFILES_DIR, fname)
            if not os.path.exists(dst):
                shutil.move(src, dst)
            else:
                # Destination already has this profile; drop the old copy.
                os.remove(src)
            migrated_anything = True
    if migrated_anything:
        # Clean up old directories if empty
        for d in (old_snapshots, old_profiles, _OLD_DATA_DIR):
            try:
                if os.path.isdir(d) and not os.listdir(d):
                    os.rmdir(d)
            except OSError:
                pass
        print("[Snapshot Manager] Migrated data to", _USER_SM_DIR)
# Run the migration once at import time; failures are non-fatal and leave
# the old data untouched for a retry on the next load.
try:
    _migrate_old_data()
except Exception as e:
    print(f"[Snapshot Manager] Migration failed, old data preserved: {e}")