Fix RAM leak: strip history snapshots from memory, load on demand
History tree nodes stored full data snapshots in memory (5-50MB each), accumulating with every save. Now:

- New `history_snapshots` DB table stores node data separately
- `save_history_tree` and `sync_to_db` extract snapshots before saving
- In-memory tree nodes only hold metadata (id, parent, note, timestamp)
- Restore and preview load snapshots from DB on demand
- `save_and_snap` uses a JSON roundtrip instead of deepcopy (1 copy, not 2)
- `_src_cache` moved to AppState, cleared on file switch
- `strip_snapshots()` method on HistoryTree for explicit cleanup

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -48,8 +48,18 @@ CREATE TABLE IF NOT EXISTS history_trees (
|
||||
updated_at REAL NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS history_snapshots (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
data_file_id INTEGER NOT NULL REFERENCES data_files(id) ON DELETE CASCADE,
|
||||
node_id TEXT NOT NULL,
|
||||
snapshot_data TEXT NOT NULL,
|
||||
updated_at REAL NOT NULL,
|
||||
UNIQUE(data_file_id, node_id)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_data_files_project_id ON data_files(project_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_sequences_data_file_id ON sequences(data_file_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_history_snapshots_df ON history_snapshots(data_file_id);
|
||||
"""
|
||||
|
||||
|
||||
@@ -314,22 +324,64 @@ class ProjectDB:
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def save_history_tree(self, data_file_id: int, tree_data: dict) -> None:
    """Persist a history tree, storing node snapshots in their own table.

    Every node whose ``"data"`` value is truthy has that value upserted into
    ``history_snapshots`` keyed by ``(data_file_id, node_id)``. The row
    written to ``history_trees`` contains all nodes with the ``"data"`` key
    removed. ``tree_data`` itself is not mutated.

    Nodes without ``"data"`` leave any snapshot row already stored for them
    untouched, so re-saving an already stripped tree does not lose snapshots.

    Args:
        data_file_id: Id of the owning ``data_files`` row.
        tree_data: Tree dict; its ``"nodes"`` entry maps node id -> node dict.
    """
    now = time.time()
    nodes = tree_data.get("nodes", {})

    # Serialise every snapshot up front so a failing json.dumps cannot leave
    # a half-written batch behind.
    snapshot_rows = [
        (data_file_id, nid, json.dumps(node["data"]), now)
        for nid, node in nodes.items()
        if node.get("data")
    ]
    if snapshot_rows:
        self.conn.executemany(
            "INSERT INTO history_snapshots (data_file_id, node_id, snapshot_data, updated_at) "
            "VALUES (?, ?, ?, ?) "
            "ON CONFLICT(data_file_id, node_id) DO UPDATE SET "
            "snapshot_data=excluded.snapshot_data, updated_at=excluded.updated_at",
            snapshot_rows,
        )

    # Shallow-copy the tree and rebuild its nodes without the heavy payload.
    slim_tree = dict(tree_data)
    slim_tree["nodes"] = {
        nid: {key: value for key, value in node.items() if key != "data"}
        for nid, node in nodes.items()
    }
    self.conn.execute(
        "INSERT INTO history_trees (data_file_id, tree_data, updated_at) "
        "VALUES (?, ?, ?) "
        "ON CONFLICT(data_file_id) DO UPDATE SET tree_data=excluded.tree_data, updated_at=excluded.updated_at",
        (data_file_id, json.dumps(slim_tree), now),
    )
    self.conn.commit()
|
||||
|
||||
def get_history_tree(self, data_file_id: int) -> dict | None:
    """Return the stored history tree for ``data_file_id``, or ``None``.

    The result is the JSON-decoded ``tree_data`` column of the matching
    ``history_trees`` row; ``None`` is returned when no row exists.
    """
    cursor = self.conn.execute(
        "SELECT tree_data FROM history_trees WHERE data_file_id = ?",
        (data_file_id,),
    )
    record = cursor.fetchone()
    if not record:
        return None
    return json.loads(record["tree_data"])
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# History snapshots (per-node data, loaded on demand)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def get_node_snapshot(self, data_file_id: int, node_id: str) -> dict | None:
    """Fetch one node's snapshot from ``history_snapshots`` on demand.

    Returns the JSON-decoded ``snapshot_data`` for the
    ``(data_file_id, node_id)`` pair, or ``None`` when no such row exists.
    """
    lookup_key = (data_file_id, node_id)
    record = self.conn.execute(
        "SELECT snapshot_data FROM history_snapshots WHERE data_file_id = ? AND node_id = ?",
        lookup_key,
    ).fetchone()
    if not record:
        return None
    return json.loads(record["snapshot_data"])
|
||||
|
||||
def delete_node_snapshots(self, data_file_id: int, node_ids: set) -> None:
    """Delete the stored snapshots of the given nodes of one data file.

    A fixed two-parameter DELETE is run once per node id via ``executemany``
    instead of building an ``IN (?, ?, ...)`` list, so the statement never
    exceeds SQLite's limit on bound parameters however many ids are passed.
    Ids with no stored snapshot are ignored.

    Args:
        data_file_id: Id of the owning ``data_files`` row.
        node_ids: Node ids whose snapshots should be removed. Nothing is
            executed or committed when this is empty.
    """
    if not node_ids:
        return
    # Materialise once so a one-shot iterable is not consumed twice.
    params = [(data_file_id, nid) for nid in node_ids]
    if not params:
        return
    self.conn.executemany(
        "DELETE FROM history_snapshots WHERE data_file_id = ? AND node_id = ?",
        params,
    )
    self.conn.commit()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Import
|
||||
# ------------------------------------------------------------------
|
||||
@@ -385,15 +437,30 @@ class ProjectDB:
|
||||
(df_id, seq_num, json.dumps(item), now),
|
||||
)
|
||||
|
||||
# Import history tree
|
||||
# Import history tree (extract snapshots into separate table)
|
||||
history_tree = data.get(KEY_HISTORY_TREE)
|
||||
if history_tree and isinstance(history_tree, dict):
|
||||
now = time.time()
|
||||
nodes = history_tree.get("nodes", {})
|
||||
slim_tree = dict(history_tree)
|
||||
slim_nodes = {}
|
||||
for nid, node in nodes.items():
|
||||
snap = node.get("data")
|
||||
if snap:
|
||||
self.conn.execute(
|
||||
"INSERT INTO history_snapshots (data_file_id, node_id, snapshot_data, updated_at) "
|
||||
"VALUES (?, ?, ?, ?) "
|
||||
"ON CONFLICT(data_file_id, node_id) DO UPDATE SET "
|
||||
"snapshot_data=excluded.snapshot_data, updated_at=excluded.updated_at",
|
||||
(df_id, nid, json.dumps(snap), now),
|
||||
)
|
||||
slim_nodes[nid] = {k: v for k, v in node.items() if k != "data"}
|
||||
slim_tree["nodes"] = slim_nodes
|
||||
self.conn.execute(
|
||||
"INSERT INTO history_trees (data_file_id, tree_data, updated_at) "
|
||||
"VALUES (?, ?, ?) "
|
||||
"ON CONFLICT(data_file_id) DO UPDATE SET tree_data=excluded.tree_data, updated_at=excluded.updated_at",
|
||||
(df_id, json.dumps(history_tree), now),
|
||||
(df_id, json.dumps(slim_tree), now),
|
||||
)
|
||||
|
||||
self.conn.execute("COMMIT")
|
||||
@@ -445,9 +512,12 @@ class ProjectDB:
|
||||
data["batch_data"] = batch_data
|
||||
t2 = time.time()
|
||||
|
||||
# Load history tree
|
||||
# Load history tree (metadata only, no snapshot data)
|
||||
tree = self.get_history_tree(df["id"])
|
||||
if tree:
|
||||
# Strip any residual snapshot data from nodes
|
||||
for node in tree.get("nodes", {}).values():
|
||||
node.pop("data", None)
|
||||
data["history_tree"] = tree
|
||||
t3 = time.time()
|
||||
|
||||
|
||||
@@ -76,6 +76,11 @@ class HistoryTree:
|
||||
return self.nodes[node_id]["data"]
|
||||
return None
|
||||
|
||||
def strip_snapshots(self) -> None:
    """Drop the ``"data"`` entry from every node in ``self.nodes``, in place.

    Nodes that have no ``"data"`` key are left unchanged.
    """
    for node_id in self.nodes:
        self.nodes[node_id].pop("data", None)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
    """Return the tree's nodes, branches and head id as a plain dict.

    The returned dict references ``self.nodes`` and ``self.branches``
    directly; no copy is made.
    """
    serialized: dict[str, Any] = {}
    serialized["nodes"] = self.nodes
    serialized["branches"] = self.branches
    serialized["head_id"] = self.head_id
    return serialized
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ from utils import (
|
||||
load_config, save_config, load_snippets, save_snippets,
|
||||
load_json, save_json, generate_templates, DEFAULTS,
|
||||
KEY_BATCH_DATA, KEY_SEQUENCE_NUMBER,
|
||||
resolve_path_case_insensitive,
|
||||
resolve_path_case_insensitive, sync_to_db,
|
||||
)
|
||||
from tab_batch_ng import render_batch_processor
|
||||
from tab_timeline_ng import render_timeline_tab
|
||||
@@ -290,11 +290,19 @@ def index():
|
||||
pane_state.db.load_full_data, pane_state.current_project, file_stem)
|
||||
if data is None:
|
||||
data, _ = await asyncio.to_thread(load_json, fp)
|
||||
if pane_state.db and pane_state.db_enabled and pane_state.current_project:
|
||||
await asyncio.to_thread(
|
||||
sync_to_db, pane_state.db, pane_state.current_project, fp, data)
|
||||
tree = data.get('history_tree')
|
||||
if tree and isinstance(tree, dict):
|
||||
for node in tree.get('nodes', {}).values():
|
||||
node.pop('data', None)
|
||||
pane_state.data_cache = data
|
||||
pane_state.last_mtime = fp.stat().st_mtime if fp.exists() else 0
|
||||
pane_state.loaded_file = str(fp)
|
||||
pane_state.file_path = fp
|
||||
pane_state.restored_indicator = None
|
||||
pane_state._src_cache = {'data': None, 'batch': [], 'name': None}
|
||||
_render_batch_tab_content.refresh()
|
||||
logger.info("on_select END (%.3fs)", _time.perf_counter() - _t0)
|
||||
|
||||
@@ -320,11 +328,21 @@ def index():
|
||||
state.db.load_full_data, state.current_project, file_stem)
|
||||
if data is None:
|
||||
data, _ = await asyncio.to_thread(load_json, fp)
|
||||
# When loading from JSON fallback and DB is enabled, sync to DB
|
||||
# so snapshots are persisted, then strip from memory
|
||||
if state.db and state.db_enabled and state.current_project:
|
||||
await asyncio.to_thread(
|
||||
sync_to_db, state.db, state.current_project, fp, data)
|
||||
tree = data.get('history_tree')
|
||||
if tree and isinstance(tree, dict):
|
||||
for node in tree.get('nodes', {}).values():
|
||||
node.pop('data', None)
|
||||
state.data_cache = data
|
||||
state.last_mtime = fp.stat().st_mtime if fp.exists() else 0
|
||||
state.loaded_file = str(fp)
|
||||
state.file_path = fp
|
||||
state.restored_indicator = None
|
||||
state._src_cache = {'data': None, 'batch': [], 'name': None}
|
||||
if state._main_rendered:
|
||||
render_main_content.refresh()
|
||||
logger.info("load_file END (%.3fs)", _time.perf_counter() - _t0)
|
||||
|
||||
@@ -28,6 +28,7 @@ class AppState:
|
||||
_main_rendered: bool = False
|
||||
_live_checkboxes: dict = field(default_factory=dict)
|
||||
_live_refreshables: dict = field(default_factory=dict)
|
||||
_src_cache: dict = field(default_factory=lambda: {'data': None, 'batch': [], 'name': None})
|
||||
|
||||
def create_secondary(self) -> 'AppState':
|
||||
return AppState(
|
||||
|
||||
+15
-9
@@ -247,8 +247,8 @@ def render_batch_processor(state: AppState):
|
||||
|
||||
src_seq_select = ui.select([], label='Source Sequence:').classes('w-64')
|
||||
|
||||
# Track loaded source data
|
||||
_src_cache = {'data': None, 'batch': [], 'name': None}
|
||||
# Track loaded source data (on state so it's cleared on file switch)
|
||||
_src_cache = state._src_cache
|
||||
|
||||
def _update_src():
|
||||
name = src_file_select.value
|
||||
@@ -359,11 +359,14 @@ def render_batch_processor(state: AppState):
|
||||
data[KEY_BATCH_DATA] = batch_list
|
||||
tree_data = data.get(KEY_HISTORY_TREE, {})
|
||||
htree = HistoryTree(tree_data)
|
||||
t1 = time.perf_counter()
|
||||
snapshot_payload = {k: copy.deepcopy(v) for k, v in data.items()
|
||||
if k != KEY_HISTORY_TREE}
|
||||
logger.info("save_and_snap deepcopy %.3fs", time.perf_counter() - t1)
|
||||
note = commit_input.value if commit_input.value else _auto_change_note(htree, batch_list)
|
||||
# Single serialization: json roundtrip gives us an isolated snapshot
|
||||
# without the expensive deepcopy
|
||||
t1 = time.perf_counter()
|
||||
snapshot_json = json.dumps({k: v for k, v in data.items()
|
||||
if k != KEY_HISTORY_TREE})
|
||||
snapshot_payload = json.loads(snapshot_json)
|
||||
logger.info("save_and_snap snapshot %.3fs", time.perf_counter() - t1)
|
||||
try:
|
||||
htree.commit(snapshot_payload, note=note)
|
||||
except ValueError as e:
|
||||
@@ -371,13 +374,16 @@ def render_batch_processor(state: AppState):
|
||||
return
|
||||
data[KEY_HISTORY_TREE] = htree.to_dict()
|
||||
t1 = time.perf_counter()
|
||||
snapshot = json.loads(json.dumps(data))
|
||||
await asyncio.to_thread(save_json, file_path, snapshot)
|
||||
save_snapshot = json.loads(json.dumps(data))
|
||||
await asyncio.to_thread(save_json, file_path, save_snapshot)
|
||||
logger.info("save_and_snap save_json %.3fs", time.perf_counter() - t1)
|
||||
if state.db_enabled and state.current_project and state.db:
|
||||
t1 = time.perf_counter()
|
||||
await asyncio.to_thread(sync_to_db, state.db, state.current_project, file_path, snapshot)
|
||||
await asyncio.to_thread(sync_to_db, state.db, state.current_project, file_path, save_snapshot)
|
||||
logger.info("save_and_snap sync_to_db %.3fs", time.perf_counter() - t1)
|
||||
# Free snapshot data from memory — it's persisted in DB now
|
||||
htree.strip_snapshots()
|
||||
data[KEY_HISTORY_TREE] = htree.to_dict()
|
||||
state.restored_indicator = None
|
||||
commit_input.set_value('')
|
||||
logger.info("save_and_snap END (%.3fs)", time.perf_counter() - t_ss)
|
||||
|
||||
+33
-7
@@ -13,7 +13,7 @@ from utils import save_json, sync_to_db, KEY_BATCH_DATA, KEY_HISTORY_TREE
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _delete_nodes(htree, data, file_path, node_ids):
|
||||
def _delete_nodes(htree, data, file_path, node_ids, state=None):
|
||||
"""Delete nodes with backup, branch cleanup, re-parenting, and head fallback."""
|
||||
if 'history_tree_backup' not in data:
|
||||
data['history_tree_backup'] = []
|
||||
@@ -52,6 +52,11 @@ def _delete_nodes(htree, data, file_path, node_ids):
|
||||
else:
|
||||
htree.head_id = None
|
||||
data[KEY_HISTORY_TREE] = htree.to_dict()
|
||||
# Clean up DB snapshots for deleted nodes
|
||||
if state and state.db_enabled and state.db and state.current_project:
|
||||
df = state.db.get_data_file_by_names(state.current_project, file_path.stem)
|
||||
if df:
|
||||
state.db.delete_node_snapshots(df['id'], set(node_ids))
|
||||
|
||||
|
||||
def _render_selection_picker(all_nodes, htree, state, refresh_fn):
|
||||
@@ -159,7 +164,7 @@ def _render_batch_delete(htree, data, file_path, state, refresh_fn):
|
||||
|
||||
async def do_batch_delete():
|
||||
current_valid = state.timeline_selected_nodes & set(htree.nodes.keys())
|
||||
_delete_nodes(htree, data, file_path, current_valid)
|
||||
_delete_nodes(htree, data, file_path, current_valid, state=state)
|
||||
snapshot = json.loads(json.dumps(data))
|
||||
await asyncio.to_thread(save_json, file_path, snapshot)
|
||||
if state.db_enabled and state.current_project and state.db:
|
||||
@@ -344,7 +349,7 @@ def _render_node_manager(all_nodes, htree, data, file_path, restore_fn, refresh_
|
||||
|
||||
async def delete_selected():
|
||||
if sel_id in htree.nodes:
|
||||
_delete_nodes(htree, data, file_path, {sel_id})
|
||||
_delete_nodes(htree, data, file_path, {sel_id}, state=state)
|
||||
snapshot = json.loads(json.dumps(data))
|
||||
await asyncio.to_thread(save_json, file_path, snapshot)
|
||||
if state and state.db_enabled and state.current_project and state.db:
|
||||
@@ -361,7 +366,7 @@ def _render_node_manager(all_nodes, htree, data, file_path, restore_fn, refresh_
|
||||
|
||||
# Data preview
|
||||
with ui.expansion('Data Preview', icon='preview').classes('w-full q-mt-sm'):
|
||||
_render_data_preview(sel_id, htree)
|
||||
_render_data_preview(sel_id, htree, state=state, file_path=file_path)
|
||||
|
||||
render_branch_nodes()
|
||||
|
||||
@@ -566,7 +571,20 @@ async def _restore_node(data, node, htree, file_path, state: AppState):
|
||||
"""Restore a history node as the current version (full replace, not merge)."""
|
||||
t0 = time.perf_counter()
|
||||
logger.info("_restore_node START: %s", node.get('note', 'Step'))
|
||||
node_data = json.loads(json.dumps(node.get('data', {})))
|
||||
# Load snapshot from DB on demand (nodes no longer hold data in memory)
|
||||
raw_snap = node.get('data')
|
||||
if not raw_snap and state.db_enabled and state.db and state.current_project:
|
||||
df = state.db.get_data_file_by_names(state.current_project, file_path.stem)
|
||||
if df:
|
||||
raw_snap = await asyncio.to_thread(
|
||||
state.db.get_node_snapshot, df['id'], node['id'])
|
||||
if not raw_snap:
|
||||
# Last resort: read from JSON file on disk
|
||||
from utils import load_json as _load_json
|
||||
raw_file, _ = await asyncio.to_thread(_load_json, file_path)
|
||||
tree_on_disk = raw_file.get(KEY_HISTORY_TREE, {})
|
||||
raw_snap = tree_on_disk.get('nodes', {}).get(node['id'], {}).get('data', {})
|
||||
node_data = json.loads(json.dumps(raw_snap)) if raw_snap else {}
|
||||
# Preserve the history tree before clearing
|
||||
preserved_tree = data.get(KEY_HISTORY_TREE)
|
||||
preserved_backup = data.get('history_tree_backup')
|
||||
@@ -589,13 +607,21 @@ async def _restore_node(data, node, htree, file_path, state: AppState):
|
||||
ui.notify('Restored!', type='positive')
|
||||
|
||||
|
||||
def _render_data_preview(nid, htree):
|
||||
def _render_data_preview(nid, htree, state: AppState = None, file_path=None):
|
||||
"""Render a read-only preview of the selected node's data."""
|
||||
if not nid or nid not in htree.nodes:
|
||||
ui.label('No node selected.').classes('text-caption')
|
||||
return
|
||||
|
||||
node_data = htree.nodes[nid].get('data', {})
|
||||
# Load snapshot from DB on demand (not stored in memory)
|
||||
node_data = htree.nodes[nid].get('data')
|
||||
if not node_data and state and state.db_enabled and state.db and state.current_project and file_path:
|
||||
df = state.db.get_data_file_by_names(state.current_project, file_path.stem)
|
||||
if df:
|
||||
node_data = state.db.get_node_snapshot(df['id'], nid)
|
||||
if not node_data:
|
||||
ui.label('Snapshot data not available.').classes('text-caption text-warning')
|
||||
return
|
||||
batch_list = node_data.get(KEY_BATCH_DATA, [])
|
||||
|
||||
if batch_list and isinstance(batch_list, list) and len(batch_list) > 0:
|
||||
|
||||
@@ -210,6 +210,15 @@ def save_json(path: str | Path, data: dict[str, Any]) -> None:
|
||||
os.replace(tmp, path)
|
||||
logger.info("save_json %s: %.3fs", path.name, time.time() - t0)
|
||||
|
||||
|
||||
def snapshot_data(data: dict[str, Any]) -> dict[str, Any]:
    """Return an independent copy of ``data`` produced by a JSON round-trip.

    Call this on the main thread before handing data to
    ``asyncio.to_thread``; the copy shares nothing with ``data``, which
    avoids 'dict changed size during iteration' when the UI mutates the
    original while a worker is still reading it.
    """
    encoded = json.dumps(data)
    return json.loads(encoded)
|
||||
|
||||
def get_file_mtime(path: str | Path) -> float:
|
||||
"""Returns the modification time of a file, or 0 if it doesn't exist."""
|
||||
path = Path(path)
|
||||
@@ -279,14 +288,29 @@ def sync_to_db(db, project_name: str, file_path: Path, data: dict) -> None:
|
||||
else:
|
||||
db.conn.execute("DELETE FROM sequences WHERE data_file_id = ?", (df_id,))
|
||||
|
||||
# Sync history tree
|
||||
# Sync history tree (extract node snapshots into separate table)
|
||||
history_tree = data.get(KEY_HISTORY_TREE)
|
||||
if history_tree and isinstance(history_tree, dict):
|
||||
nodes = history_tree.get("nodes", {})
|
||||
slim_tree = dict(history_tree)
|
||||
slim_nodes = {}
|
||||
for nid, node in nodes.items():
|
||||
snap = node.get("data")
|
||||
if snap:
|
||||
db.conn.execute(
|
||||
"INSERT INTO history_snapshots (data_file_id, node_id, snapshot_data, updated_at) "
|
||||
"VALUES (?, ?, ?, ?) "
|
||||
"ON CONFLICT(data_file_id, node_id) DO UPDATE SET "
|
||||
"snapshot_data=excluded.snapshot_data, updated_at=excluded.updated_at",
|
||||
(df_id, nid, json.dumps(snap), now),
|
||||
)
|
||||
slim_nodes[nid] = {k: v for k, v in node.items() if k != "data"}
|
||||
slim_tree["nodes"] = slim_nodes
|
||||
db.conn.execute(
|
||||
"INSERT INTO history_trees (data_file_id, tree_data, updated_at) "
|
||||
"VALUES (?, ?, ?) "
|
||||
"ON CONFLICT(data_file_id) DO UPDATE SET tree_data=excluded.tree_data, updated_at=excluded.updated_at",
|
||||
(df_id, json.dumps(history_tree), now),
|
||||
(df_id, json.dumps(slim_tree), now),
|
||||
)
|
||||
|
||||
db.conn.execute("COMMIT")
|
||||
|
||||
Reference in New Issue
Block a user