Introduce a SaveSnapshot custom node that triggers snapshot captures via WebSocket. Node-triggered snapshots are visually distinct in the sidebar (purple left border + "Node" badge) and managed with their own independent rolling limit (maxNodeSnapshots setting), separate from auto/manual snapshot pruning. Node snapshots skip hash-dedup so repeated queue runs always capture. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
134 lines · 4.3 KiB · Python
"""
|
|
Filesystem storage layer for workflow snapshots.
|
|
|
|
Stores each snapshot as an individual JSON file under:
|
|
<extension_dir>/data/snapshots/<encoded_workflow_key>/<id>.json
|
|
|
|
Workflow keys are percent-encoded for filesystem safety.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import urllib.parse
|
|
|
|
_DATA_DIR = os.path.join(os.path.dirname(__file__), "data", "snapshots")
|
|
|
|
|
|
def _workflow_dir(workflow_key):
    """Return the on-disk directory holding one workflow's snapshots.

    The key is percent-encoded (no characters left unescaped) so arbitrary
    workflow keys map to safe, reversible directory names.
    """
    return os.path.join(_DATA_DIR, urllib.parse.quote(workflow_key, safe=""))
|
|
|
|
|
|
def _validate_id(snapshot_id):
|
|
if not snapshot_id or "/" in snapshot_id or "\\" in snapshot_id or ".." in snapshot_id:
|
|
raise ValueError(f"Invalid snapshot id: {snapshot_id!r}")
|
|
|
|
|
|
def put(record):
    """Write one snapshot record to disk atomically.

    The record must contain at least "id" and "workflowKey". The JSON is
    written compactly to a temporary file in the target directory and then
    renamed into place with os.replace, so a crash mid-write can never leave
    a truncated/corrupt <id>.json behind (readers only pick up *.json, so
    the ".tmp" intermediate is invisible to them).

    Raises:
        KeyError: if "id" or "workflowKey" is missing from the record.
        ValueError: if the snapshot id is unsafe for use as a filename.
    """
    snapshot_id = record["id"]
    workflow_key = record["workflowKey"]
    _validate_id(snapshot_id)

    d = _workflow_dir(workflow_key)
    os.makedirs(d, exist_ok=True)

    path = os.path.join(d, f"{snapshot_id}.json")
    tmp_path = path + ".tmp"  # same directory => same filesystem => atomic rename
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(record, f, separators=(",", ":"))
    # Atomic on both POSIX and Windows; silently replaces any previous version.
    os.replace(tmp_path, path)
|
|
|
|
|
|
def get_all_for_workflow(workflow_key):
    """Return every snapshot record for a workflow, oldest first.

    Files that cannot be read or parsed are skipped rather than treated as
    fatal; records missing a "timestamp" sort as if it were 0.
    """
    directory = _workflow_dir(workflow_key)
    if not os.path.isdir(directory):
        return []

    records = []
    for entry in os.listdir(directory):
        if not entry.endswith(".json"):
            continue
        full_path = os.path.join(directory, entry)
        try:
            with open(full_path, "r", encoding="utf-8") as fh:
                records.append(json.load(fh))
        except (json.JSONDecodeError, OSError):
            # Unreadable or partially-written file: ignore and keep going.
            continue

    records.sort(key=lambda rec: rec.get("timestamp", 0))
    return records
|
|
|
|
|
|
def delete(workflow_key, snapshot_id):
    """Remove a single snapshot file if it exists.

    Also removes the per-workflow directory once its last entry is gone.

    Raises:
        ValueError: if the snapshot id is unsafe for use as a filename.
    """
    _validate_id(snapshot_id)
    directory = _workflow_dir(workflow_key)
    target = os.path.join(directory, f"{snapshot_id}.json")
    if os.path.isfile(target):
        os.remove(target)
    # Drop the workflow directory when nothing is left inside it.
    if os.path.isdir(directory) and not os.listdir(directory):
        os.rmdir(directory)
|
|
|
|
|
|
def delete_all_for_workflow(workflow_key):
    """Delete every unlocked snapshot for a workflow.

    Locked snapshots are preserved and counted. Returns {"lockedCount": int}
    so the caller can report how many snapshots survived the sweep. The
    per-workflow directory is removed once it is completely empty (i.e. no
    locked snapshots remained either).
    """
    # Hoisted: the directory is invariant across the loop (the original
    # recomputed _workflow_dir on every unlocked record).
    d = _workflow_dir(workflow_key)
    locked_count = 0
    for rec in get_all_for_workflow(workflow_key):
        if rec.get("locked"):
            locked_count += 1
            continue
        _validate_id(rec["id"])
        path = os.path.join(d, f"{rec['id']}.json")
        if os.path.isfile(path):
            os.remove(path)
    # Clean up the directory only if nothing (locked included) is left.
    if os.path.isdir(d) and not os.listdir(d):
        os.rmdir(d)
    return {"lockedCount": locked_count}
|
|
|
|
|
|
def get_all_workflow_keys():
    """List stored workflows as [{"workflowKey": str, "count": int}].

    Scans the storage root for subdirectories, counts their *.json files,
    skips empty ones, and returns the summaries sorted by workflow key.
    """
    if not os.path.isdir(_DATA_DIR):
        return []

    summaries = []
    for encoded in os.listdir(_DATA_DIR):
        subdir = os.path.join(_DATA_DIR, encoded)
        if not os.path.isdir(subdir):
            continue
        # sum() over booleans counts the .json entries.
        n_snapshots = sum(name.endswith(".json") for name in os.listdir(subdir))
        if n_snapshots == 0:
            continue
        summaries.append(
            {"workflowKey": urllib.parse.unquote(encoded), "count": n_snapshots}
        )

    summaries.sort(key=lambda item: item["workflowKey"])
    return summaries
|
|
|
|
|
|
def prune(workflow_key, max_snapshots, source=None):
    """Delete oldest unlocked snapshots beyond limit. Returns count deleted.

    source filtering:

    - "node": only prune records where source == "node"
    - "regular": only prune records where source is absent or not "node"
    - None: prune all unlocked (existing behavior)
    """

    def _prunable(rec):
        # Locked records are never candidates, regardless of source filter.
        if rec.get("locked"):
            return False
        if source == "node":
            return rec.get("source") == "node"
        if source == "regular":
            return rec.get("source") != "node"
        return True

    candidates = [r for r in get_all_for_workflow(workflow_key) if _prunable(r)]
    excess = len(candidates) - max_snapshots
    if excess <= 0:
        return 0

    d = _workflow_dir(workflow_key)
    deleted = 0
    # Records arrive sorted ascending by timestamp, so the head slice is the
    # oldest `excess` snapshots.
    for rec in candidates[:excess]:
        _validate_id(rec["id"])
        path = os.path.join(d, f"{rec['id']}.json")
        if os.path.isfile(path):
            os.remove(path)
            deleted += 1
    return deleted
|