feat: pool manifest read/write with atomic save
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,49 @@
|
||||
"""Pure storage layer for the Image Pool node. Stdlib only — no torch, no comfy."""
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def empty_manifest():
|
||||
return {"active": 0, "slots": [], "next_seq": 1}
|
||||
|
||||
|
||||
def pool_dir(base_dir, pool_id):
|
||||
return Path(base_dir) / pool_id
|
||||
|
||||
|
||||
def manifest_path(base_dir, pool_id):
|
||||
return pool_dir(base_dir, pool_id) / "manifest.json"
|
||||
|
||||
|
||||
def read_manifest(base_dir, pool_id):
|
||||
p = manifest_path(base_dir, pool_id)
|
||||
if not p.exists():
|
||||
return empty_manifest()
|
||||
try:
|
||||
with open(p, "r", encoding="utf-8") as f:
|
||||
m = json.load(f)
|
||||
# minimal shape guard
|
||||
if not isinstance(m, dict) or "slots" not in m:
|
||||
raise ValueError("bad manifest")
|
||||
m.setdefault("active", 0)
|
||||
m.setdefault("next_seq", len(m.get("slots", [])) + 1)
|
||||
return m
|
||||
except (ValueError, json.JSONDecodeError):
|
||||
return rebuild_manifest(base_dir, pool_id)
|
||||
|
||||
|
||||
def write_manifest(base_dir, pool_id, manifest):
|
||||
d = pool_dir(base_dir, pool_id)
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
final = d / "manifest.json"
|
||||
tmp = d / "manifest.json.tmp"
|
||||
with open(tmp, "w", encoding="utf-8") as f:
|
||||
json.dump(manifest, f, indent=2)
|
||||
os.replace(tmp, final) # atomic on same filesystem
|
||||
return manifest
|
||||
|
||||
|
||||
def rebuild_manifest(base_dir, pool_id):
|
||||
# Temporary stub — replaced in Task 7.
|
||||
return empty_manifest()
|
||||
@@ -0,0 +1,28 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
from gates import pool
|
||||
|
||||
|
||||
def test_empty_manifest_shape():
|
||||
m = pool.empty_manifest()
|
||||
assert m == {"active": 0, "slots": [], "next_seq": 1}
|
||||
|
||||
|
||||
def test_read_missing_creates_empty(tmp_path):
|
||||
m = pool.read_manifest(str(tmp_path), "p1")
|
||||
assert m == pool.empty_manifest()
|
||||
|
||||
|
||||
def test_write_then_read_roundtrip(tmp_path):
|
||||
m = pool.empty_manifest()
|
||||
m["active"] = 2
|
||||
pool.write_manifest(str(tmp_path), "p1", m)
|
||||
# file lives at <base>/p1/manifest.json
|
||||
assert (tmp_path / "p1" / "manifest.json").exists()
|
||||
assert pool.read_manifest(str(tmp_path), "p1") == m
|
||||
|
||||
|
||||
def test_write_is_atomic_no_partial_temp_left(tmp_path):
|
||||
pool.write_manifest(str(tmp_path), "p1", pool.empty_manifest())
|
||||
leftovers = list((tmp_path / "p1").glob("*.tmp"))
|
||||
assert leftovers == []
|
||||
Reference in New Issue
Block a user