1450 lines
53 KiB
Python
1450 lines
53 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import random
|
|
import re
|
|
from typing import Any
|
|
|
|
try:
|
|
from comfy_execution.graph import ExecutionBlocker
|
|
from comfy_execution.graph_utils import GraphBuilder, is_link
|
|
except Exception: # Allows local syntax/import checks outside ComfyUI.
|
|
ExecutionBlocker = None
|
|
GraphBuilder = None
|
|
|
|
def is_link(value: Any) -> bool:
|
|
return isinstance(value, list) and len(value) == 2
|
|
|
|
try:
|
|
from nodes import NODE_CLASS_MAPPINGS as ALL_NODE_CLASS_MAPPINGS
|
|
except Exception:
|
|
ALL_NODE_CLASS_MAPPINGS = {}
|
|
|
|
try:
|
|
import folder_paths
|
|
import numpy as np
|
|
from PIL import Image
|
|
from PIL.PngImagePlugin import PngInfo
|
|
from comfy.cli_args import args
|
|
except Exception:
|
|
folder_paths = None
|
|
np = None
|
|
Image = None
|
|
PngInfo = None
|
|
|
|
class _ArgsFallback:
|
|
disable_metadata = True
|
|
|
|
args = _ArgsFallback()
|
|
|
|
|
|
# Number of value slots exposed by the while-loop start/end nodes.
MAX_LOOP_VALUES = 20
# Two fewer than the while-loop slot count: SxCPForLoopStart forwards its index
# and collected value through while-loop slots 0 and 1.
MAX_CARRY_VALUES = MAX_LOOP_VALUES - 2
# Number of input_N / output_N sockets declared by SxCPIndexSwitch.
MAX_SWITCH_INPUTS = 64
# Modes accepted by append_collected_value(); unknown modes fall back to "auto_batch".
COLLECTION_MODES = ["auto_batch", "list", "image_batch", "latent_batch", "string_lines"]
# NOTE(review): the accumulator action / preview option lists are presumably
# consumed by accumulator nodes defined elsewhere in this file — confirm.
ACCUMULATOR_ACTIONS = ["append_variant", "replace_by_entry_id", "append", "clear_then_append", "clear", "read"]
# Modes accepted by _image_batch_from_images(); unknown modes fall back to "same_size_only".
ACCUMULATOR_IMAGE_BATCH_MODES = ["same_size_only", "resize_to_first"]
ACCUMULATOR_IMAGE_GROUPS = 4
ACCUMULATOR_PREVIEW_VIEW_MODES = ["grid", "carousel"]
ACCUMULATOR_PREVIEW_DELETE_ACTIONS = ["none", "delete_entry_id", "delete_index", "clear"]
# Option lists for SxCPIndexSwitch's mode / index_base / missing_behavior inputs;
# the first entry of each list is the fallback for unrecognised values.
INDEX_SWITCH_MODES = ["pick_input", "route_output"]
INDEX_SWITCH_BASES = ["one_based", "zero_based"]
INDEX_SWITCH_MISSING_BEHAVIORS = ["fallback", "none", "clamp", "wrap"]
# Formats accepted by _any_to_preview_text(); unknown formats fall back to "auto".
PREVIEW_TEXT_FORMATS = ["auto", "json", "repr", "str"]

# Module-level accumulator storage: store key -> ordered list of entry dicts.
_ACCUMULATOR_STORES: dict[str, list[dict[str, Any]]] = {}
|
|
|
|
|
|
def _entry_preview_key(entry: dict[str, Any]) -> str:
|
|
key = str(entry.get("_sxcp_preview_key") or "").strip()
|
|
if not key:
|
|
key = f"entry_{random.getrandbits(64):016x}"
|
|
entry["_sxcp_preview_key"] = key
|
|
return key
|
|
|
|
|
|
class AnyType(str):
    """String subclass whose ``!=`` comparison never reports a difference.

    Presumably used so socket-type checks written with ``!=`` accept this
    value against any other type name — confirm against the host's validation.
    """

    def __ne__(self, _other: object) -> bool:
        # Every operand is reported as "not different".
        return False


# Wildcard type marker used for the untyped sockets declared in this module.
ANY_TYPE = AnyType("*")
|
|
|
|
|
|
def _require_graph_builder() -> None:
    """Raise ``RuntimeError`` when ``GraphBuilder`` is unavailable (``None``)."""
    if GraphBuilder is not None:
        return
    raise RuntimeError("SxCP loop nodes require ComfyUI's comfy_execution GraphBuilder.")
|
|
|
|
|
|
def _execution_blocker() -> Any:
    """Return ``ExecutionBlocker(None)``, or ``None`` when the class is unavailable."""
    if ExecutionBlocker is None:
        return None
    return ExecutionBlocker(None)
|
|
|
|
|
|
def _torch_cat(first: Any, second: Any) -> Any | None:
|
|
try:
|
|
import torch
|
|
except Exception:
|
|
return None
|
|
if torch.is_tensor(first) and torch.is_tensor(second):
|
|
return torch.cat((first, second), dim=0)
|
|
return None
|
|
|
|
|
|
def _latent_cat(first: Any, second: Any) -> Any | None:
|
|
if not isinstance(first, dict) or not isinstance(second, dict):
|
|
return None
|
|
if "samples" not in first or "samples" not in second:
|
|
return None
|
|
samples = _torch_cat(first["samples"], second["samples"])
|
|
if samples is None:
|
|
return None
|
|
merged = dict(second)
|
|
merged["samples"] = samples
|
|
return merged
|
|
|
|
|
|
def _torch_cat_many(values: list[Any]) -> Any | None:
|
|
if not values:
|
|
return None
|
|
result = values[0]
|
|
for value in values[1:]:
|
|
result = _torch_cat(result, value)
|
|
if result is None:
|
|
return None
|
|
return result
|
|
|
|
|
|
def _is_image_tensor(value: Any) -> bool:
|
|
try:
|
|
import torch
|
|
except Exception:
|
|
return False
|
|
return torch.is_tensor(value) and len(value.shape) == 4
|
|
|
|
|
|
def _image_shape(value: Any) -> tuple[int, ...] | None:
    """Return every dimension of an image tensor except the first, as ints.

    Returns ``None`` when *value* is not a 4-D tensor.
    """
    if _is_image_tensor(value):
        return tuple(int(dim) for dim in value.shape[1:])
    return None
|
|
|
|
|
|
def _split_image_value(value: Any) -> list[Any]:
|
|
if value is None:
|
|
return []
|
|
if isinstance(value, (list, tuple)):
|
|
images: list[Any] = []
|
|
for item in value:
|
|
images.extend(_split_image_value(item))
|
|
return images
|
|
if not _is_image_tensor(value):
|
|
return []
|
|
if int(value.shape[0]) <= 1:
|
|
return [value]
|
|
return [value[index : index + 1] for index in range(int(value.shape[0]))]
|
|
|
|
|
|
def _resize_image_to_shape(image: Any, shape: tuple[int, ...]) -> Any | None:
    """Resize an image tensor to ``shape[0]`` x ``shape[1]`` (height x width).

    Uses ``comfy.utils.common_upscale`` with "lanczos" and "center". The last
    dimension is moved to position 1 for the call and moved back afterwards.
    Returns ``None`` when *image* is not a 4-D tensor or ``comfy.utils``
    cannot be imported.
    """
    if not _is_image_tensor(image):
        return None
    try:
        import comfy.utils
    except Exception:
        return None
    target_height = int(shape[0])
    target_width = int(shape[1])
    channels_second = image.movedim(-1, 1)
    resized = comfy.utils.common_upscale(channels_second, target_width, target_height, "lanczos", "center")
    return resized.movedim(1, -1)
|
|
|
|
|
|
def _image_batch_from_images(images: list[Any], mode: str = "same_size_only") -> Any | None:
|
|
if not images:
|
|
return None
|
|
mode = mode if mode in ACCUMULATOR_IMAGE_BATCH_MODES else "same_size_only"
|
|
first_shape = _image_shape(images[0])
|
|
if first_shape is None:
|
|
return None
|
|
normalized = []
|
|
for image in images:
|
|
if _image_shape(image) != first_shape:
|
|
if mode != "resize_to_first":
|
|
return None
|
|
image = _resize_image_to_shape(image, first_shape)
|
|
if image is None:
|
|
return None
|
|
normalized.append(image)
|
|
return _torch_cat_many(normalized)
|
|
|
|
|
|
def _group_image_batches(images: list[Any]) -> list[Any]:
|
|
grouped: dict[tuple[int, ...], list[Any]] = {}
|
|
order: list[tuple[int, ...]] = []
|
|
for image in images:
|
|
shape = _image_shape(image)
|
|
if shape is None:
|
|
continue
|
|
if shape not in grouped:
|
|
grouped[shape] = []
|
|
order.append(shape)
|
|
grouped[shape].append(image)
|
|
batches = [_torch_cat_many(grouped[shape]) for shape in order]
|
|
return [batch for batch in batches if batch is not None]
|
|
|
|
|
|
def _accumulator_store_key(store_key: str, unique_id: Any = None) -> str:
|
|
key = str(store_key or "").strip()
|
|
if key:
|
|
return key
|
|
return f"node:{unique_id}"
|
|
|
|
|
|
def _entry_value_summary(value: Any) -> str:
|
|
if value is None:
|
|
return ""
|
|
text = str(value)
|
|
text = re.sub(r"\s+", " ", text).strip()
|
|
return text[:160]
|
|
|
|
|
|
def _metadata_copy(value: Any) -> Any:
|
|
try:
|
|
return json.loads(json.dumps(value))
|
|
except Exception:
|
|
return value
|
|
|
|
|
|
def _entry_metadata(prompt: Any, extra_pnginfo: Any) -> dict[str, Any]:
    """Build the ``prompt`` / ``extra_pnginfo`` metadata dict for an entry.

    Both values are copied with ``_metadata_copy``; an *extra_pnginfo* that is
    not a dict is stored as an empty dict.
    """
    snapshot: dict[str, Any] = {"prompt": _metadata_copy(prompt)}
    if isinstance(extra_pnginfo, dict):
        snapshot["extra_pnginfo"] = _metadata_copy(extra_pnginfo)
    else:
        snapshot["extra_pnginfo"] = {}
    return snapshot
|
|
|
|
|
|
def _entry_infos(store: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
|
entries = []
|
|
for index, entry in enumerate(store, start=1):
|
|
image = entry.get("image")
|
|
shape = _image_shape(image)
|
|
has_metadata = "prompt" in entry or "extra_pnginfo" in entry
|
|
entries.append(
|
|
{
|
|
"index": index,
|
|
"id": str(entry.get("id") or ""),
|
|
"preview_key": _entry_preview_key(entry),
|
|
"has_image": image is not None,
|
|
"has_metadata": has_metadata,
|
|
"shape": list(shape) if shape is not None else [],
|
|
"value": _entry_value_summary(entry.get("value")),
|
|
}
|
|
)
|
|
return entries
|
|
|
|
|
|
def _attach_preview_images(entries: list[dict[str, Any]], images: list[dict[str, str]]) -> None:
|
|
by_key = {
|
|
str(image.get("preview_key") or ""): image
|
|
for image in images
|
|
if str(image.get("preview_key") or "")
|
|
}
|
|
for entry in entries:
|
|
image = by_key.get(str(entry.get("preview_key") or ""))
|
|
if image:
|
|
entry["preview_image"] = image
|
|
|
|
|
|
def _jsonable_preview_value(value: Any, depth: int = 4, max_items: int = 80) -> Any:
|
|
if depth < 0:
|
|
return "..."
|
|
if value is None or isinstance(value, (str, int, float, bool)):
|
|
return value
|
|
if isinstance(value, dict):
|
|
items = list(value.items())
|
|
output = {
|
|
str(key): _jsonable_preview_value(item, depth - 1, max_items)
|
|
for key, item in items[:max_items]
|
|
}
|
|
if len(items) > max_items:
|
|
output["..."] = f"{len(items) - max_items} more"
|
|
return output
|
|
if isinstance(value, (list, tuple, set)):
|
|
items = list(value)
|
|
output = [_jsonable_preview_value(item, depth - 1, max_items) for item in items[:max_items]]
|
|
if len(items) > max_items:
|
|
output.append(f"... {len(items) - max_items} more")
|
|
return output
|
|
shape = getattr(value, "shape", None)
|
|
if shape is not None:
|
|
try:
|
|
return {
|
|
"type": type(value).__name__,
|
|
"shape": [int(part) for part in shape],
|
|
"dtype": str(getattr(value, "dtype", "")),
|
|
"device": str(getattr(value, "device", "")),
|
|
}
|
|
except Exception:
|
|
pass
|
|
return str(value)
|
|
|
|
|
|
def _truncate_preview_text(text: str, max_chars: int) -> str:
|
|
max_chars = max(0, int(max_chars or 0))
|
|
if max_chars <= 0 or len(text) <= max_chars:
|
|
return text
|
|
omitted = len(text) - max_chars
|
|
return f"{text[:max_chars]}\n... truncated {omitted} characters"
|
|
|
|
|
|
def _any_to_preview_text(value: Any, preview_format: str, max_chars: int) -> str:
    """Render *value* as preview text in the requested format.

    ``"str"`` and ``"repr"`` use the matching builtin and ``"json"`` dumps the
    JSON-able form of the value. ``"auto"`` (also the fallback for unknown
    formats) returns strings as-is and otherwise tries the JSON rendering,
    falling back to ``str(value)`` if that raises. The result is truncated to
    *max_chars*.
    """
    if preview_format not in PREVIEW_TEXT_FORMATS:
        preview_format = "auto"

    def as_json() -> str:
        return json.dumps(_jsonable_preview_value(value), ensure_ascii=True, indent=2, sort_keys=True)

    if preview_format == "str":
        rendered = str(value)
    elif preview_format == "repr":
        rendered = repr(value)
    elif preview_format == "json":
        rendered = as_json()
    elif isinstance(value, str):
        rendered = value
    else:
        try:
            rendered = as_json()
        except Exception:
            rendered = str(value)
    return _truncate_preview_text(rendered, max_chars)
|
|
|
|
|
|
def _accumulator_status(key: str, store: list[dict[str, Any]]) -> str:
|
|
images = [entry.get("image") for entry in store if entry.get("image") is not None]
|
|
shapes = []
|
|
for image in images:
|
|
shape = _image_shape(image)
|
|
if shape is not None and shape not in shapes:
|
|
shapes.append(shape)
|
|
shape_text = ", ".join(f"{shape[1]}x{shape[0]}" for shape in shapes) or "no images"
|
|
return f"key={key}; entries={len(store)}; image_entries={len(images)}; formats={shape_text}"
|
|
|
|
|
|
def accumulator_list_entries(store_key: str, preview_limit: int = 0) -> dict[str, Any]:
    """List the entries of an accumulator store.

    The store is created empty if it does not exist yet. The result holds
    ``store_key``, ``entries``, ``count`` and ``status``. When *preview_limit*
    is positive, preview images are written, attached to their entries and
    returned under ``images``.

    Raises:
        ValueError: if *store_key* is empty or blank.
    """
    key = str(store_key or "").strip()
    if not key:
        raise ValueError("store_key is required for accumulator preview actions")

    store = _ACCUMULATOR_STORES.setdefault(key, [])
    entries = _entry_infos(store)
    listing: dict[str, Any] = {
        "store_key": key,
        "entries": entries,
        "count": len(store),
        "status": _accumulator_status(key, store),
    }
    if int(preview_limit) <= 0:
        return listing

    images = _preview_image_results(store, preview_limit, None, None)
    _attach_preview_images(entries, images)
    listing["images"] = images
    return listing
|
|
|
|
|
|
def accumulator_delete_entries(
    store_key: str,
    preview_key: str = "",
    entry_id: str = "",
    index: int = 0,
    clear: bool = False,
    preview_limit: int = 0,
) -> dict[str, Any]:
    """Delete entries from an accumulator store and return the new listing.

    With *clear* the whole store is emptied. Otherwise the first usable
    selector wins: *preview_key*, then *entry_id* (both remove every matching
    entry), then the 1-based *index*. An index beyond the end removes nothing.
    The listing gains a ``removed`` count.

    Raises:
        ValueError: if *store_key* is blank, or if *clear* is false and no
            selector is provided.
    """
    key = str(store_key or "").strip()
    if not key:
        raise ValueError("store_key is required for accumulator preview actions")

    store = _ACCUMULATOR_STORES.setdefault(key, [])
    size_before = len(store)
    if clear:
        store.clear()
    else:
        wanted_preview = str(preview_key or "").strip()
        wanted_id = str(entry_id or "").strip()
        if wanted_preview:
            store[:] = [entry for entry in store if _entry_preview_key(entry) != wanted_preview]
        elif wanted_id:
            store[:] = [entry for entry in store if str(entry.get("id") or "") != wanted_id]
        elif int(index) > 0:
            position = int(index) - 1
            if position < len(store):
                del store[position]
        else:
            raise ValueError("entry_id or 1-based index is required")

    # Every branch only removes entries, so the size difference is the count.
    removed = size_before - len(store)
    listing = accumulator_list_entries(key, preview_limit=preview_limit)
    listing["removed"] = removed
    return listing
|
|
|
|
|
|
def accumulator_save_entries(
    store_key: str,
    save_path: str = "sxcp_accumulator",
    filename_prefix: str = "sxcp_accum",
    clear_after_save: bool = False,
    preview_limit: int = 0,
) -> dict[str, Any]:
    """Save the images of an accumulator store to disk and return its listing.

    The store is cleared afterwards only when *clear_after_save* is truthy and
    at least one file was written. The listing gains ``saved_paths``,
    ``saved`` and ``cleared_after_save``, and its ``status`` is extended with
    the save summary.

    Raises:
        ValueError: if *store_key* is empty or blank.
    """
    key = str(store_key or "").strip()
    if not key:
        raise ValueError("store_key is required for accumulator save")

    store = _ACCUMULATOR_STORES.setdefault(key, [])
    saved_paths = _save_images_to_folder(store, save_path, filename_prefix, None, None)
    cleared = bool(saved_paths) and bool(clear_after_save)
    if cleared:
        store.clear()

    listing = accumulator_list_entries(key, preview_limit=preview_limit)
    listing["saved_paths"] = saved_paths
    listing["saved"] = len(saved_paths)
    listing["cleared_after_save"] = cleared
    cleared_note = "; cleared_after_save" if cleared else ""
    listing["status"] = f"{listing.get('status', '')}; saved={len(saved_paths)}{cleared_note}"
    return listing
|
|
|
|
|
|
def accumulator_move_entry(
    store_key: str,
    preview_key: str = "",
    entry_id: str = "",
    index: int = 0,
    direction: str = "up",
    target_index: int = 0,
    preview_limit: int = 0,
) -> dict[str, Any]:
    """Move one entry within an accumulator store and return the new listing.

    The entry is located by *preview_key*, then *entry_id*, then the 1-based
    *index* (first usable selector wins, first match is used). A positive
    *target_index* moves it to that 1-based position, clamped to the store.
    Otherwise *direction* applies: ``"top"``, ``"bottom"``, ``"down"``, and
    anything else meaning one step up.

    The listing gains ``moved``; when an entry was located it also gains the
    1-based ``from_index`` and ``to_index``.

    Raises:
        ValueError: if *store_key* is blank, or if the store is non-empty and
            no selector is provided.
    """
    key = str(store_key or "").strip()
    if not key:
        raise ValueError("store_key is required for accumulator preview actions")
    store = _ACCUMULATOR_STORES.setdefault(key, [])

    def unmoved() -> dict[str, Any]:
        listing = accumulator_list_entries(key, preview_limit=preview_limit)
        listing["moved"] = False
        return listing

    if not store:
        return unmoved()

    wanted_preview = str(preview_key or "").strip()
    wanted_id = str(entry_id or "").strip()
    source = -1
    if wanted_preview:
        source = next(
            (position for position, entry in enumerate(store) if _entry_preview_key(entry) == wanted_preview),
            -1,
        )
    elif wanted_id:
        source = next(
            (position for position, entry in enumerate(store) if str(entry.get("id") or "") == wanted_id),
            -1,
        )
    elif int(index) > 0:
        candidate = int(index) - 1
        if candidate < len(store):
            source = candidate
    else:
        raise ValueError("entry_id or 1-based index is required")

    if source < 0:
        return unmoved()

    requested_target = int(target_index)
    if requested_target > 0:
        # Explicit target: remove first so clamping uses the shortened list.
        entry = store.pop(source)
        destination = max(0, min(len(store), requested_target - 1))
        store.insert(destination, entry)
        moved = destination != source
    else:
        step = str(direction or "up").strip().lower()
        last = len(store) - 1
        if step == "top":
            destination = 0
        elif step == "bottom":
            destination = last
        elif step == "down":
            destination = min(last, source + 1)
        else:
            destination = max(0, source - 1)
        moved = destination != source
        if moved:
            entry = store.pop(source)
            store.insert(destination, entry)

    listing = accumulator_list_entries(key, preview_limit=preview_limit)
    listing["moved"] = moved
    listing["from_index"] = source + 1
    listing["to_index"] = destination + 1
    return listing
|
|
|
|
|
|
def _require_image_saving() -> None:
    """Raise ``RuntimeError`` if ``folder_paths``, ``np`` or ``Image`` is ``None``."""
    dependencies_ready = folder_paths is not None and np is not None and Image is not None
    if not dependencies_ready:
        raise RuntimeError("Image preview/save helpers require ComfyUI image dependencies.")
|
|
|
|
|
|
def _metadata(prompt: Any, extra_pnginfo: Any) -> Any:
    """Build a ``PngInfo`` holding the prompt and extra PNG info as JSON text.

    Returns ``None`` when ``args.disable_metadata`` is set or ``PngInfo`` is
    unavailable. A ``None`` prompt and a non-dict *extra_pnginfo* are skipped.
    """
    if args.disable_metadata or PngInfo is None:
        return None
    info = PngInfo()
    if prompt is not None:
        info.add_text("prompt", json.dumps(prompt))
    extras = extra_pnginfo if isinstance(extra_pnginfo, dict) else {}
    for name, payload in extras.items():
        info.add_text(name, json.dumps(payload))
    return info
|
|
|
|
|
|
def _metadata_for_entry(entry: dict[str, Any], fallback_prompt: Any, fallback_extra_pnginfo: Any) -> Any:
    """Build PNG metadata for *entry*, preferring the metadata stored on it.

    Each fallback is used only when the matching key is absent from *entry*;
    a stored ``None`` is respected.
    """
    chosen_prompt = entry.get("prompt", fallback_prompt)
    chosen_extra = entry.get("extra_pnginfo", fallback_extra_pnginfo)
    return _metadata(chosen_prompt, chosen_extra)
|
|
|
|
|
|
def _image_to_pil(image: Any) -> Any:
    """Convert an image tensor into a PIL image.

    For a 4-D input only element 0 along the first dimension is used. Values
    are scaled by 255, clipped to [0, 255] and cast to uint8.
    Presumably the input holds floats in [0, 1] — confirm against callers.

    Raises:
        RuntimeError: if the image dependencies are unavailable.
    """
    _require_image_saving()
    frame = image
    try:
        if len(frame.shape) == 4:
            frame = frame[0]
    except Exception:
        # Best effort: objects without a usable shape are passed on unchanged.
        pass
    scaled = 255.0 * frame.cpu().numpy()
    pixels = np.clip(scaled, 0, 255).astype(np.uint8)
    return Image.fromarray(pixels)
|
|
|
|
|
|
def _safe_filename_prefix(prefix: str, default: str = "sxcp_accumulator") -> str:
|
|
text = str(prefix or "").strip() or default
|
|
text = os.path.basename(text)
|
|
text = re.sub(r"[^A-Za-z0-9._-]+", "_", text).strip("._")
|
|
return text or default
|
|
|
|
|
|
def _next_save_counter(folder: str, prefix: str) -> int:
|
|
pattern = re.compile(rf"^{re.escape(prefix)}_(\d+)_\.png$")
|
|
counter = 1
|
|
try:
|
|
for filename in os.listdir(folder):
|
|
match = pattern.match(filename)
|
|
if match:
|
|
counter = max(counter, int(match.group(1)) + 1)
|
|
except FileNotFoundError:
|
|
pass
|
|
return counter
|
|
|
|
|
|
def _preview_image_results(
|
|
entries: list[dict[str, Any]],
|
|
preview_limit: int,
|
|
prompt: Any,
|
|
extra_pnginfo: Any,
|
|
) -> list[dict[str, str]]:
|
|
image_entries = [entry for entry in entries if entry.get("image") is not None]
|
|
if not image_entries:
|
|
return []
|
|
_require_image_saving()
|
|
output_dir = folder_paths.get_temp_directory()
|
|
preview_entries = image_entries[: max(1, int(preview_limit))]
|
|
first_shape = _image_shape(preview_entries[0].get("image"))
|
|
height, width = (first_shape[0], first_shape[1]) if first_shape else (512, 512)
|
|
prefix = "SxCPAccumulatorPreview_temp_" + "".join(random.choice("abcdefghijklmnopqrstupvxyz") for _ in range(5))
|
|
full_output_folder, filename, counter, subfolder, _filename_prefix = folder_paths.get_save_image_path(prefix, output_dir, width, height)
|
|
results = []
|
|
for entry in preview_entries:
|
|
image = entry.get("image")
|
|
metadata = _metadata_for_entry(entry, prompt, extra_pnginfo)
|
|
file = f"{filename}_{counter:05}_.png"
|
|
_image_to_pil(image).save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=1)
|
|
results.append({
|
|
"filename": file,
|
|
"subfolder": subfolder,
|
|
"type": "temp",
|
|
"preview_key": _entry_preview_key(entry),
|
|
"entry_id": str(entry.get("id") or ""),
|
|
})
|
|
counter += 1
|
|
return results
|
|
|
|
|
|
def _resolve_save_folder(save_path: str) -> str:
    """Resolve *save_path* to the folder that images are saved into.

    ``~`` is expanded and a blank path becomes ``"sxcp_accumulator"``.
    Absolute paths are returned unchanged; relative ones are joined onto the
    output directory reported by ``folder_paths``.

    Raises:
        RuntimeError: if the image dependencies are unavailable.
    """
    _require_image_saving()
    requested = os.path.expanduser(str(save_path or "").strip()) or "sxcp_accumulator"
    if os.path.isabs(requested):
        return requested
    return os.path.join(folder_paths.get_output_directory(), requested)
|
|
|
|
|
|
def _save_images_to_folder(
|
|
entries: list[dict[str, Any]],
|
|
save_path: str,
|
|
filename_prefix: str,
|
|
prompt: Any,
|
|
extra_pnginfo: Any,
|
|
) -> list[str]:
|
|
image_entries = [entry for entry in entries if entry.get("image") is not None]
|
|
if not image_entries:
|
|
return []
|
|
folder = _resolve_save_folder(save_path)
|
|
os.makedirs(folder, exist_ok=True)
|
|
prefix = _safe_filename_prefix(filename_prefix)
|
|
counter = _next_save_counter(folder, prefix)
|
|
saved_paths = []
|
|
for entry in image_entries:
|
|
image = entry.get("image")
|
|
metadata = _metadata_for_entry(entry, prompt, extra_pnginfo)
|
|
file = f"{prefix}_{counter:05}_.png"
|
|
path = os.path.join(folder, file)
|
|
_image_to_pil(image).save(path, pnginfo=metadata, compress_level=4)
|
|
saved_paths.append(path)
|
|
counter += 1
|
|
return saved_paths
|
|
|
|
|
|
def _as_list(collection: Any) -> list[Any]:
|
|
if collection is None:
|
|
return []
|
|
return list(collection) if isinstance(collection, list) else [collection]
|
|
|
|
|
|
def append_collected_value(collection: Any, value: Any, mode: str = "auto_batch", skip_none: bool = True) -> Any:
    """Append *value* to *collection* according to *mode*.

    * A ``None`` value leaves the collection untouched when *skip_none* is set.
    * ``"string_lines"`` joins values with newlines.
    * ``"list"`` always builds a list.
    * ``"image_batch"`` tries a tensor concatenation.
    * ``"latent_batch"`` tries a latent concatenation.
    * ``"auto_batch"`` (also the fallback for unknown modes) tries the tensor
      concatenation first, then the latent one.

    In the batch modes a ``None`` collection returns *value* itself, and when
    no concatenation applies the value is appended to a list instead.
    """
    if value is None and skip_none:
        return collection
    if mode not in COLLECTION_MODES:
        mode = "auto_batch"

    if mode == "string_lines":
        line = "" if value is None else str(value)
        return f"{collection}\n{line}" if collection else line
    if mode == "list":
        return _as_list(collection) + [value]
    if collection is None:
        return value

    # Concatenation strategies to try, in order, for the selected mode.
    joiners = []
    if mode in ("auto_batch", "image_batch"):
        joiners.append(_torch_cat)
    if mode in ("auto_batch", "latent_batch"):
        joiners.append(_latent_cat)
    for join in joiners:
        merged = join(collection, value)
        if merged is not None:
            return merged
    return _as_list(collection) + [value]
|
|
|
|
|
|
def _switch_available_indices(kwargs: dict[str, Any]) -> list[int]:
|
|
indices = []
|
|
for key in kwargs:
|
|
match = re.match(r"^input_(\d+)$", str(key))
|
|
if match:
|
|
indices.append(int(match.group(1)))
|
|
return sorted(set(indices))
|
|
|
|
|
|
def _switch_requested_index(index: Any, index_base: str) -> int:
|
|
requested = int(index)
|
|
return requested + 1 if index_base == "zero_based" else requested
|
|
|
|
|
|
def _switch_resolved_index(requested: int, available: list[int], missing_behavior: str) -> int | None:
|
|
if requested in available:
|
|
return requested
|
|
if missing_behavior in ("fallback", "none") or not available:
|
|
return None
|
|
if missing_behavior == "wrap":
|
|
return available[(requested - 1) % len(available)]
|
|
if requested <= available[0]:
|
|
return available[0]
|
|
if requested >= available[-1]:
|
|
return available[-1]
|
|
lower = [value for value in available if value <= requested]
|
|
return lower[-1] if lower else available[0]
|
|
|
|
|
|
def _switch_status(requested: int, selected: int | None, used_fallback: bool, available: list[int]) -> str:
|
|
available_text = ",".join(str(index) for index in available) or "none"
|
|
if used_fallback:
|
|
return f"requested=input_{requested}; selected=fallback; available={available_text}"
|
|
if selected is None:
|
|
return f"requested=input_{requested}; selected=none; available={available_text}"
|
|
return f"requested=input_{requested}; selected=input_{selected}; available={available_text}"
|
|
|
|
|
|
class SxCPWhileLoopStart:
    """Loop-start node: forwards its initial values while ``condition`` holds.

    When the condition is false, every value output carries the result of
    ``_execution_blocker()`` instead of the input value.
    """

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the ``condition`` input plus one optional any-typed input per slot."""
        optional_slots = {f"initial_value{slot}": (ANY_TYPE,) for slot in range(MAX_LOOP_VALUES)}
        return {
            "required": {
                "condition": ("BOOLEAN", {"default": True}),
            },
            "optional": optional_slots,
        }

    RETURN_TYPES = ("FLOW_CONTROL",) + (ANY_TYPE,) * MAX_LOOP_VALUES
    RETURN_NAMES = ("flow",) + tuple(f"value{slot}" for slot in range(MAX_LOOP_VALUES))
    FUNCTION = "open"
    CATEGORY = "prompt_builder/loop"

    def open(self, condition, **kwargs):
        """Return ``("stub", value0, ..., valueN-1)`` for the flow and value outputs."""
        if condition:
            values = [kwargs.get(f"initial_value{slot}") for slot in range(MAX_LOOP_VALUES)]
        else:
            values = [_execution_blocker() for _slot in range(MAX_LOOP_VALUES)]
        return ("stub", *values)
|
|
|
|
|
|
class SxCPWhileLoopEnd:
    """Loop-end node: returns the carried values or expands another iteration.

    While ``condition`` is true, ``close`` builds a graph that clones every
    node found between the loop-start node and this node, feeds the current
    values into the cloned start node, and returns the outputs of its own
    clone together with the expanded graph.
    """

    @classmethod
    def INPUT_TYPES(cls):
        """Declare flow/condition inputs, hidden graph context, and one optional any-typed input per slot."""
        inputs = {
            "required": {
                "flow": ("FLOW_CONTROL", {"rawLink": True}),
                "condition": ("BOOLEAN", {}),
            },
            "optional": {},
            "hidden": {
                "dynprompt": "DYNPROMPT",
                "unique_id": "UNIQUE_ID",
                "extra_pnginfo": "EXTRA_PNGINFO",
            },
        }
        for index in range(MAX_LOOP_VALUES):
            inputs["optional"][f"initial_value{index}"] = (ANY_TYPE,)
        return inputs

    RETURN_TYPES = tuple([ANY_TYPE] * MAX_LOOP_VALUES)
    RETURN_NAMES = tuple([f"value{index}" for index in range(MAX_LOOP_VALUES)])
    FUNCTION = "close"
    CATEGORY = "prompt_builder/loop"

    def _explore_dependencies(self, node_id: str, dynprompt: Any, upstream: dict[str, list[str]], parent_ids: list[str]) -> None:
        """Recursively walk the link inputs of *node_id*.

        Fills *upstream* with ``parent id -> ids of nodes consuming it`` and
        appends to *parent_ids* the display id of every parent whose class is
        not one of the loop-end node types.
        """
        node_info = dynprompt.get_node(node_id)
        if "inputs" not in node_info:
            return
        for value in node_info["inputs"].values():
            if not is_link(value):
                continue
            parent_id = value[0]
            display_id = dynprompt.get_display_node_id(parent_id)
            display_node = dynprompt.get_node(display_id)
            class_type = display_node["class_type"]
            if class_type not in ("SxCPForLoopEnd", "SxCPWhileLoopEnd"):
                parent_ids.append(display_id)
            # NOTE(review): nesting reconstructed from a flattened source —
            # confirm this bookkeeping is meant to run for loop-end parents too.
            if parent_id not in upstream:
                # First visit of this parent: recurse before recording the edge.
                upstream[parent_id] = []
                self._explore_dependencies(parent_id, dynprompt, upstream, parent_ids)
            upstream[parent_id].append(node_id)

    def _explore_output_nodes(
        self,
        dynprompt: Any,
        upstream: dict[str, list[str]],
        output_nodes: dict[str, Any],
        parent_ids: list[str],
    ) -> None:
        """Register output nodes as consumers of the parents they link to.

        An output node is added to ``upstream[parent_id]`` when its recorded
        link points at that parent's display id, that id is in *parent_ids*,
        and it is not already listed.
        """
        for parent_id in upstream:
            display_id = dynprompt.get_display_node_id(parent_id)
            for output_id, link in output_nodes.items():
                linked_id = link[0]
                if linked_id in parent_ids and display_id == linked_id and output_id not in upstream[parent_id]:
                    if "." in parent_id:
                        # Dotted parent id: reuse its prefix and swap the last
                        # component for the output node's id.
                        parts = parent_id.split(".")
                        parts[-1] = output_id
                        upstream[parent_id].append(".".join(parts))
                    else:
                        upstream[parent_id].append(output_id)

    def _collect_contained(self, node_id: str, upstream: dict[str, list[str]], contained: dict[str, bool]) -> None:
        """Mark every node reachable from *node_id* through *upstream* consumer edges."""
        if node_id not in upstream:
            return
        for child_id in upstream[node_id]:
            if child_id in contained:
                continue
            contained[child_id] = True
            self._collect_contained(child_id, upstream, contained)

    def close(self, flow, condition, dynprompt=None, unique_id=None, **kwargs):
        """Finish the loop or expand one more iteration.

        Returns the ``initial_value*`` inputs unchanged when *condition* is
        false. Otherwise returns a dict with ``result`` (outputs of this
        node's clone) and ``expand`` (the finalized graph).

        Raises:
            RuntimeError: if ``GraphBuilder`` is unavailable.
        """
        if not condition:
            return tuple(kwargs.get(f"initial_value{index}") for index in range(MAX_LOOP_VALUES))

        _require_graph_builder()
        upstream: dict[str, list[str]] = {}
        parent_ids: list[str] = []
        self._explore_dependencies(unique_id, dynprompt, upstream, parent_ids)
        parent_ids = list(set(parent_ids))

        # Collect OUTPUT_NODE nodes of the original prompt; only the last link
        # input seen for each node is kept.
        output_nodes = {}
        for node_id, node in dynprompt.get_original_prompt().items():
            if "inputs" not in node:
                continue
            class_def = ALL_NODE_CLASS_MAPPINGS.get(node["class_type"])
            if not class_def or not getattr(class_def, "OUTPUT_NODE", False):
                continue
            for value in node["inputs"].values():
                if is_link(value):
                    output_nodes[node_id] = value

        graph = GraphBuilder()
        self._explore_output_nodes(dynprompt, upstream, output_nodes, parent_ids)
        contained: dict[str, bool] = {}
        open_node = flow[0]
        self._collect_contained(open_node, upstream, contained)
        contained[unique_id] = True
        contained[open_node] = True

        # First pass: create a clone of every contained node; this node's own
        # clone is registered under the id "Recurse".
        for node_id in contained:
            original_node = dynprompt.get_node(node_id)
            node = graph.node(original_node["class_type"], "Recurse" if node_id == unique_id else node_id)
            node.set_override_display_id(node_id)
        # Second pass: copy inputs, pointing links between contained nodes at
        # the clones and leaving every other input value as it was.
        for node_id in contained:
            original_node = dynprompt.get_node(node_id)
            node = graph.lookup_node("Recurse" if node_id == unique_id else node_id)
            for key, value in original_node["inputs"].items():
                if is_link(value) and value[0] in contained:
                    parent = graph.lookup_node(value[0])
                    node.set_input(key, parent.out(value[1]))
                else:
                    node.set_input(key, value)

        # Feed this iteration's values into the cloned loop-start node.
        new_open = graph.lookup_node(open_node)
        original_open = dynprompt.get_node(open_node)
        if original_open["class_type"] == "SxCPForLoopStart":
            # For-loop start: slots 0 and 1 map to its index and collected
            # inputs, and the remaining slots shift down by one.
            new_open.set_input("initial_index", kwargs.get("initial_value0"))
            new_open.set_input("initial_collected", kwargs.get("initial_value1"))
            for carry_index in range(1, MAX_CARRY_VALUES + 1):
                new_open.set_input(f"initial_value{carry_index}", kwargs.get(f"initial_value{carry_index + 1}"))
        else:
            for index in range(MAX_LOOP_VALUES):
                new_open.set_input(f"initial_value{index}", kwargs.get(f"initial_value{index}"))
        my_clone = graph.lookup_node("Recurse")
        return {
            "result": tuple(my_clone.out(index) for index in range(MAX_LOOP_VALUES)),
            "expand": graph.finalize(),
        }
|
|
|
|
|
|
class SxCPForLoopStart:
    """For-loop start node built on top of ``SxCPWhileLoopStart``.

    Outputs the current 1-based index, the collected value and the carried
    values, and expands a while-loop start node whose condition is
    ``index <= total``.
    """

    @classmethod
    def INPUT_TYPES(cls):
        """Declare total/skip, the optional carried values, and the hidden loop state."""
        carry_inputs = {
            f"initial_value{slot}": (ANY_TYPE,) for slot in range(1, MAX_CARRY_VALUES + 1)
        }
        required = {
            "total": ("INT", {"default": 2, "min": 1, "max": 100000, "step": 1}),
            "skip": ("INT", {"default": 0, "min": 0, "max": 100000, "step": 1}),
        }
        hidden = {
            "initial_index": (ANY_TYPE,),
            "initial_collected": (ANY_TYPE,),
            "prompt": "PROMPT",
            "extra_pnginfo": "EXTRA_PNGINFO",
            "unique_id": "UNIQUE_ID",
        }
        return {"required": required, "optional": carry_inputs, "hidden": hidden}

    RETURN_TYPES = ("FLOW_CONTROL", "INT", ANY_TYPE) + (ANY_TYPE,) * MAX_CARRY_VALUES
    RETURN_NAMES = ("flow", "index", "collected") + tuple(
        f"value{slot}" for slot in range(1, MAX_CARRY_VALUES + 1)
    )
    FUNCTION = "start"
    CATEGORY = "prompt_builder/loop"

    def start(self, total, skip=0, initial_index=None, initial_collected=None, **kwargs):
        """Compute the current index and expand the underlying while-loop start.

        The index starts at ``skip + 1`` and never drops below it. *total* is
        clamped to at least 1 and *skip* to at least 0.

        Raises:
            RuntimeError: if ``GraphBuilder`` is unavailable.
        """
        _require_graph_builder()
        total = max(1, int(total))
        first_index = max(0, int(skip)) + 1
        if initial_index is None:
            current = first_index
        else:
            current = max(int(initial_index), first_index)

        carried = [kwargs.get(f"initial_value{slot}") for slot in range(1, MAX_CARRY_VALUES + 1)]

        # While-loop slots 0 and 1 carry the index and the collected value;
        # carried value N travels in slot N + 1.
        loop_inputs = {"initial_value0": current, "initial_value1": initial_collected}
        for slot, carried_value in enumerate(carried, start=2):
            loop_inputs[f"initial_value{slot}"] = carried_value

        graph = GraphBuilder()
        graph.node("SxCPWhileLoopStart", condition=current <= total, **loop_inputs)
        return {
            "result": ("stub", current, initial_collected, *carried),
            "expand": graph.finalize(),
        }
|
|
|
|
|
|
class SxCPLoopAppend:
    """Node wrapper around ``append_collected_value``."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the mode / skip_none options and the optional collection and value."""
        required = {
            "mode": (COLLECTION_MODES, {"default": "auto_batch"}),
            "skip_none": ("BOOLEAN", {"default": True}),
        }
        optional = {
            "collection": (ANY_TYPE,),
            "value": (ANY_TYPE,),
        }
        return {"required": required, "optional": optional}

    RETURN_TYPES = (ANY_TYPE,)
    RETURN_NAMES = ("collected",)
    FUNCTION = "append"
    CATEGORY = "prompt_builder/loop"

    def append(self, mode, skip_none, collection=None, value=None):
        """Return a one-tuple holding *collection* with *value* appended per *mode*."""
        collected = append_collected_value(collection, value, mode=mode, skip_none=skip_none)
        return (collected,)
|
|
|
|
|
|
class SxCPIndexSwitch:
    """Index-driven switch node with two modes.

    * ``pick_input``: forward one of the lazy ``input_N`` sockets (or the
      ``fallback`` socket) on the ``value`` output.
    * ``route_output``: place ``route_value`` on a single ``output_N`` socket
      and leave every other ``output_N`` blocked.

    Every ``output_N`` socket is filled with the result of
    ``_execution_blocker()`` unless ``route_output`` mode selects it.
    """

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the widgets plus ``MAX_SWITCH_INPUTS`` lazy ``input_N`` sockets."""
        return {
            "required": {
                "index": ("INT", {"default": 1, "min": -100000, "max": 100000, "step": 1}),
                "mode": (INDEX_SWITCH_MODES, {"default": "pick_input"}),
                "index_base": (INDEX_SWITCH_BASES, {"default": "one_based"}),
                "missing_behavior": (INDEX_SWITCH_MISSING_BEHAVIORS, {"default": "fallback"}),
            },
            "optional": {
                "fallback": (ANY_TYPE, {"lazy": True}),
                "route_value": (ANY_TYPE, {"lazy": True}),
                **{
                    f"input_{index}": (ANY_TYPE, {"lazy": True})
                    for index in range(1, MAX_SWITCH_INPUTS + 1)
                },
            },
        }

    RETURN_TYPES = tuple([ANY_TYPE, "INT", "STRING"] + [ANY_TYPE] * MAX_SWITCH_INPUTS)
    RETURN_NAMES = tuple(
        ["value", "selected_index", "status"]
        + [f"output_{index}" for index in range(1, MAX_SWITCH_INPUTS + 1)]
    )
    FUNCTION = "switch"
    CATEGORY = "prompt_builder/loop"

    def _input_selection(
        self,
        index: Any,
        index_base: str,
        missing_behavior: str,
        kwargs: dict[str, Any],
    ) -> tuple[int, int | None, list[int]]:
        """Resolve which ``input_N`` socket ``index`` refers to.

        Unknown ``index_base`` / ``missing_behavior`` values fall back to
        ``"one_based"`` / ``"fallback"``.

        Returns:
            ``(requested, selected, available)`` as produced by the
            ``_switch_requested_index``, ``_switch_resolved_index`` and
            ``_switch_available_indices`` helpers; ``selected`` is ``None``
            when no input could be resolved.
        """
        index_base = index_base if index_base in INDEX_SWITCH_BASES else "one_based"
        missing_behavior = (
            missing_behavior if missing_behavior in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback"
        )
        requested = _switch_requested_index(index, index_base)
        available = _switch_available_indices(kwargs)
        selected = _switch_resolved_index(requested, available, missing_behavior)
        return requested, selected, available

    def _route_selection(self, index: Any, index_base: str, missing_behavior: str) -> tuple[int, int | None]:
        """Resolve which ``output_N`` socket receives the routed value.

        Returns:
            ``(requested, selected)``. An in-range request selects itself;
            an out-of-range one is wrapped or clamped into
            ``1..MAX_SWITCH_INPUTS`` when ``missing_behavior`` asks for it,
            and otherwise yields ``selected=None``.
        """
        index_base = index_base if index_base in INDEX_SWITCH_BASES else "one_based"
        missing_behavior = (
            missing_behavior if missing_behavior in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback"
        )
        requested = _switch_requested_index(index, index_base)
        if 1 <= requested <= MAX_SWITCH_INPUTS:
            return requested, requested
        if missing_behavior == "wrap":
            # Modular arithmetic on a zero-based value, shifted back to 1-based.
            return requested, ((requested - 1) % MAX_SWITCH_INPUTS) + 1
        if missing_behavior == "clamp":
            return requested, min(max(requested, 1), MAX_SWITCH_INPUTS)
        return requested, None

    def _blocked_outputs(self) -> list[Any]:
        """Return one fresh ``_execution_blocker()`` result per ``output_N`` socket."""
        return [_execution_blocker() for _index in range(MAX_SWITCH_INPUTS)]

    def check_lazy_status(self, index, mode, index_base, missing_behavior, **kwargs):
        """Return the names of the lazy inputs this run actually needs.

        In ``route_output`` mode only ``route_value`` is requested. In
        ``pick_input`` mode the resolved ``input_N`` is requested (or the
        literally requested one when nothing resolved); if that socket is not
        present in ``kwargs`` and the missing behavior is ``"fallback"``, the
        ``fallback`` socket is requested instead.
        """
        mode = mode if mode in INDEX_SWITCH_MODES else "pick_input"
        # Normalize exactly as switch() does. Without this, an unrecognized
        # value is treated as "fallback" by switch() but never causes the
        # lazy fallback input to be requested here.
        missing_behavior = (
            missing_behavior if missing_behavior in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback"
        )
        if mode == "route_output":
            return ["route_value"] if "route_value" in kwargs else []
        requested, selected, _available = self._input_selection(index, index_base, missing_behavior, kwargs)
        selected_name = f"input_{selected}" if selected is not None else f"input_{requested}"
        if selected_name in kwargs:
            return [selected_name]
        if missing_behavior == "fallback" and "fallback" in kwargs:
            return ["fallback"]
        return []

    def switch(self, index, mode, index_base, missing_behavior, **kwargs):
        """Execute the switch.

        Returns:
            A tuple ``(value, selected_index, status, output_1, ..., output_N)``
            with ``N == MAX_SWITCH_INPUTS``. ``selected_index`` is ``0`` when
            nothing was selected.
        """
        mode = mode if mode in INDEX_SWITCH_MODES else "pick_input"
        missing_behavior = (
            missing_behavior if missing_behavior in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback"
        )

        if mode == "route_output":
            requested, selected = self._route_selection(index, index_base, missing_behavior)
            # kwargs.get() already yields None when route_value is not wired.
            value = kwargs.get("route_value")
            outputs = self._blocked_outputs()
            if selected is not None and "route_value" in kwargs:
                outputs[selected - 1] = value
            selected_label = "none" if selected is None else f"output_{selected}"
            status = (
                f"mode=route_output; requested=output_{requested}; "
                f"selected={selected_label}; range=1-{MAX_SWITCH_INPUTS}"
            )
            return tuple([value, selected or 0, status] + outputs)

        requested, selected, available = self._input_selection(index, index_base, missing_behavior, kwargs)
        if selected is not None:
            selected_name = f"input_{selected}"
            if selected_name in kwargs:
                value = kwargs.get(selected_name)
                status = f"mode=pick_input; {_switch_status(requested, selected, False, available)}"
                return tuple([value, selected, status] + self._blocked_outputs())
        if missing_behavior == "fallback" and "fallback" in kwargs:
            status = f"mode=pick_input; {_switch_status(requested, None, True, available)}"
            return tuple([kwargs.get("fallback"), 0, status] + self._blocked_outputs())
        status = f"mode=pick_input; {_switch_status(requested, None, False, available)}"
        return tuple([None, 0, status] + self._blocked_outputs())
|
|
|
|
|
|
class SxCPAccumulator:
    """Node that collects images and/or values in a keyed module-level store.

    Entries live in ``_ACCUMULATOR_STORES[key]`` as dicts with at least the
    keys ``"id"``, ``"image"`` and ``"value"`` plus whatever
    ``_entry_metadata`` contributes. ``action`` decides whether a run appends,
    replaces, clears or only reads the store.
    """

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the widgets, the optional data sockets and the hidden inputs."""
        return {
            "required": {
                "store_key": ("STRING", {"default": "", "multiline": False}),
                "action": (ACCUMULATOR_ACTIONS, {"default": "append_variant"}),
                "max_items": ("INT", {"default": 32, "min": 1, "max": 10000, "step": 1}),
                "image_batch_mode": (ACCUMULATOR_IMAGE_BATCH_MODES, {"default": "same_size_only"}),
                "skip_empty": ("BOOLEAN", {"default": True}),
            },
            "optional": {
                "image": ("IMAGE",),
                "value": (ANY_TYPE,),
                "entry_id": (ANY_TYPE,),
                "entry_tag": ("STRING", {"default": "", "multiline": False}),
            },
            "hidden": {
                "prompt": "PROMPT",
                "extra_pnginfo": "EXTRA_PNGINFO",
                "unique_id": "UNIQUE_ID",
            },
        }

    RETURN_TYPES = tuple(
        [ANY_TYPE, "IMAGE", "IMAGE"] + ["IMAGE"] * ACCUMULATOR_IMAGE_GROUPS + ["INT", "STRING", "STRING"]
    )
    RETURN_NAMES = tuple(
        ["collection", "image_batch", "image_list"]
        + [f"image_batch_{index}" for index in range(1, ACCUMULATOR_IMAGE_GROUPS + 1)]
        + ["count", "status", "store_key"]
    )
    # Only "image_list" is emitted as a list output.
    OUTPUT_IS_LIST = tuple([False, False, True] + [False] * ACCUMULATOR_IMAGE_GROUPS + [False, False, False])
    FUNCTION = "accumulate"
    CATEGORY = "prompt_builder/loop"

    @classmethod
    def IS_CHANGED(cls, *args, **kwargs):
        """Return a new random number on every call.

        NOTE(review): presumably this forces the node to re-run each
        execution because its result depends on the external store.
        """
        return random.random()

    def _store_key(self, store_key: str, unique_id: Any) -> str:
        """Resolve the effective store key via ``_accumulator_store_key``."""
        return _accumulator_store_key(store_key, unique_id)

    def _entry_id(self, entry_id: Any, entry_tag: str, image_index: int, image_count: int) -> str:
        """Build the id for one entry.

        The stripped ``entry_id`` and ``entry_tag`` are joined with ``":"``
        (either may be empty). When several images are stored at once, a
        1-based ``":<n>"`` suffix is appended, using ``"image"`` as the stem
        if both id and tag are empty.
        """
        text = "" if entry_id is None else str(entry_id).strip()
        tag = str(entry_tag or "").strip()
        if tag:
            text = f"{text}:{tag}" if text else tag
        if image_count <= 1:
            return text
        return f"{text or 'image'}:{image_index + 1}"

    def _value_for_image(self, value: Any, image_index: int, image_count: int) -> Any:
        """Pick the value that belongs to image ``image_index``.

        A list/tuple whose length equals ``image_count`` is distributed one
        item per image; anything else is shared by every image.
        """
        if image_count <= 1:
            return value
        if isinstance(value, (list, tuple)) and len(value) == image_count:
            return value[image_index]
        return value

    def _entry_records(
        self,
        image: Any,
        value: Any,
        entry_id: Any,
        entry_tag: str,
        skip_empty: bool,
        metadata: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Turn one node invocation into zero or more store entries.

        Without images a single value-only entry is produced, unless the
        value is ``None`` and ``skip_empty`` is set, in which case nothing is
        produced. With images, one entry per item returned by
        ``_split_image_value`` is produced.
        """
        images = _split_image_value(image)
        if not images:
            if value is None and skip_empty:
                return []
            return [{"id": self._entry_id(entry_id, entry_tag, 0, 1), "image": None, "value": value, **metadata}]
        image_count = len(images)
        return [
            {
                "id": self._entry_id(entry_id, entry_tag, index, image_count),
                "image": image_item,
                "value": self._value_for_image(value, index, image_count),
                **metadata,
            }
            for index, image_item in enumerate(images)
        ]

    def _variant_entry_id(self, store: list[dict[str, Any]], base_id: str) -> str:
        """Return an id of the form ``"<base_id>#<n>"`` that is unused in ``store``.

        ``n`` starts at one more than the number of entries whose id is
        ``base_id`` or starts with ``"<base_id>#"``. That count alone is not
        unique once entries have been deleted or trimmed (a store holding
        ``a#2`` and ``a#3`` has count 2, which would produce ``a#3`` again),
        so ``n`` is increased until the candidate id is not present.
        """
        base_id = base_id or "entry"
        prefix = f"{base_id}#"
        existing_ids = [str(existing.get("id") or "") for existing in store]
        count = sum(
            1
            for existing_id in existing_ids
            if existing_id == base_id or existing_id.startswith(prefix)
        )
        taken = set(existing_ids)
        candidate = f"{prefix}{count + 1}"
        while candidate in taken:
            count += 1
            candidate = f"{prefix}{count + 1}"
        return candidate

    def _append_or_replace(self, store: list[dict[str, Any]], entries: list[dict[str, Any]], action: str) -> None:
        """Add ``entries`` to ``store`` in place according to ``action``.

        * ``append_variant``: append a copy of the entry under a fresh
          variant id.
        * ``replace_by_entry_id``: overwrite the first entry with the same
          non-empty id, appending when there is none.
        * anything else: plain append.
        """
        replace = action == "replace_by_entry_id"
        for entry in entries:
            if action == "append_variant":
                # Copy so the caller's dict keeps its original id.
                entry = dict(entry)
                entry["id"] = self._variant_entry_id(store, str(entry.get("id") or ""))
                store.append(entry)
                continue
            entry_id = entry.get("id") or ""
            if replace and entry_id:
                for index, existing in enumerate(store):
                    if existing.get("id") == entry_id:
                        store[index] = entry
                        break
                else:
                    # No entry with this id yet.
                    store.append(entry)
            else:
                store.append(entry)

    def _collection(self, store: list[dict[str, Any]]) -> list[Any]:
        """Return each entry's value, or its image when the value is ``None``."""
        collection = []
        for entry in store:
            value = entry.get("value")
            collection.append(value if value is not None else entry.get("image"))
        return collection

    def _status(self, key: str, store: list[dict[str, Any]], image_batch: Any, image_batches: list[Any]) -> str:
        """Compose the status string reported on the ``status`` output."""
        batch_state = "all images batched" if image_batch is not None else "mixed sizes or no image batch"
        return f"{_accumulator_status(key, store)}; grouped_batches={len(image_batches)}; {batch_state}"

    def accumulate(
        self,
        store_key,
        action,
        max_items,
        image_batch_mode,
        skip_empty,
        image=None,
        value=None,
        entry_id=None,
        entry_tag="",
        prompt=None,
        extra_pnginfo=None,
        unique_id=None,
    ):
        """Apply ``action`` to the store and return its current contents.

        Returns:
            ``(collection, image_batch, image_list, image_batch_1..N, count,
            status, store_key)`` with ``N == ACCUMULATOR_IMAGE_GROUPS``.
            Unused group slots are ``None``.
        """
        key = self._store_key(store_key, unique_id)
        action = action if action in ACCUMULATOR_ACTIONS else "append_variant"
        store = _ACCUMULATOR_STORES.setdefault(key, [])

        if action in ("clear", "clear_then_append"):
            store.clear()

        if action not in ("clear", "read"):
            metadata = _entry_metadata(prompt, extra_pnginfo)
            entries = self._entry_records(image, value, entry_id, entry_tag, bool(skip_empty), metadata)
            self._append_or_replace(store, entries, action)

        # Keep only the newest max_items entries; the oldest are dropped first.
        max_items = max(1, int(max_items))
        if len(store) > max_items:
            del store[: len(store) - max_items]

        images = [entry["image"] for entry in store if entry.get("image") is not None]
        image_batch = _image_batch_from_images(images, image_batch_mode)
        image_batches = _group_image_batches(images)
        # Pad or truncate the grouped batches to the fixed number of sockets.
        grouped_outputs = image_batches[:ACCUMULATOR_IMAGE_GROUPS]
        grouped_outputs += [None] * (ACCUMULATOR_IMAGE_GROUPS - len(grouped_outputs))
        status = self._status(key, store, image_batch, image_batches)
        return tuple([self._collection(store), image_batch, images] + grouped_outputs + [len(store), status, key])
|
|
|
|
|
|
class SxCPAccumulatorPreview:
    """Output node that previews, prunes and optionally saves an accumulator store."""

    OUTPUT_NODE = True

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the preview/delete/save widgets and the optional key socket."""
        return {
            "required": {
                "store_key": ("STRING", {"default": "", "multiline": False}),
                "preview_limit": ("INT", {"default": 64, "min": 1, "max": 512, "step": 1}),
                "view_mode": (ACCUMULATOR_PREVIEW_VIEW_MODES, {"default": "grid"}),
                "zoom_level": ("FLOAT", {"default": 1.0, "min": 0.5, "max": 3.0, "step": 0.05}),
                "carousel_index": ("INT", {"default": 1, "min": 1, "max": 100000, "step": 1}),
                "delete_action": (ACCUMULATOR_PREVIEW_DELETE_ACTIONS, {"default": "none"}),
                "delete_entry_id": ("STRING", {"default": "", "multiline": False}),
                "delete_index": ("INT", {"default": 0, "min": 0, "max": 100000, "step": 1}),
                "save_batch": ("BOOLEAN", {"default": False}),
                "finished": ("BOOLEAN", {"default": True}),
                "save_path": ("STRING", {"default": "sxcp_accumulator", "multiline": False}),
                "filename_prefix": ("STRING", {"default": "sxcp_accum", "multiline": False}),
                "clear_after_save": ("BOOLEAN", {"default": False}),
            },
            "optional": {
                "store_key_input": ("STRING", {"forceInput": True}),
            },
            "hidden": {
                "prompt": "PROMPT",
                "extra_pnginfo": "EXTRA_PNGINFO",
                "unique_id": "UNIQUE_ID",
            },
        }

    RETURN_TYPES = ("INT", "STRING", "STRING")
    RETURN_NAMES = ("count", "status", "saved_paths_json")
    FUNCTION = "preview"
    CATEGORY = "prompt_builder/loop"

    @classmethod
    def IS_CHANGED(cls, *args, **kwargs):
        """Return a new random number on every call.

        NOTE(review): presumably this forces the node to re-run each
        execution because its result depends on the external store.
        """
        return random.random()

    def _delete_from_inputs(self, key: str, delete_action: str, delete_entry_id: str, delete_index: int) -> int:
        """Apply the delete widgets to the store and return how many entries went away.

        Unknown actions, an empty entry id and a non-positive index are all
        no-ops that return ``0``. The count is read from the ``"removed"``
        field of the mapping returned by ``accumulator_delete_entries``.
        """
        action = delete_action if delete_action in ACCUMULATOR_PREVIEW_DELETE_ACTIONS else "none"
        if action == "none":
            return 0
        if action == "clear":
            return int(accumulator_delete_entries(key, clear=True).get("removed", 0))
        if action == "delete_entry_id":
            entry_id = str(delete_entry_id or "").strip()
            if not entry_id:
                return 0
            return int(accumulator_delete_entries(key, entry_id=entry_id).get("removed", 0))
        if action == "delete_index":
            index = int(delete_index)
            if index <= 0:
                return 0
            return int(accumulator_delete_entries(key, index=index).get("removed", 0))
        return 0

    def preview(
        self,
        store_key,
        preview_limit,
        view_mode,
        zoom_level,
        carousel_index,
        delete_action,
        delete_entry_id,
        delete_index,
        save_batch,
        finished,
        save_path,
        filename_prefix,
        clear_after_save,
        store_key_input=None,
        prompt=None,
        extra_pnginfo=None,
        unique_id=None,
    ):
        """Delete, optionally save, then describe the store for the UI.

        ``store_key_input`` (when non-blank) takes precedence over the
        ``store_key`` widget. ``view_mode``, ``zoom_level`` and
        ``carousel_index`` are accepted but not read here.

        Returns:
            A dict with a ``"ui"`` payload (preview images, entry infos,
            status, saved paths, resolved key) and a ``"result"`` tuple
            ``(count, status, saved_paths_json)``.
        """
        key_source = str(store_key_input or "").strip() or store_key
        key = _accumulator_store_key(key_source, unique_id)
        # Make sure the key exists before the delete helper looks it up.
        _ACCUMULATOR_STORES.setdefault(key, [])
        removed = self._delete_from_inputs(key, delete_action, delete_entry_id, delete_index)
        # Resolve the list only after deleting: accumulator_delete_entries is
        # defined elsewhere, and this stays correct whether it mutates the
        # list in place or rebinds the key to a new list.
        store = _ACCUMULATOR_STORES.setdefault(key, [])

        saved_paths: list[str] = []
        save_status = ""
        # Saving is gated on both switches so a batch is only written once
        # the caller flags it as finished.
        if bool(save_batch) and bool(finished):
            saved_paths = _save_images_to_folder(store, save_path, filename_prefix, prompt, extra_pnginfo)
            save_status = f"; saved={len(saved_paths)}"
            if saved_paths and bool(clear_after_save):
                store.clear()
                save_status += "; cleared_after_save"

        entries = _entry_infos(store)
        preview_images = _preview_image_results(store, preview_limit, prompt, extra_pnginfo)
        _attach_preview_images(entries, preview_images)
        status = _accumulator_status(key, store)
        if removed:
            status += f"; removed={removed}"
        status += save_status
        saved_json = json.dumps(saved_paths, ensure_ascii=True)
        return {
            "ui": {
                "images": preview_images,
                "entries": entries,
                "status": [status],
                "saved_paths": saved_paths,
                "store_key": [key],
            },
            "result": (len(store), status, saved_json),
        }
|
|
|
|
|
|
class SxCPPreviewAnyAsText:
    """Output node that shows any connected value, or its own text widget, as text."""

    OUTPUT_NODE = True
    CATEGORY = "prompt_builder/util"
    FUNCTION = "preview"
    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the text widget, the format selector, the length cap and the value socket."""
        widgets = {
            "preview_text": ("STRING", {"default": "", "multiline": True}),
            "preview_format": (PREVIEW_TEXT_FORMATS, {"default": "auto"}),
            "max_chars": ("INT", {"default": 20000, "min": 0, "max": 200000, "step": 1000}),
        }
        sockets = {"value": (ANY_TYPE, {"forceInput": True})}
        return {"required": widgets, "optional": sockets}

    @classmethod
    def IS_CHANGED(cls, *args, **kwargs):
        """Return a new random number on every call."""
        return random.random()

    def preview(self, preview_text, preview_format, max_chars, **kwargs):
        """Render the preview text and return it both to the UI and as the node output.

        When a ``value`` keyword is present it is converted with
        ``_any_to_preview_text``; otherwise the widget text is truncated with
        ``_truncate_preview_text``.
        """
        if "value" in kwargs:
            rendered = _any_to_preview_text(
                kwargs.get("value"),
                str(preview_format or "auto"),
                int(max_chars),
            )
        else:
            rendered = _truncate_preview_text(str(preview_text or ""), int(max_chars))
        return {"ui": {"preview_text": [rendered]}, "result": (rendered,)}
|
|
|
|
|
|
class SxCPForLoopEnd:
    """Closing node of a for-loop; it expands into the sub-graph for the next iteration."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the flow link, the collection options and the raw-link carry sockets."""
        optional = {
            "collected": (ANY_TYPE, {"rawLink": True}),
            "collect_value": (ANY_TYPE, {"rawLink": True}),
        }
        optional.update(
            (f"initial_value{slot}", (ANY_TYPE, {"rawLink": True}))
            for slot in range(1, MAX_CARRY_VALUES + 1)
        )
        return {
            "required": {
                "flow": ("FLOW_CONTROL", {"rawLink": True}),
                "collection_mode": (COLLECTION_MODES, {"default": "auto_batch"}),
                "skip_none": ("BOOLEAN", {"default": True}),
            },
            "optional": optional,
            "hidden": {
                "dynprompt": "DYNPROMPT",
                "extra_pnginfo": "EXTRA_PNGINFO",
                "unique_id": "UNIQUE_ID",
            },
        }

    RETURN_TYPES = tuple([ANY_TYPE] + [ANY_TYPE] * MAX_CARRY_VALUES)
    RETURN_NAMES = tuple(["collected"] + [f"value{index}" for index in range(1, MAX_CARRY_VALUES + 1)])
    FUNCTION = "end"
    CATEGORY = "prompt_builder/loop"

    def end(self, flow, collection_mode, skip_none, dynprompt=None, **kwargs):
        """Build the expansion graph that advances the loop by one step.

        The graph adds 1 to output 1 of the start node, compares the sum with
        the start node's ``total`` input, appends ``collect_value`` to the
        collection and hands everything to an ``SxCPWhileLoopEnd`` node.

        Raises:
            ValueError: if ``flow`` does not originate from an
                ``SxCPForLoopStart`` node.
        """
        _require_graph_builder()
        graph = GraphBuilder()

        start_id = flow[0]
        start_node = dynprompt.get_node(start_id)
        if start_node["class_type"] != "SxCPForLoopStart":
            raise ValueError("SxCP For Loop End must receive flow from SxCP For Loop Start.")
        total = start_node["inputs"]["total"]

        # Node creation order is kept fixed: add, compare, append, close.
        incremented = graph.node("SxCPLoopIntAdd", a=[start_id, 1], b=1)
        keep_going = graph.node("SxCPLoopLessThanOrEqual", a=incremented.out(0), b=total)

        # Fall back to output 2 of the start node when nothing is wired in.
        collection = kwargs.get("collected") or [start_id, 2]
        collect_value = kwargs.get("collect_value")
        appended = graph.node(
            "SxCPLoopAppend",
            collection=collection,
            value=collect_value,
            mode=collection_mode,
            skip_none=skip_none,
        )

        # Slots 0 and 1 of the while-loop are taken by the index and the
        # collection, so every user carry value shifts up by one slot.
        carried = {
            "initial_value0": incremented.out(0),
            "initial_value1": appended.out(0),
        }
        carried.update(
            (f"initial_value{slot + 1}", kwargs.get(f"initial_value{slot}"))
            for slot in range(1, MAX_CARRY_VALUES + 1)
        )

        closer = graph.node("SxCPWhileLoopEnd", flow=flow, condition=keep_going.out(0), **carried)
        outputs = tuple(closer.out(slot) for slot in range(1, MAX_LOOP_VALUES))
        return {"result": outputs, "expand": graph.finalize()}
|
|
|
|
|
|
class SxCPLoopIntAdd:
    """Internal helper node that adds two integers."""

    CATEGORY = "prompt_builder/loop/internal"
    FUNCTION = "add"
    RETURN_TYPES = ("INT",)
    RETURN_NAMES = ("int",)

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the two required INT operands ``a`` (default 0) and ``b`` (default 1)."""
        operands = {
            "a": ("INT", {"default": 0}),
            "b": ("INT", {"default": 1}),
        }
        return {"required": operands}

    def add(self, a, b):
        """Return ``int(a) + int(b)`` wrapped in a 1-tuple."""
        left = int(a)
        right = int(b)
        return (left + right,)
|
|
|
|
|
|
class SxCPLoopLessThan:
    """Internal helper node that tests ``a < b`` on integers."""

    CATEGORY = "prompt_builder/loop/internal"
    FUNCTION = "compare"
    RETURN_TYPES = ("BOOLEAN",)
    RETURN_NAMES = ("boolean",)

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the two required INT operands ``a`` (default 0) and ``b`` (default 1)."""
        operands = {
            "a": ("INT", {"default": 0}),
            "b": ("INT", {"default": 1}),
        }
        return {"required": operands}

    def compare(self, a, b):
        """Return ``int(a) < int(b)`` wrapped in a 1-tuple."""
        left = int(a)
        right = int(b)
        return (left < right,)
|
|
|
|
|
|
class SxCPLoopLessThanOrEqual:
    """Internal helper node that tests ``a <= b`` on integers."""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the two required INT operands ``a`` and ``b`` (both default 0)."""
        return {
            "required": {
                "a": ("INT", {"default": 0}),
                "b": ("INT", {"default": 0}),
            }
        }

    RETURN_TYPES = ("BOOLEAN",)
    # Named like the output of the sibling SxCPLoopLessThan node so both
    # comparison nodes expose the same output label.
    RETURN_NAMES = ("boolean",)
    FUNCTION = "compare"
    CATEGORY = "prompt_builder/loop/internal"

    def compare(self, a, b):
        """Return ``int(a) <= int(b)`` wrapped in a 1-tuple."""
        return (int(a) <= int(b),)
|
|
|
|
|
|
# Node identifier -> implementing class for every node in this module.
LOOP_NODE_CLASS_MAPPINGS = {
    # Loop control
    "SxCPWhileLoopStart": SxCPWhileLoopStart,
    "SxCPWhileLoopEnd": SxCPWhileLoopEnd,
    "SxCPForLoopStart": SxCPForLoopStart,
    "SxCPForLoopEnd": SxCPForLoopEnd,
    # Collection, switching and accumulation
    "SxCPLoopAppend": SxCPLoopAppend,
    "SxCPIndexSwitch": SxCPIndexSwitch,
    "SxCPAccumulator": SxCPAccumulator,
    "SxCPAccumulatorPreview": SxCPAccumulatorPreview,
    "SxCPPreviewAnyAsText": SxCPPreviewAnyAsText,
    # Internal arithmetic/comparison helpers used by the for-loop expansion
    "SxCPLoopIntAdd": SxCPLoopIntAdd,
    "SxCPLoopLessThan": SxCPLoopLessThan,
    "SxCPLoopLessThanOrEqual": SxCPLoopLessThanOrEqual,
}
|
|
|
|
# Node identifier -> human-readable title; keys mirror LOOP_NODE_CLASS_MAPPINGS.
LOOP_NODE_DISPLAY_NAME_MAPPINGS = {
    # Loop control
    "SxCPWhileLoopStart": "SxCP While Loop Start",
    "SxCPWhileLoopEnd": "SxCP While Loop End",
    "SxCPForLoopStart": "SxCP For Loop Start",
    "SxCPForLoopEnd": "SxCP For Loop End",
    # Collection, switching and accumulation
    "SxCPLoopAppend": "SxCP Loop Append",
    "SxCPIndexSwitch": "SxCP Index Switch",
    "SxCPAccumulator": "SxCP Accumulator",
    "SxCPAccumulatorPreview": "SxCP Accumulator Preview",
    "SxCPPreviewAnyAsText": "SxCP Preview Any As Text",
    # Internal helpers
    "SxCPLoopIntAdd": "SxCP Loop Int Add",
    "SxCPLoopLessThan": "SxCP Loop Less Than",
    "SxCPLoopLessThanOrEqual": "SxCP Loop Less Than Or Equal",
}
|