Files
Comfyui-Workflow-Snapshot-M…/snapshot_storage.py
T
Ethanfel 0d1415fca4 Audit fixes: data-loss, security, performance, UX + new features
Comprehensive audit pass across the JS frontend and Python backend.

Bugs / correctness:
- Swap & restore now pre-save current state (hash-deduped) so unsaved
  edits aren't lost when swapping/restoring, incl. rapid double-swap
- Unify captureSnapshot/captureNodeSnapshot into _captureCore; node
  captures now update the dedup hash (no duplicate auto-snapshot after)
- Cycle guard in getDisplayPath; Ctrl+S ignores text fields and the
  other-workflow view; tolerant API error parsing; prompt default pre-fill

Security / robustness (backend):
- Validate workflowKey against path traversal (reject ./.. + containment)
- Generic 500 messages (no exception-string leak), logged server-side
- Request body-size cap + migrate record cap
- Atomic writes (temp file + os.replace) on all write paths

Performance / memory:
- /list omits base64 thumbnails (hasThumbnail flag, lazy-loaded client-side)
- LRU-bounded previous-graph cache; persistent (prune+LRU) SVG cache
- Incremental in-place updates for lock/note instead of full list rebuild

UX / docs:
- Busy-op feedback, named-delete confirm, relative timestamps
- README: remove disabled branching feature, fix version badge & storage paths

Features:
- Export / Import snapshots (export route + reuse migrate)
- Storage-usage display (usage route + footer label)
- Pause auto-capture toggle
- Age-based retention (maxAgeDays setting + prune param)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-14 11:04:09 +02:00

532 lines
19 KiB
Python

"""
Filesystem storage layer for workflow snapshots and profiles.

Stores each snapshot as an individual JSON file under:
<user_dir>/snapshot_manager/snapshots/<encoded_workflow_key>/<id>.json
and each profile under:
<user_dir>/snapshot_manager/profiles/<id>.json
(outside ComfyUI the base falls back to <extension>/data/).

Workflow keys are percent-encoded (urllib.parse.quote with safe="") for
filesystem safety. Writes go through a temp file + os.replace.

An in-memory metadata cache (records without graphData; thumbnails reduced
to a hasThumbnail flag) avoids re-reading every snapshot file for
list/prune/delete operations once a workflow has been warmed. Operations
that need full records still read from disk: get_full_record(),
update_meta() (read-modify-write) and get_full_records_for_workflow()
(export). get_storage_usage() stats the files directly.
"""
import json
import os
import shutil
import tempfile
import time
import urllib.parse
# ─── Data directory resolution ───────────────────────────────────────
# Snapshot data lives in ComfyUI's persistent user directory. If ComfyUI's
# folder_paths cannot be imported or cannot report the user directory
# (e.g. in tests), use the extension-local "data" folder instead. That
# folder is also the legacy location handled by the migration below.
_OLD_DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
try:
    import folder_paths
    _USER_SM_DIR = os.path.join(folder_paths.get_user_directory(), "snapshot_manager")
except Exception:
    _USER_SM_DIR = _OLD_DATA_DIR
_DATA_DIR = os.path.join(_USER_SM_DIR, "snapshots")

# ─── In-memory metadata cache ────────────────────────────────────────
# _cache: workflow_key -> list of metadata dicts (graphData stripped,
# thumbnail replaced by a hasThumbnail flag), kept sorted by timestamp asc.
_cache = dict()
# Workflow keys whose snapshot directory has already been scanned.
_cache_warmed = set()
def _extract_meta(record):
    """Build the cacheable metadata view of a snapshot *record*.

    Both graphData and thumbnail are dropped because they can be large. If
    the record has a truthy thumbnail, ``hasThumbnail: True`` is added in its
    place so clients know to fetch the full record for the image.
    """
    excluded = ("graphData", "thumbnail")
    meta = {}
    for key, value in record.items():
        if key not in excluded:
            meta[key] = value
    if record.get("thumbnail"):
        meta["hasThumbnail"] = True
    return meta
def _ensure_cached(workflow_key):
    """Return the cached metadata list for *workflow_key*, loading it on first use.

    On the first call for a key, every ``*.json`` file in the workflow's
    directory is read and reduced with _extract_meta(). The entries are stored
    in ``_cache`` sorted by ascending ``timestamp`` (missing -> 0), and the key
    is marked as warmed.

    Files that cannot be used are skipped, so one bad file cannot break the
    listing. That covers I/O errors, malformed JSON, bytes that are not valid
    UTF-8, and JSON documents that are not objects.

    The returned list is the live cache list. Callers that hand data out
    should copy it.

    Raises ValueError (from _workflow_dir) for an invalid workflow key.
    """
    if workflow_key not in _cache_warmed:
        d = _workflow_dir(workflow_key)
        entries = []
        if os.path.isdir(d):
            for fname in os.listdir(d):
                if not fname.endswith(".json"):
                    continue
                path = os.path.join(d, fname)
                try:
                    with open(path, "r", encoding="utf-8") as f:
                        record = json.load(f)
                # ValueError covers both json.JSONDecodeError and
                # UnicodeDecodeError (invalid UTF-8 in the file).
                except (ValueError, OSError):
                    continue
                if not isinstance(record, dict):
                    continue  # not a snapshot record; _extract_meta needs a mapping
                entries.append(_extract_meta(record))
        entries.sort(key=lambda r: r.get("timestamp", 0))
        _cache[workflow_key] = entries
        _cache_warmed.add(workflow_key)
    return _cache.get(workflow_key, [])
# ─── Helpers ─────────────────────────────────────────────────────────
def _workflow_dir(workflow_key):
    """Map *workflow_key* to its snapshot directory under _DATA_DIR.

    The key is percent-encoded with ``urllib.parse.quote(..., safe="")``, so
    "/" and "\\" become ``%2F`` and ``%5C``.

    Raises ValueError if the key is empty or not a string, or if it would
    not resolve to a direct child of _DATA_DIR.
    """
    if not workflow_key or not isinstance(workflow_key, str):
        raise ValueError(f"Invalid workflow key: {workflow_key!r}")
    data_root = os.path.normpath(_DATA_DIR)
    resolved = os.path.normpath(
        os.path.join(_DATA_DIR, urllib.parse.quote(workflow_key, safe=""))
    )
    # quote() leaves "." and ".." as-is. After normpath, those would resolve
    # to the snapshots root itself or to its parent. Separators are encoded,
    # so at most one level of escape is possible, but it is still refused:
    # the result must sit directly inside _DATA_DIR.
    if os.path.dirname(resolved) == data_root:
        return resolved
    raise ValueError(f"Invalid workflow key: {workflow_key!r}")
def _validate_id(snapshot_id):
    """Raise ValueError unless *snapshot_id* is safe to use as a file stem.

    Snapshot and profile ids become ``<id>.json`` inside a fixed directory.
    The following are rejected:

    - non-strings (otherwise they raise TypeError or get formatted into an
      odd file name);
    - empty ids;
    - path separators and "..";
    - NUL bytes;
    - ids carrying a drive prefix. On Windows, ``os.path.join`` can discard
      the base directory when joined with a component like ``C:x``. On POSIX,
      ``splitdrive`` never reports a drive, so this check is a no-op there.
    """
    if (
        not isinstance(snapshot_id, str)
        or not snapshot_id
        or "/" in snapshot_id
        or "\\" in snapshot_id
        or ".." in snapshot_id
        or "\x00" in snapshot_id
        or os.path.splitdrive(snapshot_id)[0]
    ):
        raise ValueError(f"Invalid snapshot id: {snapshot_id!r}")
def _atomic_write_json(path, obj):
    """Write *obj* as compact JSON to *path* atomically.

    The data goes to a temp file in the same directory. That file is flushed
    and fsync'ed, then moved over *path* with os.replace(). Readers therefore
    see either the old file or the complete new one, never a truncated file.

    The fsync before the rename matters on power loss or OS crash: without
    it, some filesystems can persist the rename before the data and leave an
    empty file behind.

    On any failure the temp file is removed and the error re-raised.

    Note: tempfile.mkstemp creates the file owner-readable/writable only
    (mode 0o600), so the replaced file ends up with those permissions.
    """
    directory = os.path.dirname(path)
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(obj, f, separators=(",", ":"))
            f.flush()  # push Python's buffer to the OS before fsync
            os.fsync(f.fileno())
        os.replace(tmp, path)
    except BaseException:
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise
# ─── Public API ──────────────────────────────────────────────────────
def put(record):
    """Persist a full snapshot *record*, which must contain "id" and "workflowKey".

    The record is written atomically to <workflow dir>/<id>.json, creating
    the directory if needed.

    If this workflow's metadata is already cached, the entry with the same
    id is replaced and the list re-sorted by timestamp. A workflow that is
    not yet warmed picks up the new file on its first load instead.
    """
    snapshot_id, workflow_key = record["id"], record["workflowKey"]
    _validate_id(snapshot_id)
    target_dir = _workflow_dir(workflow_key)
    os.makedirs(target_dir, exist_ok=True)
    _atomic_write_json(os.path.join(target_dir, f"{snapshot_id}.json"), record)

    if workflow_key not in _cache_warmed:
        return
    fresh = _extract_meta(record)
    entries = _cache[workflow_key]
    entries[:] = [e for e in entries if e.get("id") != snapshot_id]
    entries.append(fresh)
    entries.sort(key=lambda r: r.get("timestamp", 0))
def get_all_for_workflow(workflow_key):
    """List snapshot metadata (no graphData) for *workflow_key*, oldest first.

    Each dict is a shallow copy, so callers may modify it without touching
    the cache.
    """
    return list(map(dict, _ensure_cached(workflow_key)))
def get_full_record(workflow_key, snapshot_id):
    """Read one complete snapshot, including graphData, from disk.

    Returns the record dict. Returns None if the file does not exist or
    cannot be used: I/O error, malformed JSON, invalid UTF-8, or a JSON
    document that is not an object.

    Raises ValueError for an invalid workflow key or snapshot id.
    """
    _validate_id(snapshot_id)
    path = os.path.join(_workflow_dir(workflow_key), f"{snapshot_id}.json")
    try:
        with open(path, "r", encoding="utf-8") as f:
            record = json.load(f)
    # A missing file or a directory surfaces as an OSError subclass, so no
    # racy isfile() pre-check is needed. ValueError covers JSONDecodeError
    # and UnicodeDecodeError.
    except (ValueError, OSError):
        return None
    return record if isinstance(record, dict) else None
def update_meta(workflow_key, snapshot_id, fields):
    """Merge *fields* into a stored snapshot and refresh its cache entry.

    A key whose value is None is removed from the record; every other key is
    set. The whole record, graphData included, is re-read and rewritten
    atomically.

    If the workflow's metadata is cached, the matching entry is rebuilt from
    the updated record with _extract_meta(). That keeps the cache in the
    same shape as a freshly loaded entry: a changed or removed thumbnail
    updates hasThumbnail rather than caching the base64 data. The list is
    re-sorted when "timestamp" is among the fields.

    Returns True on success, False if the snapshot file does not exist.
    Raises ValueError for an invalid key/id or a file that is not valid JSON.
    """
    _validate_id(snapshot_id)
    path = os.path.join(_workflow_dir(workflow_key), f"{snapshot_id}.json")
    if not os.path.isfile(path):
        return False
    with open(path, "r", encoding="utf-8") as f:
        record = json.load(f)
    # Merge fields; None values remove the key
    for k, v in fields.items():
        if v is None:
            record.pop(k, None)
        else:
            record[k] = v
    _atomic_write_json(path, record)

    cached = _cache.get(workflow_key)
    if cached:
        for i, entry in enumerate(cached):
            if entry.get("id") == snapshot_id:
                cached[i] = _extract_meta(record)
                if "timestamp" in fields:
                    # Keep the timestamp-ascending invariant of the cache.
                    cached.sort(key=lambda r: r.get("timestamp", 0))
                break
    return True
def delete(workflow_key, snapshot_id):
    """Delete snapshot *snapshot_id* of *workflow_key*, if its file exists.

    The matching cache entry is dropped. If no entries remain, the workflow
    is also removed from the cache and un-warmed. The workflow directory is
    removed once it is empty. A missing file is not an error.
    """
    _validate_id(snapshot_id)
    folder = _workflow_dir(workflow_key)
    target = os.path.join(folder, f"{snapshot_id}.json")
    if os.path.isfile(target):
        os.remove(target)

    remaining = _cache.get(workflow_key)
    if remaining is not None:
        remaining = [e for e in remaining if e.get("id") != snapshot_id]
        if remaining:
            _cache[workflow_key] = remaining
        else:
            del _cache[workflow_key]
            _cache_warmed.discard(workflow_key)

    if os.path.isdir(folder) and not os.listdir(folder):
        os.rmdir(folder)
def delete_all_for_workflow(workflow_key):
    """Delete every unlocked snapshot of *workflow_key*; return {"lockedCount": n}.

    Locked snapshots are kept and counted.

    Records whose id fails validation (e.g. a record with no "id") are also
    kept, rather than aborting halfway. An abort would leave some files
    deleted while the cache still listed them.

    Afterwards the cache holds only the kept records, or the workflow is
    dropped from the cache entirely. An empty workflow directory is removed.
    """
    entries = _ensure_cached(workflow_key)
    d = _workflow_dir(workflow_key)
    kept = []
    locked_count = 0
    for rec in entries:
        if rec.get("locked"):
            locked_count += 1
            kept.append(rec)
            continue
        snapshot_id = rec.get("id")
        try:
            _validate_id(snapshot_id)
        except (ValueError, TypeError):
            # Cannot map this record to a file safely; leave it alone.
            kept.append(rec)
            continue
        path = os.path.join(d, f"{snapshot_id}.json")
        if os.path.isfile(path):
            os.remove(path)
    # Update cache to the surviving records only
    if kept:
        _cache[workflow_key] = kept
    else:
        _cache.pop(workflow_key, None)
        _cache_warmed.discard(workflow_key)
    # Clean up empty directory
    if os.path.isdir(d) and not os.listdir(d):
        os.rmdir(d)
    return {"lockedCount": locked_count}
def get_all_workflow_keys():
    """List every workflow that has at least one cached snapshot.

    Each subdirectory of _DATA_DIR is percent-decoded back into a workflow
    key, and that key's metadata cache is warmed. A directory is skipped if
    loading raises ValueError (e.g. the decoded name is not a valid key) or
    if it yields no snapshots.

    Returns [{"workflowKey": ..., "count": ...}] sorted by key.
    """
    if not os.path.isdir(_DATA_DIR):
        return []
    found = []
    for name in os.listdir(_DATA_DIR):
        if not os.path.isdir(os.path.join(_DATA_DIR, name)):
            continue
        key = urllib.parse.unquote(name)
        try:
            snapshots = _ensure_cached(key)
        except ValueError:
            continue  # stray/legacy directory whose name is not a valid key
        if snapshots:
            found.append({"workflowKey": key, "count": len(snapshots)})
    return sorted(found, key=lambda item: item["workflowKey"])
def get_storage_usage():
    """Summarise on-disk snapshot usage.

    Sizes come from os.path.getsize() on each ``*.json`` file in every
    workflow subdirectory of _DATA_DIR. Files that cannot be stat'ed are
    skipped, and workflows with no counted files are omitted.

    Returns {"totalBytes": int, "workflows": [{"workflowKey", "bytes",
    "count"}]}, with workflows ordered largest first.
    """
    per_workflow = []
    if os.path.isdir(_DATA_DIR):
        for name in os.listdir(_DATA_DIR):
            folder = os.path.join(_DATA_DIR, name)
            if not os.path.isdir(folder):
                continue
            nbytes, nfiles = 0, 0
            for fname in os.listdir(folder):
                if fname.endswith(".json"):
                    try:
                        nbytes += os.path.getsize(os.path.join(folder, fname))
                    except OSError:
                        continue
                    nfiles += 1
            if nfiles:
                per_workflow.append({
                    "workflowKey": urllib.parse.unquote(name),
                    "bytes": nbytes,
                    "count": nfiles,
                })
    per_workflow.sort(key=lambda w: w["bytes"], reverse=True)
    return {"totalBytes": sum(w["bytes"] for w in per_workflow), "workflows": per_workflow}
def get_full_records_for_workflow(workflow_key):
    """Return every full snapshot record, graphData included, for export.

    Each ``*.json`` file in the workflow directory is read directly from
    disk, and the records are returned oldest first by ``timestamp``
    (missing -> 0).

    Files that cannot be used are skipped: I/O errors, malformed JSON,
    invalid UTF-8, and JSON that is not an object. A non-object would
    otherwise break the timestamp sort.

    Returns [] if the directory does not exist. Raises ValueError for an
    invalid workflow key.
    """
    d = _workflow_dir(workflow_key)
    records = []
    if os.path.isdir(d):
        for fname in os.listdir(d):
            if not fname.endswith(".json"):
                continue
            try:
                with open(os.path.join(d, fname), "r", encoding="utf-8") as f:
                    record = json.load(f)
            except (ValueError, OSError):  # ValueError: JSONDecodeError / UnicodeDecodeError
                continue
            if isinstance(record, dict):
                records.append(record)
    records.sort(key=lambda r: r.get("timestamp", 0))
    return records
def prune(workflow_key, max_snapshots, source=None, protected_ids=None, max_age_days=None):
    """Delete old unlocked snapshots of *workflow_key*; return the number of files removed.

    Candidates are the workflow's cached records (oldest first) that are not
    locked and not in *protected_ids*. *source* narrows them further:

    - "node": only records whose ``source`` == "node"
    - "regular": only records whose ``source`` is absent or not "node"
    - None (or any other value): every unlocked, unprotected record

    protected_ids: set/list of snapshot IDs that must not be pruned
    (e.g. ancestors of the active branch tip, fork-point snapshots).

    The oldest candidates beyond *max_snapshots* are deleted. When
    *max_age_days* > 0, candidates whose ``timestamp`` (epoch milliseconds)
    is older than that many days are deleted too, regardless of the count
    limit.

    Every selected record with a valid id is dropped from the cache, even if
    its file had already disappeared from disk, so stale entries do not
    linger. The returned count includes only files actually removed.
    Selected records whose id fails validation are left untouched instead
    of aborting the prune midway.
    """
    protected = set(protected_ids) if protected_ids else set()
    entries = _ensure_cached(workflow_key)

    def _is_candidate(rec):
        # Locked and protected snapshots are never pruned.
        if rec.get("locked") or rec.get("id") in protected:
            return False
        if source == "node":
            return rec.get("source") == "node"
        if source == "regular":
            return rec.get("source") != "node"
        return True

    candidates = [r for r in entries if _is_candidate(r)]

    # Indices into `candidates` (oldest first) selected for deletion.
    doomed = set()
    excess = len(candidates) - max_snapshots
    if excess > 0:
        # min() keeps a negative max_snapshots in bounds (it selects everything).
        doomed.update(range(min(excess, len(candidates))))
    if max_age_days and max_age_days > 0:
        cutoff = time.time() * 1000 - max_age_days * 86400000
        doomed.update(
            i for i, rec in enumerate(candidates) if rec.get("timestamp", 0) < cutoff
        )
    if not doomed:
        return 0

    d = _workflow_dir(workflow_key)
    deleted = 0
    purge_ids = set()
    for i in sorted(doomed):
        snapshot_id = candidates[i].get("id")
        try:
            _validate_id(snapshot_id)
        except (ValueError, TypeError):
            continue  # cannot map this record to a file safely; leave it alone
        path = os.path.join(d, f"{snapshot_id}.json")
        if os.path.isfile(path):
            os.remove(path)
            deleted += 1
        purge_ids.add(snapshot_id)

    # Update cache (including entries whose file was already gone)
    if purge_ids and workflow_key in _cache:
        remaining = [e for e in _cache[workflow_key] if e.get("id") not in purge_ids]
        if remaining:
            _cache[workflow_key] = remaining
        else:
            del _cache[workflow_key]
            _cache_warmed.discard(workflow_key)
    # Clean up empty directory
    if os.path.isdir(d) and not os.listdir(d):
        os.rmdir(d)
    return deleted
# ─── Profile Storage ─────────────────────────────────────────────────
# One JSON file per profile: <_USER_SM_DIR>/profiles/<id>.json
_PROFILES_DIR = os.path.join(_USER_SM_DIR, "profiles")
# Loaded profile dicts (sorted by timestamp), or None when they must be
# (re)read from disk on next access.
_profile_cache = None
def _ensure_profiles_dir():
    """Create the profiles directory (and any missing parents) if needed."""
    if not os.path.isdir(_PROFILES_DIR):
        os.makedirs(_PROFILES_DIR, exist_ok=True)
def _load_profile_cache():
    """Return the cached list of profile dicts, loading it from disk if needed.

    Loading creates the profiles directory if it is missing and reads every
    ``*.json`` file in it. Files that cannot be used are skipped: I/O error,
    malformed JSON, invalid UTF-8, or a JSON document that is not an object.
    The profiles are sorted by ``timestamp`` (missing -> 0).

    The returned list is the cache itself; public callers copy it.
    """
    global _profile_cache
    if _profile_cache is not None:
        return _profile_cache
    _ensure_profiles_dir()
    profiles = []
    for fname in os.listdir(_PROFILES_DIR):
        if not fname.endswith(".json"):
            continue
        path = os.path.join(_PROFILES_DIR, fname)
        try:
            with open(path, "r", encoding="utf-8") as f:
                profile = json.load(f)
        except (ValueError, OSError):  # ValueError: JSONDecodeError / UnicodeDecodeError
            continue
        if isinstance(profile, dict):
            profiles.append(profile)
    profiles.sort(key=lambda p: p.get("timestamp", 0))
    _profile_cache = profiles
    return _profile_cache
def _invalidate_profile_cache():
    """Forget the loaded profiles so the next read rescans the profiles directory."""
    global _profile_cache
    _profile_cache = None
def profile_put(profile):
    """Create or overwrite the profile stored under ``profile["id"]``.

    The id is validated, the profiles directory is created if needed, the
    file is written atomically, and the in-memory profile list is
    invalidated.
    """
    profile_id = profile["id"]
    _validate_id(profile_id)
    _ensure_profiles_dir()
    _atomic_write_json(os.path.join(_PROFILES_DIR, f"{profile_id}.json"), profile)
    _invalidate_profile_cache()
def profile_get_all():
    """Return shallow copies of all profiles, ordered by timestamp."""
    profiles = _load_profile_cache()
    return [dict(profile) for profile in profiles]
def profile_get(profile_id):
    """Return the profile stored under *profile_id*, or None.

    None is returned when the file is missing or unusable: I/O error,
    malformed JSON, invalid UTF-8, or JSON that is not an object.

    Raises ValueError for an invalid id.
    """
    _validate_id(profile_id)
    path = os.path.join(_PROFILES_DIR, f"{profile_id}.json")
    try:
        with open(path, "r", encoding="utf-8") as f:
            profile = json.load(f)
    # Missing file -> FileNotFoundError (an OSError); ValueError covers
    # JSONDecodeError and UnicodeDecodeError.
    except (ValueError, OSError):
        return None
    return profile if isinstance(profile, dict) else None
def profile_delete(profile_id):
    """Remove the profile file for *profile_id* and invalidate the profile cache.

    A missing file is silently ignored.
    """
    _validate_id(profile_id)
    target = os.path.join(_PROFILES_DIR, f"{profile_id}.json")
    if os.path.isfile(target):
        os.remove(target)
    _invalidate_profile_cache()
def profile_update(profile_id, fields):
    """Apply *fields* to a stored profile.

    Returns True on success, or False if the profile does not exist. A None
    value deletes that key; any other value overwrites it. The file is
    rewritten atomically and the profile cache is invalidated.
    """
    _validate_id(profile_id)
    path = os.path.join(_PROFILES_DIR, f"{profile_id}.json")
    if not os.path.isfile(path):
        return False
    with open(path, "r", encoding="utf-8") as f:
        profile = json.load(f)
    for key, value in fields.items():
        if value is not None:
            profile[key] = value
        else:
            profile.pop(key, None)
    _atomic_write_json(path, profile)
    _invalidate_profile_cache()
    return True
# ─── Migration from old extension-local data ─────────────────────────
def _migrate_old_data():
    """Move data from the old <extension>/data/ location to the new user directory.

    Does nothing when the old and new locations are the same (i.e. we are
    not inside ComfyUI and _USER_SM_DIR fell back to <extension>/data), or
    when the old directory does not exist.

    Snapshots: each workflow subdirectory of <old>/snapshots is moved into
    _DATA_DIR. If the destination directory already exists, files are moved
    one by one, and only when no file of the same name exists there. Colliding
    source files are left behind, so the source directory is kept.

    Profiles: each ``*.json`` in <old>/profiles is moved into _PROFILES_DIR.
    If a same-named profile already exists at the destination, the OLD copy
    is deleted without comparing contents.

    Empty old directories are removed afterwards. Exceptions propagate to
    the caller.
    """
    old_snapshots = os.path.join(_OLD_DATA_DIR, "snapshots")
    old_profiles = os.path.join(_OLD_DATA_DIR, "profiles")
    # Nothing to migrate if old data dir doesn't exist or paths are the same
    if os.path.normpath(_OLD_DATA_DIR) == os.path.normpath(_USER_SM_DIR):
        return
    if not os.path.isdir(_OLD_DATA_DIR):
        return
    migrated_anything = False
    # Migrate snapshot workflow directories
    if os.path.isdir(old_snapshots):
        os.makedirs(_DATA_DIR, exist_ok=True)
        for name in os.listdir(old_snapshots):
            src = os.path.join(old_snapshots, name)
            dst = os.path.join(_DATA_DIR, name)
            if not os.path.isdir(src):
                continue
            if os.path.exists(dst):
                # Merge: move individual files (any extension, not only .json)
                # that don't already exist at the destination
                for fname in os.listdir(src):
                    s = os.path.join(src, fname)
                    d = os.path.join(dst, fname)
                    if not os.path.exists(d):
                        shutil.move(s, d)
                # Remove source dir if now empty (rmdir fails, and is ignored,
                # when colliding files were left behind)
                try:
                    os.rmdir(src)
                except OSError:
                    pass
            else:
                shutil.move(src, dst)
        # NOTE(review): set whenever old_snapshots exists, even if nothing moved.
        migrated_anything = True
    # Migrate profile files
    if os.path.isdir(old_profiles):
        os.makedirs(_PROFILES_DIR, exist_ok=True)
        for fname in os.listdir(old_profiles):
            if not fname.endswith(".json"):
                continue
            src = os.path.join(old_profiles, fname)
            dst = os.path.join(_PROFILES_DIR, fname)
            if not os.path.exists(dst):
                shutil.move(src, dst)
            else:
                # NOTE(review): the old copy is dropped even if it differs from
                # the profile already at the destination (snapshot collisions
                # above are kept instead) — confirm this cannot lose data.
                os.remove(src)
        migrated_anything = True
    if migrated_anything:
        # Clean up old directories if empty (children first, then the root)
        for d in (old_snapshots, old_profiles, _OLD_DATA_DIR):
            try:
                if os.path.isdir(d) and not os.listdir(d):
                    os.rmdir(d)
            except OSError:
                pass
        print("[Snapshot Manager] Migrated data to", _USER_SM_DIR)
# Run the one-time data migration at import. A migration problem must never
# stop the extension from loading, so it is reported and swallowed.
try:
    _migrate_old_data()
except Exception as exc:
    print(f"[Snapshot Manager] Migration failed, old data preserved: {exc}")