0d1415fca4
Comprehensive audit pass across the JS frontend and Python backend. Bugs / correctness: - Swap & restore now pre-save current state (hash-deduped) so unsaved edits aren't lost when swapping/restoring, incl. rapid double-swap - Unify captureSnapshot/captureNodeSnapshot into _captureCore; node captures now update the dedup hash (no duplicate auto-snapshot after) - Cycle guard in getDisplayPath; Ctrl+S ignores text fields and the other-workflow view; tolerant API error parsing; prompt default pre-fill Security / robustness (backend): - Validate workflowKey against path traversal (reject ./.. + containment) - Generic 500 messages (no exception-string leak), logged server-side - Request body-size cap + migrate record cap - Atomic writes (temp file + os.replace) on all write paths Performance / memory: - /list omits base64 thumbnails (hasThumbnail flag, lazy-loaded client-side) - LRU-bounded previous-graph cache; persistent (prune+LRU) SVG cache - Incremental in-place updates for lock/note instead of full list rebuild UX / docs: - Busy-op feedback, named-delete confirm, relative timestamps - README: remove disabled branching feature, fix version badge & storage paths Features: - Export / Import snapshots (export route + reuse migrate) - Storage-usage display (usage route + footer label) - Pause auto-capture toggle - Age-based retention (maxAgeDays setting + prune param) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
532 lines
19 KiB
Python
532 lines
19 KiB
Python
"""
|
|
Filesystem storage layer for workflow snapshots.
|
|
|
|
Stores each snapshot as an individual JSON file under:
|
|
<user_dir>/snapshot_manager/snapshots/<encoded_workflow_key>/<id>.json
|
|
|
|
Workflow keys are percent-encoded for filesystem safety.
|
|
|
|
An in-memory metadata cache avoids redundant disk reads for list/prune/delete
|
|
operations. Only get_full_record() reads a file from disk after warm-up.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import time
|
|
import urllib.parse
|
|
|
|
# ─── Data directory resolution ───────────────────────────────────────
|
|
# Prefer ComfyUI's persistent user directory; fall back to extension-local
|
|
# paths when running outside ComfyUI (e.g. tests).
|
|
|
|
# Legacy location: snapshots used to be stored next to the extension's code.
_OLD_DATA_DIR = os.path.join(os.path.dirname(__file__), "data")

try:
    # Only importable when running inside ComfyUI.
    import folder_paths

    _USER_SM_DIR = os.path.join(folder_paths.get_user_directory(), "snapshot_manager")
except Exception:
    # Outside ComfyUI (e.g. tests): keep everything in the extension-local
    # directory, which is exactly the legacy location.
    _USER_SM_DIR = _OLD_DATA_DIR

# Root under which each workflow gets its own sub-directory of snapshots.
_DATA_DIR = os.path.join(_USER_SM_DIR, "snapshots")
|
|
|
|
# ─── In-memory metadata cache ────────────────────────────────────────
|
|
# Maps workflow_key -> list of metadata dicts (sorted by timestamp asc).
|
|
# Metadata is everything *except* graphData.
|
|
# workflow_key -> list of metadata dicts; _ensure_cached() and put() keep each
# list sorted by ascending "timestamp".
_cache = {}
# Membership here is what gates the directory scan in _ensure_cached().
_cache_warmed = set()  # workflow keys already loaded from disk
|
|
|
|
|
|
def _extract_meta(record):
    """Build the lightweight metadata view of a full snapshot *record*.

    The heavy ``graphData`` and ``thumbnail`` entries are left out. When the
    record carries a non-empty thumbnail, a ``hasThumbnail: True`` marker is
    added instead so clients know an image can be fetched via
    get_full_record. The input record is not modified.
    """
    heavy_keys = ("graphData", "thumbnail")
    slim = {}
    for key, value in record.items():
        if key in heavy_keys:
            continue
        slim[key] = value
    if record.get("thumbnail"):
        slim["hasThumbnail"] = True
    return slim
|
|
|
|
|
|
def _ensure_cached(workflow_key):
    """Warm the cache for *workflow_key* if not already loaded. Return cached list.

    On first use the workflow directory is scanned and every ``*.json`` file
    is reduced to its metadata (see _extract_meta). Files that cannot be used
    are skipped rather than failing the whole scan: unreadable files, invalid
    JSON, invalid UTF-8, and JSON documents that are not objects.

    Returns the live cached list (sorted by ascending timestamp); callers
    that hand it out should copy it.

    Raises:
        ValueError: if *workflow_key* is rejected by _workflow_dir.
    """
    if workflow_key not in _cache_warmed:
        d = _workflow_dir(workflow_key)
        entries = []
        if os.path.isdir(d):
            for fname in os.listdir(d):
                if not fname.endswith(".json"):
                    continue
                path = os.path.join(d, fname)
                try:
                    with open(path, "r", encoding="utf-8") as f:
                        record = json.load(f)
                except (ValueError, OSError):
                    # ValueError covers both json.JSONDecodeError and
                    # UnicodeDecodeError (file is not valid UTF-8).
                    continue
                if not isinstance(record, dict):
                    # Valid JSON but not a snapshot object (e.g. null or a
                    # list); one stray file must not break the listing.
                    continue
                entries.append(_extract_meta(record))
        entries.sort(key=lambda r: r.get("timestamp", 0))
        _cache[workflow_key] = entries
        _cache_warmed.add(workflow_key)
    return _cache.get(workflow_key, [])
|
|
|
|
|
|
# ─── Helpers ─────────────────────────────────────────────────────────
|
|
|
|
def _workflow_dir(workflow_key):
    """Map *workflow_key* to its snapshot directory under _DATA_DIR.

    The key is percent-encoded with no safe characters, so path separators
    cannot survive. Because "." and ".." are not escaped by quote(), the
    normalised result is additionally required to be a direct child of
    _DATA_DIR.

    Raises:
        ValueError: for an empty or non-string key, or one that would resolve
            outside (or onto) the snapshots root.
    """
    if not (workflow_key and isinstance(workflow_key, str)):
        raise ValueError(f"Invalid workflow key: {workflow_key!r}")

    folder_name = urllib.parse.quote(workflow_key, safe="")
    candidate = os.path.normpath(os.path.join(_DATA_DIR, folder_name))
    root = os.path.normpath(_DATA_DIR)

    # Containment check: "." collapses onto the root and ".." climbs above
    # it; in both cases the parent of the result is not the root.
    if os.path.dirname(candidate) == root:
        return candidate
    raise ValueError(f"Invalid workflow key: {workflow_key!r}")
|
|
|
|
|
|
def _validate_id(snapshot_id):
    """Reject ids that are unsafe to embed in a file name.

    An id must be a non-empty string containing no path separator ("/" or
    "\\"), no ".." sequence and no NUL character. Non-string values are
    rejected as well, so callers only ever have to handle ValueError (the
    same contract as _workflow_dir).

    Returns None when the id is acceptable.

    Raises:
        ValueError: if *snapshot_id* is not a usable id.
    """
    if not isinstance(snapshot_id, str) or not snapshot_id:
        raise ValueError(f"Invalid snapshot id: {snapshot_id!r}")
    forbidden = ("/", "\\", "..", "\x00")
    if any(token in snapshot_id for token in forbidden):
        raise ValueError(f"Invalid snapshot id: {snapshot_id!r}")
|
|
|
|
|
|
def _atomic_write_json(path, obj):
    """Serialise *obj* as compact JSON into *path* without a torn-write window.

    The data is first written to a temporary ``.tmp`` file created in the
    same directory as *path* and then moved over the target with
    os.replace, so readers see either the previous content or the complete
    new content.

    If anything goes wrong (including KeyboardInterrupt), the temporary file
    is removed on a best-effort basis and the original exception propagates.
    """
    handle, scratch = tempfile.mkstemp(suffix=".tmp", dir=os.path.dirname(path))
    try:
        with os.fdopen(handle, "w", encoding="utf-8") as out:
            json.dump(obj, out, separators=(",", ":"))
        os.replace(scratch, path)
    except BaseException:
        # Don't leave the scratch file behind; a failed cleanup must not mask
        # the real error.
        try:
            os.remove(scratch)
        except OSError:
            pass
        raise
|
|
|
|
|
|
# ─── Public API ──────────────────────────────────────────────────────
|
|
|
|
def put(record):
    """Persist one snapshot *record* and mirror it into the metadata cache.

    *record* must provide "id" and "workflowKey". The workflow directory is
    created on demand and the file is written atomically as ``<id>.json``.

    Raises:
        KeyError: if "id" or "workflowKey" is missing.
        ValueError: if the id or workflow key fails validation.
    """
    snapshot_id, workflow_key = record["id"], record["workflowKey"]
    _validate_id(snapshot_id)
    folder = _workflow_dir(workflow_key)
    os.makedirs(folder, exist_ok=True)
    _atomic_write_json(os.path.join(folder, f"{snapshot_id}.json"), record)

    # A cold cache needs no patching: the first _ensure_cached() call will
    # discover the new file on disk.
    if workflow_key not in _cache_warmed:
        return

    meta = _extract_meta(record)
    cached = _cache[workflow_key]
    # Replace any previous entry with the same id, in place, then restore
    # the ascending-timestamp order.
    cached[:] = [e for e in cached if e.get("id") != snapshot_id] + [meta]
    cached.sort(key=lambda r: r.get("timestamp", 0))
|
|
|
|
|
|
def get_all_for_workflow(workflow_key):
    """List snapshot metadata (no graphData) for a workflow, oldest first.

    Each entry is a shallow copy, so callers may modify the returned dicts
    without affecting the cache.
    """
    return list(map(dict, _ensure_cached(workflow_key)))
|
|
|
|
|
|
def get_full_record(workflow_key, snapshot_id):
    """Load one complete snapshot (including graphData) straight from disk.

    Returns the parsed JSON content, or None when the file is missing,
    unreadable or not valid JSON.

    Raises:
        ValueError: if the id or workflow key fails validation.
    """
    _validate_id(snapshot_id)
    target = os.path.join(_workflow_dir(workflow_key), f"{snapshot_id}.json")
    if os.path.isfile(target):
        try:
            with open(target, "r", encoding="utf-8") as fh:
                return json.load(fh)
        except (json.JSONDecodeError, OSError):
            pass
    return None
|
|
|
|
|
|
def update_meta(workflow_key, snapshot_id, fields):
    """Merge *fields* into an existing snapshot on disk and refresh its cache entry.

    For every item of *fields*, a value of None removes the key from the
    stored record; any other value overwrites it. The file is rewritten
    atomically.

    The cached metadata entry (if the workflow is cached) is rebuilt from the
    updated record through _extract_meta, so it follows the same rules as
    entries loaded from disk: no ``graphData``, no raw ``thumbnail``, and a
    ``hasThumbnail`` flag that reflects the record's current thumbnail.

    Returns True on success, False if the file does not exist.

    Raises:
        ValueError: if the id or workflow key fails validation.
    """
    _validate_id(snapshot_id)
    path = os.path.join(_workflow_dir(workflow_key), f"{snapshot_id}.json")
    if not os.path.isfile(path):
        return False
    with open(path, "r", encoding="utf-8") as f:
        record = json.load(f)
    # Merge fields; None values remove the key
    for k, v in fields.items():
        if v is None:
            record.pop(k, None)
        else:
            record[k] = v
    _atomic_write_json(path, record)
    # Refresh the cache entry in place (the dict object is kept, so the list
    # holding it is untouched). Copying the raw fields would let a base64
    # thumbnail into the metadata cache and leave hasThumbnail stale.
    for entry in _cache.get(workflow_key, []):
        if entry.get("id") == snapshot_id:
            fresh = _extract_meta(record)
            entry.clear()
            entry.update(fresh)
            break
    return True
|
|
|
|
|
|
def delete(workflow_key, snapshot_id):
    """Delete a single snapshot file and forget it in the cache.

    When the last cached entry of a workflow disappears, the workflow is
    dropped from the cache entirely; an empty workflow directory is removed.

    Raises:
        ValueError: if the id or workflow key fails validation.
    """
    _validate_id(snapshot_id)
    folder = _workflow_dir(workflow_key)
    target = os.path.join(folder, f"{snapshot_id}.json")
    if os.path.isfile(target):
        os.remove(target)

    # Cache bookkeeping
    if workflow_key in _cache:
        remaining = [e for e in _cache[workflow_key] if e.get("id") != snapshot_id]
        if remaining:
            _cache[workflow_key] = remaining
        else:
            del _cache[workflow_key]
            _cache_warmed.discard(workflow_key)

    # Don't leave an empty workflow directory behind
    if os.path.isdir(folder) and not os.listdir(folder):
        os.rmdir(folder)
|
|
|
|
|
|
def delete_all_for_workflow(workflow_key):
    """Remove every unlocked snapshot of a workflow.

    Locked snapshots stay on disk and become the workflow's new cache
    content. Returns ``{"lockedCount": <number of locked snapshots kept>}``.
    """
    entries = _ensure_cached(workflow_key)
    folder = _workflow_dir(workflow_key)

    kept = []
    for meta in entries:
        if meta.get("locked"):
            kept.append(meta)
            continue
        snap_id = meta["id"]
        _validate_id(snap_id)
        target = os.path.join(folder, f"{snap_id}.json")
        if os.path.isfile(target):
            os.remove(target)

    # The cache now holds the locked survivors only, or nothing at all.
    if kept:
        _cache[workflow_key] = kept
    else:
        _cache.pop(workflow_key, None)
        _cache_warmed.discard(workflow_key)

    # Don't leave an empty workflow directory behind
    if os.path.isdir(folder) and not os.listdir(folder):
        os.rmdir(folder)
    return {"lockedCount": len(kept)}
|
|
|
|
|
|
def get_all_workflow_keys():
    """Enumerate workflows that currently have snapshots.

    Returns a list of ``{"workflowKey", "count"}`` dicts sorted by workflow
    key. Directories whose decoded name is not a valid key, and workflows
    without any readable snapshot, are omitted.
    """
    if not os.path.isdir(_DATA_DIR):
        return []

    summary = []
    for encoded_name in os.listdir(_DATA_DIR):
        if not os.path.isdir(os.path.join(_DATA_DIR, encoded_name)):
            continue
        key = urllib.parse.unquote(encoded_name)
        try:
            count = len(_ensure_cached(key))
        except ValueError:
            # stray/legacy directory whose name does not decode to a valid key
            continue
        if count:
            summary.append({"workflowKey": key, "count": count})
    return sorted(summary, key=lambda item: item["workflowKey"])
|
|
|
|
|
|
def get_storage_usage():
    """Measure on-disk size of all snapshot files.

    Returns ``{"totalBytes": int, "workflows": [...]}`` where each workflow
    item is ``{"workflowKey", "bytes", "count"}``; the list is ordered from
    largest to smallest. Only ``*.json`` files are counted; files whose size
    cannot be read are ignored, as are workflows with no countable file.
    """
    per_workflow = []
    grand_total = 0
    names = os.listdir(_DATA_DIR) if os.path.isdir(_DATA_DIR) else []
    for encoded_name in names:
        subdir = os.path.join(_DATA_DIR, encoded_name)
        if not os.path.isdir(subdir):
            continue

        sizes = []
        for fname in os.listdir(subdir):
            if fname.endswith(".json"):
                try:
                    sizes.append(os.path.getsize(os.path.join(subdir, fname)))
                except OSError:
                    pass
        if not sizes:
            continue

        used = sum(sizes)
        grand_total += used
        per_workflow.append({
            "workflowKey": urllib.parse.unquote(encoded_name),
            "bytes": used,
            "count": len(sizes),
        })

    per_workflow.sort(key=lambda w: w["bytes"], reverse=True)
    return {"totalBytes": grand_total, "workflows": per_workflow}
|
|
|
|
|
|
def get_full_records_for_workflow(workflow_key):
    """Return all full snapshot records (with graphData) for a workflow, for export.

    Reads every ``*.json`` file of the workflow directory from disk (the
    metadata cache is not involved) and returns the records sorted by
    ascending timestamp. Files that are unreadable, not valid UTF-8, not
    valid JSON, or whose content is not a JSON object are skipped, so a
    single bad file cannot fail the whole export.

    Raises:
        ValueError: if *workflow_key* fails validation.
    """
    d = _workflow_dir(workflow_key)
    records = []
    if os.path.isdir(d):
        for fname in os.listdir(d):
            if not fname.endswith(".json"):
                continue
            try:
                with open(os.path.join(d, fname), "r", encoding="utf-8") as f:
                    record = json.load(f)
            except (ValueError, OSError):
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                continue
            if isinstance(record, dict):
                # Non-object JSON (null, list, ...) has no .get() and would
                # break the sort below.
                records.append(record)
    records.sort(key=lambda r: r.get("timestamp", 0))
    return records
|
|
|
|
|
|
def prune(workflow_key, max_snapshots, source=None, protected_ids=None, max_age_days=None):
    """Delete oldest unlocked snapshots beyond limit. Returns count deleted.

    The returned count is the number of files actually removed from disk.

    source filtering:
      - "node": only prune records where source == "node"
      - "regular": only prune records where source is absent or not "node"
      - None: prune all unlocked (existing behavior)

    protected_ids: set/list of snapshot IDs that must not be pruned
    (e.g. ancestors of active branch tip, fork-point snapshots).

    max_age_days: when > 0, also delete unlocked/unprotected snapshots older
    than this many days, regardless of the count limit.
    """
    _protected = set(protected_ids) if protected_ids else set()
    # Cached entries are sorted by ascending timestamp, so each candidate
    # list below is ordered oldest-first.
    entries = _ensure_cached(workflow_key)
    if source == "node":
        candidates = [r for r in entries if not r.get("locked") and r.get("source") == "node" and r.get("id") not in _protected]
    elif source == "regular":
        candidates = [r for r in entries if not r.get("locked") and r.get("source") != "node" and r.get("id") not in _protected]
    else:
        candidates = [r for r in entries if not r.get("locked") and r.get("id") not in _protected]
    # Selection phase: delete_ids only de-duplicates between the count rule
    # and the age rule.
    delete_ids = set()
    to_delete = []
    # Oldest-beyond-count get deleted...
    if len(candidates) > max_snapshots:
        for rec in candidates[: len(candidates) - max_snapshots]:
            to_delete.append(rec)
            delete_ids.add(rec["id"])
    # ...as do any candidates older than the age cutoff (locked/protected
    # snapshots were already excluded from candidates above).
    if max_age_days and max_age_days > 0:
        # time.time() is in seconds; the cutoff is computed in milliseconds,
        # so record timestamps are presumably epoch milliseconds - TODO confirm.
        # A record without "timestamp" is treated as 0 and is therefore
        # always older than the cutoff.
        cutoff = time.time() * 1000 - max_age_days * 86400000
        for rec in candidates:
            if rec["id"] not in delete_ids and rec.get("timestamp", 0) < cutoff:
                to_delete.append(rec)
                delete_ids.add(rec["id"])
    if not to_delete:
        return 0
    d = _workflow_dir(workflow_key)
    deleted = 0
    # Removal phase: the set is restarted and now collects the ids to drop
    # from the cache.
    delete_ids = set()
    for rec in to_delete:
        _validate_id(rec["id"])
        path = os.path.join(d, f"{rec['id']}.json")
        if os.path.isfile(path):
            os.remove(path)
            deleted += 1
        # Recorded even if the file was already gone, so a stale cache entry
        # is dropped as well.
        delete_ids.add(rec["id"])

    # Update cache
    if delete_ids and workflow_key in _cache:
        _cache[workflow_key] = [e for e in _cache[workflow_key] if e.get("id") not in delete_ids]
        if not _cache[workflow_key]:
            del _cache[workflow_key]
            _cache_warmed.discard(workflow_key)

    # Clean up empty directory
    if os.path.isdir(d) and not os.listdir(d):
        os.rmdir(d)

    return deleted
|
|
|
|
|
|
# ─── Profile Storage ─────────────────────────────────────────────────
|
|
# Profiles are stored as individual JSON files under snapshot_manager/profiles/<id>.json
|
|
|
|
# Directory holding one JSON file per profile, alongside the snapshots root.
_PROFILES_DIR = os.path.join(_USER_SM_DIR, "profiles")
# Reset to None by _invalidate_profile_cache() after every profile write/delete.
_profile_cache = None  # list of profile dicts, or None if not loaded
|
|
|
|
|
|
def _ensure_profiles_dir():
    """Create the profiles directory (and any missing parents) if needed."""
    os.makedirs(_PROFILES_DIR, exist_ok=True)
|
|
|
|
|
|
def _load_profile_cache():
    """Return the cached list of profiles, loading it from disk on first use.

    Every ``*.json`` file in the profiles directory is parsed; the result is
    sorted by ascending "timestamp" and kept in the module-level cache until
    _invalidate_profile_cache() is called. Files that are unreadable, not
    valid UTF-8, not valid JSON, or whose content is not a JSON object are
    skipped so that one bad file cannot break profile listing.

    The returned list is the live cache; callers handing it out should copy.
    """
    global _profile_cache
    if _profile_cache is not None:
        return _profile_cache
    _ensure_profiles_dir()
    profiles = []
    for fname in os.listdir(_PROFILES_DIR):
        if not fname.endswith(".json"):
            continue
        path = os.path.join(_PROFILES_DIR, fname)
        try:
            with open(path, "r", encoding="utf-8") as f:
                profile = json.load(f)
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            continue
        if isinstance(profile, dict):
            # Non-object JSON has no .get() and would break the sort below.
            profiles.append(profile)
    profiles.sort(key=lambda p: p.get("timestamp", 0))
    _profile_cache = profiles
    return _profile_cache
|
|
|
|
|
|
def _invalidate_profile_cache():
    """Drop the in-memory profile list so the next read rescans the directory."""
    global _profile_cache
    _profile_cache = None
|
|
|
|
|
|
def profile_put(profile):
    """Store *profile* (create or overwrite) under its "id".

    The file is written atomically and the in-memory profile list is
    invalidated afterwards.

    Raises:
        KeyError: if *profile* has no "id".
        ValueError: if the id fails validation.
    """
    profile_id = profile["id"]
    _validate_id(profile_id)
    _ensure_profiles_dir()
    _atomic_write_json(os.path.join(_PROFILES_DIR, f"{profile_id}.json"), profile)
    _invalidate_profile_cache()
|
|
|
|
|
|
def profile_get_all():
    """List all profiles, oldest first, as shallow copies of the cached dicts."""
    return list(map(dict, _load_profile_cache()))
|
|
|
|
|
|
def profile_get(profile_id):
    """Load one profile from disk by id.

    Returns the parsed JSON content, or None when the file is missing,
    unreadable or not valid JSON.

    Raises:
        ValueError: if the id fails validation.
    """
    _validate_id(profile_id)
    target = os.path.join(_PROFILES_DIR, f"{profile_id}.json")
    if os.path.isfile(target):
        try:
            with open(target, "r", encoding="utf-8") as fh:
                return json.load(fh)
        except (json.JSONDecodeError, OSError):
            pass
    return None
|
|
|
|
|
|
def profile_delete(profile_id):
    """Remove the profile file for *profile_id* (if any) and reset the cache.

    Raises:
        ValueError: if the id fails validation.
    """
    _validate_id(profile_id)
    target = os.path.join(_PROFILES_DIR, f"{profile_id}.json")
    if os.path.isfile(target):
        os.remove(target)
    # Invalidate unconditionally; the next read rebuilds the list from disk.
    _invalidate_profile_cache()
|
|
|
|
|
|
def profile_update(profile_id, fields):
    """Apply *fields* to a stored profile.

    A value of None deletes the key; any other value overwrites it. The
    file is rewritten atomically and the profile cache is invalidated.

    Returns True when the profile existed and was rewritten, False when
    there is no such profile file.

    Raises:
        ValueError: if the id fails validation.
    """
    _validate_id(profile_id)
    target = os.path.join(_PROFILES_DIR, f"{profile_id}.json")
    if not os.path.isfile(target):
        return False

    with open(target, "r", encoding="utf-8") as fh:
        stored = json.load(fh)

    for name, value in fields.items():
        if value is not None:
            stored[name] = value
        else:
            stored.pop(name, None)

    _atomic_write_json(target, stored)
    _invalidate_profile_cache()
    return True
|
|
|
|
|
|
# ─── Migration from old extension-local data ─────────────────────────
|
|
|
|
def _migrate_old_data():
    """Move data from the old <extension>/data/ location to the new user directory.

    Only runs when the old directory exists, has content, and the new location
    differs from the old one (i.e. we're actually inside ComfyUI).

    Existing files at the destination are never overwritten. Returns None.
    """
    old_snapshots = os.path.join(_OLD_DATA_DIR, "snapshots")
    old_profiles = os.path.join(_OLD_DATA_DIR, "profiles")

    # Nothing to migrate if old data dir doesn't exist or paths are the same
    # (the fallback configuration points _USER_SM_DIR at the old directory).
    if os.path.normpath(_OLD_DATA_DIR) == os.path.normpath(_USER_SM_DIR):
        return
    if not os.path.isdir(_OLD_DATA_DIR):
        return

    migrated_anything = False

    # Migrate snapshot workflow directories
    if os.path.isdir(old_snapshots):
        os.makedirs(_DATA_DIR, exist_ok=True)
        for name in os.listdir(old_snapshots):
            src = os.path.join(old_snapshots, name)
            dst = os.path.join(_DATA_DIR, name)
            if not os.path.isdir(src):
                continue
            if os.path.exists(dst):
                # Merge: move individual files that don't already exist
                # (files that do exist at the destination stay in the old
                # directory).
                for fname in os.listdir(src):
                    s = os.path.join(src, fname)
                    d = os.path.join(dst, fname)
                    if not os.path.exists(d):
                        shutil.move(s, d)
                # Remove source dir if now empty; rmdir fails (and is
                # ignored) when conflicting files were left behind.
                try:
                    os.rmdir(src)
                except OSError:
                    pass
            else:
                # No destination yet: move the whole workflow directory.
                shutil.move(src, dst)
            # Set for every workflow directory processed, including merges
            # where no file needed moving.
            migrated_anything = True

    # Migrate profile files
    if os.path.isdir(old_profiles):
        os.makedirs(_PROFILES_DIR, exist_ok=True)
        for fname in os.listdir(old_profiles):
            if not fname.endswith(".json"):
                continue
            src = os.path.join(old_profiles, fname)
            dst = os.path.join(_PROFILES_DIR, fname)
            if not os.path.exists(dst):
                shutil.move(src, dst)
            else:
                # The profile already at the destination wins; the old copy
                # is deleted.
                os.remove(src)
            migrated_anything = True

    if migrated_anything:
        # Clean up old directories if empty (children first, then the parent)
        for d in (old_snapshots, old_profiles, _OLD_DATA_DIR):
            try:
                if os.path.isdir(d) and not os.listdir(d):
                    os.rmdir(d)
            except OSError:
                pass
        print("[Snapshot Manager] Migrated data to", _USER_SM_DIR)
|
|
|
|
|
|
# Run the migration once at import time. Any failure is reported and
# swallowed so that the module still loads.
try:
    _migrate_old_data()
except Exception as e:
    print(f"[Snapshot Manager] Migration failed, old data preserved: {e}")
|