Add accumulator retake workflow restore

This commit is contained in:
2026-06-28 10:27:05 +02:00
parent f681fe2949
commit d937c219ee
5 changed files with 158 additions and 22 deletions
+68 -20
View File
@@ -345,6 +345,71 @@ def accumulator_list_entries(store_key: str, preview_limit: int = 0) -> dict[str
return result
def _find_accumulator_entry(
store: list[dict[str, Any]],
preview_key: str = "",
entry_id: str = "",
index: int = 0,
) -> tuple[int, dict[str, Any]]:
preview_key = str(preview_key or "").strip()
entry_id = str(entry_id or "").strip()
if preview_key:
for current_index, entry in enumerate(store):
if _entry_preview_key(entry) == preview_key:
return current_index, entry
elif entry_id:
for current_index, entry in enumerate(store):
if str(entry.get("id") or "") == entry_id:
return current_index, entry
elif int(index) > 0:
zero_index = int(index) - 1
if 0 <= zero_index < len(store):
return zero_index, store[zero_index]
else:
raise ValueError("entry_id or 1-based index is required")
raise ValueError("accumulator entry not found")
def _entry_workflow(entry: dict[str, Any]) -> Any:
extra_pnginfo = entry.get("extra_pnginfo")
if isinstance(extra_pnginfo, dict):
workflow = extra_pnginfo.get("workflow") or extra_pnginfo.get("Workflow")
if workflow:
return _metadata_copy(workflow)
prompt = entry.get("prompt")
if isinstance(prompt, dict):
workflow = prompt.get("workflow")
if workflow:
return _metadata_copy(workflow)
return None
def accumulator_retake_entry(
store_key: str,
preview_key: str = "",
entry_id: str = "",
index: int = 0,
) -> dict[str, Any]:
key = str(store_key or "").strip()
if not key:
raise ValueError("store_key is required for accumulator retake")
store = _ACCUMULATOR_STORES.setdefault(key, [])
zero_index, entry = _find_accumulator_entry(store, preview_key=preview_key, entry_id=entry_id, index=index)
workflow = _entry_workflow(entry)
if workflow is None:
raise ValueError("selected accumulator entry does not include workflow metadata")
entry_info = _entry_infos([entry])[0]
entry_info["index"] = zero_index + 1
return {
"store_key": key,
"entry": entry_info,
"index": zero_index + 1,
"workflow": workflow,
"prompt": _metadata_copy(entry.get("prompt")),
"extra_pnginfo": _metadata_copy(entry.get("extra_pnginfo")) if isinstance(entry.get("extra_pnginfo"), dict) else {},
}
def accumulator_delete_entries(
store_key: str,
preview_key: str = "",
@@ -426,26 +491,9 @@ def accumulator_move_entry(
result = accumulator_list_entries(key, preview_limit=preview_limit)
result["moved"] = False
return result
zero_index = -1
preview_key = str(preview_key or "").strip()
entry_id = str(entry_id or "").strip()
if preview_key:
for current_index, entry in enumerate(store):
if _entry_preview_key(entry) == preview_key:
zero_index = current_index
break
elif entry_id:
for current_index, entry in enumerate(store):
if str(entry.get("id") or "") == entry_id:
zero_index = current_index
break
elif int(index) > 0:
candidate = int(index) - 1
if candidate < len(store):
zero_index = candidate
else:
raise ValueError("entry_id or 1-based index is required")
if zero_index < 0:
try:
zero_index, _entry = _find_accumulator_entry(store, preview_key=preview_key, entry_id=entry_id, index=index)
except ValueError:
result = accumulator_list_entries(key, preview_limit=preview_limit)
result["moved"] = False
return result