Files
Ethanfel 1ec3abf17a feat: replace Git-DAG timeline with flat snapshot browser
Replace HistoryTree (DAG with branches, Graphviz rendering) with a flat
chronological SnapshotTimeline. New UI features: split-view layout,
snapshot compare/diff, cherry-pick restore of individual sequences or
fields, auto-snapshots with debounce, and pin/filter support.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-21 18:14:22 +01:00

185 lines
6.7 KiB
Python

import time
import uuid
from typing import Any
KEY_PROMPT_HISTORY = "prompt_history"
class SnapshotTimeline:
"""Flat chronological snapshot list — replaces the old HistoryTree DAG."""
def __init__(self, raw_data: dict[str, Any]) -> None:
# Detect and migrate old HistoryTree format
if "nodes" in raw_data and "branches" in raw_data:
self._migrate_from_tree(raw_data)
elif KEY_PROMPT_HISTORY in raw_data and isinstance(raw_data[KEY_PROMPT_HISTORY], list):
self._migrate_legacy(raw_data[KEY_PROMPT_HISTORY])
else:
self.snapshots: dict[str, dict[str, Any]] = raw_data.get("snapshots", {})
self.current_id: str | None = raw_data.get("current_id", None)
# ------------------------------------------------------------------
# Migration
# ------------------------------------------------------------------
def _migrate_from_tree(self, raw_data: dict[str, Any]) -> None:
"""Flatten old HistoryTree nodes into snapshot list, discarding DAG info."""
self.snapshots = {}
nodes = raw_data.get("nodes", {})
for nid, node in nodes.items():
self.snapshots[nid] = {
"id": nid,
"timestamp": node.get("timestamp", time.time()),
"note": node.get("note", "Migrated"),
"pinned": False,
"auto": False,
"seq_count": self._count_seqs(node.get("data")),
}
# Preserve snapshot data if present
if "data" in node and node["data"]:
self.snapshots[nid]["data"] = node["data"]
self.current_id = raw_data.get("head_id")
def _migrate_legacy(self, old_list: list[dict[str, Any]]) -> None:
"""Convert ancient prompt_history list into snapshots."""
self.snapshots = {}
self.current_id = None
for item in reversed(old_list):
sid = self._make_id()
self.snapshots[sid] = {
"id": sid,
"timestamp": time.time(),
"note": item.get("note", "Legacy Import"),
"pinned": False,
"auto": False,
"seq_count": self._count_seqs(item),
"data": item,
}
self.current_id = sid
# ------------------------------------------------------------------
# Core operations
# ------------------------------------------------------------------
def record(self, data: dict[str, Any], note: str = "Snapshot",
auto: bool = False) -> str:
"""Create a new snapshot and return its ID."""
sid = self._make_id()
self.snapshots[sid] = {
"id": sid,
"timestamp": time.time(),
"note": note,
"pinned": False,
"auto": auto,
"seq_count": self._count_seqs(data),
"data": data,
}
self.current_id = sid
return sid
def get_snapshot_data(self, snapshot_id: str) -> dict[str, Any] | None:
"""Return the inline snapshot data if present."""
snap = self.snapshots.get(snapshot_id)
if snap:
return snap.get("data")
return None
def toggle_pin(self, snapshot_id: str) -> bool:
"""Toggle pinned state, return new value."""
snap = self.snapshots.get(snapshot_id)
if snap:
snap["pinned"] = not snap.get("pinned", False)
return snap["pinned"]
return False
def delete(self, snapshot_id: str) -> None:
"""Remove a snapshot."""
self.snapshots.pop(snapshot_id, None)
if self.current_id == snapshot_id:
# Fall back to most recent remaining
if self.snapshots:
self.current_id = max(
self.snapshots.values(), key=lambda s: s["timestamp"]
)["id"]
else:
self.current_id = None
def strip_snapshots(self) -> None:
"""Remove inline data from all snapshots (for slim JSON storage)."""
for snap in self.snapshots.values():
snap.pop("data", None)
# ------------------------------------------------------------------
# Serialization
# ------------------------------------------------------------------
def to_dict(self) -> dict[str, Any]:
return {
"snapshots": self.snapshots,
"current_id": self.current_id,
}
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _make_id(self) -> str:
for _ in range(10):
sid = str(uuid.uuid4())[:8]
if sid not in self.snapshots:
return sid
raise ValueError("Failed to generate unique snapshot ID after 10 attempts")
@staticmethod
def _count_seqs(data: dict | None) -> int:
if not data:
return 0
from utils import KEY_BATCH_DATA
batch = data.get(KEY_BATCH_DATA, [])
return len(batch) if isinstance(batch, list) else 0
# ------------------------------------------------------------------
# Diff function
# ------------------------------------------------------------------
def diff_snapshots(old_batch: list[dict], new_batch: list[dict]) -> list[dict]:
"""Compare two batch lists by sequence_number, return per-sequence diffs.
Returns a list of dicts:
{
"seq_num": int,
"status": "unchanged" | "changed" | "added" | "removed",
"changes": [{"field": str, "old": Any, "new": Any}],
}
"""
from utils import KEY_SEQUENCE_NUMBER
old_by_seq = {int(s.get(KEY_SEQUENCE_NUMBER, 0)): s for s in old_batch}
new_by_seq = {int(s.get(KEY_SEQUENCE_NUMBER, 0)): s for s in new_batch}
all_seqs = sorted(set(old_by_seq) | set(new_by_seq))
result = []
for seq_num in all_seqs:
old_item = old_by_seq.get(seq_num)
new_item = new_by_seq.get(seq_num)
if old_item and not new_item:
result.append({"seq_num": seq_num, "status": "removed", "changes": []})
elif new_item and not old_item:
result.append({"seq_num": seq_num, "status": "added", "changes": []})
else:
# Both exist — field-by-field comparison
all_keys = sorted(set(old_item) | set(new_item))
changes = []
for k in all_keys:
old_val = old_item.get(k)
new_val = new_item.get(k)
if old_val != new_val:
changes.append({"field": k, "old": old_val, "new": new_val})
status = "changed" if changes else "unchanged"
result.append({"seq_num": seq_num, "status": status, "changes": changes})
return result