Add bidirectional autoscaling index switch

This commit is contained in:
2026-06-25 08:33:35 +02:00
parent a60258dc4c
commit 55477bd826
3 changed files with 294 additions and 0 deletions
+137
View File
@@ -41,11 +41,15 @@ except Exception:
MAX_LOOP_VALUES = 20
MAX_CARRY_VALUES = MAX_LOOP_VALUES - 2
MAX_SWITCH_INPUTS = 64
COLLECTION_MODES = ["auto_batch", "list", "image_batch", "latent_batch", "string_lines"]
ACCUMULATOR_ACTIONS = ["append_variant", "replace_by_entry_id", "append", "clear_then_append", "clear", "read"]
ACCUMULATOR_IMAGE_BATCH_MODES = ["same_size_only", "resize_to_first"]
ACCUMULATOR_IMAGE_GROUPS = 4
ACCUMULATOR_PREVIEW_DELETE_ACTIONS = ["none", "delete_entry_id", "delete_index", "clear"]
INDEX_SWITCH_MODES = ["pick_input", "route_output"]
INDEX_SWITCH_BASES = ["one_based", "zero_based"]
INDEX_SWITCH_MISSING_BEHAVIORS = ["fallback", "none", "clamp", "wrap"]
_ACCUMULATOR_STORES: dict[str, list[dict[str, Any]]] = {}
@@ -431,6 +435,44 @@ def append_collected_value(collection: Any, value: Any, mode: str = "auto_batch"
return _as_list(collection) + [value]
def _switch_available_indices(kwargs: dict[str, Any]) -> list[int]:
indices = []
for key in kwargs:
match = re.match(r"^input_(\d+)$", str(key))
if match:
indices.append(int(match.group(1)))
return sorted(set(indices))
def _switch_requested_index(index: Any, index_base: str) -> int:
requested = int(index)
return requested + 1 if index_base == "zero_based" else requested
def _switch_resolved_index(requested: int, available: list[int], missing_behavior: str) -> int | None:
if requested in available:
return requested
if missing_behavior in ("fallback", "none") or not available:
return None
if missing_behavior == "wrap":
return available[(requested - 1) % len(available)]
if requested <= available[0]:
return available[0]
if requested >= available[-1]:
return available[-1]
lower = [value for value in available if value <= requested]
return lower[-1] if lower else available[0]
def _switch_status(requested: int, selected: int | None, used_fallback: bool, available: list[int]) -> str:
available_text = ",".join(str(index) for index in available) or "none"
if used_fallback:
return f"requested=input_{requested}; selected=fallback; available={available_text}"
if selected is None:
return f"requested=input_{requested}; selected=none; available={available_text}"
return f"requested=input_{requested}; selected=input_{selected}; available={available_text}"
class SxCPWhileLoopStart:
@classmethod
def INPUT_TYPES(cls):
@@ -655,6 +697,99 @@ class SxCPLoopAppend:
return (append_collected_value(collection, value, mode=mode, skip_none=skip_none),)
class SxCPIndexSwitch:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"index": ("INT", {"default": 1, "min": -100000, "max": 100000, "step": 1}),
"mode": (INDEX_SWITCH_MODES, {"default": "pick_input"}),
"index_base": (INDEX_SWITCH_BASES, {"default": "one_based"}),
"missing_behavior": (INDEX_SWITCH_MISSING_BEHAVIORS, {"default": "fallback"}),
},
"optional": {
"fallback": (ANY_TYPE, {"lazy": True}),
"route_value": (ANY_TYPE, {"lazy": True}),
**{
f"input_{index}": (ANY_TYPE, {"lazy": True})
for index in range(1, MAX_SWITCH_INPUTS + 1)
},
},
}
RETURN_TYPES = tuple([ANY_TYPE, "INT", "STRING"] + [ANY_TYPE] * MAX_SWITCH_INPUTS)
RETURN_NAMES = tuple(["value", "selected_index", "status"] + [f"output_{index}" for index in range(1, MAX_SWITCH_INPUTS + 1)])
FUNCTION = "switch"
CATEGORY = "prompt_builder/loop"
def _input_selection(
self,
index: Any,
index_base: str,
missing_behavior: str,
kwargs: dict[str, Any],
) -> tuple[int, int | None, list[int]]:
index_base = index_base if index_base in INDEX_SWITCH_BASES else "one_based"
missing_behavior = missing_behavior if missing_behavior in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback"
requested = _switch_requested_index(index, index_base)
available = _switch_available_indices(kwargs)
selected = _switch_resolved_index(requested, available, missing_behavior)
return requested, selected, available
def _route_selection(self, index: Any, index_base: str, missing_behavior: str) -> tuple[int, int | None]:
index_base = index_base if index_base in INDEX_SWITCH_BASES else "one_based"
missing_behavior = missing_behavior if missing_behavior in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback"
requested = _switch_requested_index(index, index_base)
if 1 <= requested <= MAX_SWITCH_INPUTS:
return requested, requested
if missing_behavior == "wrap":
return requested, ((requested - 1) % MAX_SWITCH_INPUTS) + 1
if missing_behavior == "clamp":
return requested, min(max(requested, 1), MAX_SWITCH_INPUTS)
return requested, None
def _blocked_outputs(self) -> list[Any]:
return [_execution_blocker() for _index in range(MAX_SWITCH_INPUTS)]
def check_lazy_status(self, index, mode, index_base, missing_behavior, **kwargs):
mode = mode if mode in INDEX_SWITCH_MODES else "pick_input"
if mode == "route_output":
return ["route_value"] if "route_value" in kwargs else []
requested, selected, _available = self._input_selection(index, index_base, missing_behavior, kwargs)
selected_name = f"input_{selected}" if selected is not None else f"input_{requested}"
if selected_name in kwargs:
return [selected_name]
if missing_behavior == "fallback" and "fallback" in kwargs:
return ["fallback"]
return []
def switch(self, index, mode, index_base, missing_behavior, **kwargs):
mode = mode if mode in INDEX_SWITCH_MODES else "pick_input"
missing_behavior = missing_behavior if missing_behavior in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback"
if mode == "route_output":
requested, selected = self._route_selection(index, index_base, missing_behavior)
value = kwargs.get("route_value")
outputs = self._blocked_outputs()
if selected is not None and "route_value" in kwargs:
outputs[selected - 1] = value
status = f"mode=route_output; requested=output_{requested}; selected={'none' if selected is None else f'output_{selected}'}; range=1-{MAX_SWITCH_INPUTS}"
selected_index = selected or 0
return tuple([value if "route_value" in kwargs else None, selected_index, status] + outputs)
requested, selected, available = self._input_selection(index, index_base, missing_behavior, kwargs)
if selected is not None:
selected_name = f"input_{selected}"
if selected_name in kwargs:
value = kwargs.get(selected_name)
status = f"mode=pick_input; {_switch_status(requested, selected, False, available)}"
return tuple([value, selected, status] + self._blocked_outputs())
if missing_behavior == "fallback" and "fallback" in kwargs:
status = f"mode=pick_input; {_switch_status(requested, None, True, available)}"
return tuple([kwargs.get("fallback"), 0, status] + self._blocked_outputs())
status = f"mode=pick_input; {_switch_status(requested, None, False, available)}"
return tuple([None, 0, status] + self._blocked_outputs())
class SxCPAccumulator:
@classmethod
def INPUT_TYPES(cls):
@@ -1049,6 +1184,7 @@ LOOP_NODE_CLASS_MAPPINGS = {
"SxCPForLoopStart": SxCPForLoopStart,
"SxCPForLoopEnd": SxCPForLoopEnd,
"SxCPLoopAppend": SxCPLoopAppend,
"SxCPIndexSwitch": SxCPIndexSwitch,
"SxCPAccumulator": SxCPAccumulator,
"SxCPAccumulatorPreview": SxCPAccumulatorPreview,
"SxCPLoopIntAdd": SxCPLoopIntAdd,
@@ -1062,6 +1198,7 @@ LOOP_NODE_DISPLAY_NAME_MAPPINGS = {
"SxCPForLoopStart": "SxCP For Loop Start",
"SxCPForLoopEnd": "SxCP For Loop End",
"SxCPLoopAppend": "SxCP Loop Append",
"SxCPIndexSwitch": "SxCP Index Switch",
"SxCPAccumulator": "SxCP Accumulator",
"SxCPAccumulatorPreview": "SxCP Accumulator Preview",
"SxCPLoopIntAdd": "SxCP Loop Int Add",