Extract index switch policy
This commit is contained in:
@@ -416,6 +416,10 @@ Already isolated:
|
||||
by `__init__.py`.
|
||||
- generation profile, advanced filter, and ethnicity list utility nodes live in
|
||||
`node_profile_filter.py`, with registration maps imported by `__init__.py`.
|
||||
- index-switch constants, index-base normalization, missing-input behavior,
|
||||
route-output selection, status text, and lazy-input selection live in
|
||||
`index_switch_policy.py`; `loop_nodes.py` keeps the ComfyUI node wrapper and
|
||||
accumulator/loop runtime logic.
|
||||
- profile-save and accumulator server payload handling lives in
|
||||
`server_routes.py`; `__init__.py` only wires those pure handlers to ComfyUI
|
||||
JSON responses, and `tools/prompt_smoke.py` covers the handlers without
|
||||
|
||||
@@ -722,7 +722,7 @@ These do not own prompt pool wording, but they affect execution and review:
|
||||
| Node family | Files | Purpose |
|
||||
| --- | --- | --- |
|
||||
| Loop nodes | `loop_nodes.py`, `web/loop_slots.js` | While/for loop execution and carry values. |
|
||||
| Index switch | `loop_nodes.py`, `web/index_switch_slots.js` | Multi-input to selected output, and selected input to multi-output routing. |
|
||||
| Index switch | `loop_nodes.py`, `index_switch_policy.py`, `web/index_switch_slots.js` | Multi-input to selected output, and selected input to multi-output routing. Pure index-base, missing-input, route-output, status, and lazy-input policy lives in `index_switch_policy.py`. |
|
||||
| Accumulator | `loop_nodes.py`, `web/accumulator_preview.js` | Stores generated values/images during workflow execution and previews/reorders/deletes them. |
|
||||
| Persistent text preview | `loop_nodes.py`, `web/preview_any_text.js` | Stores any value as text and keeps it after workflow reload. |
|
||||
| Builder node wrappers | `node_builder.py`, imported by `__init__.py` | Direct prompt builder and config-driven prompt builder ComfyUI declarations. |
|
||||
|
||||
@@ -0,0 +1,100 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
|
||||
MAX_SWITCH_INPUTS = 64
|
||||
INDEX_SWITCH_MODES = ["pick_input", "route_output"]
|
||||
INDEX_SWITCH_BASES = ["one_based", "zero_based"]
|
||||
INDEX_SWITCH_MISSING_BEHAVIORS = ["fallback", "none", "clamp", "wrap"]
|
||||
|
||||
|
||||
def normalize_index_base(value: Any) -> str:
|
||||
return value if value in INDEX_SWITCH_BASES else "one_based"
|
||||
|
||||
|
||||
def normalize_missing_behavior(value: Any) -> str:
|
||||
return value if value in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback"
|
||||
|
||||
|
||||
def normalize_mode(value: Any) -> str:
|
||||
return value if value in INDEX_SWITCH_MODES else "pick_input"
|
||||
|
||||
|
||||
def available_input_indices(kwargs: dict[str, Any]) -> list[int]:
|
||||
indices = []
|
||||
for key in kwargs:
|
||||
match = re.match(r"^input_(\d+)$", str(key))
|
||||
if match:
|
||||
indices.append(int(match.group(1)))
|
||||
return sorted(set(indices))
|
||||
|
||||
|
||||
def requested_index(index: Any, index_base: str) -> int:
|
||||
requested = int(index)
|
||||
return requested + 1 if normalize_index_base(index_base) == "zero_based" else requested
|
||||
|
||||
|
||||
def resolved_input_index(requested: int, available: list[int], missing_behavior: str) -> int | None:
|
||||
missing_behavior = normalize_missing_behavior(missing_behavior)
|
||||
if requested in available:
|
||||
return requested
|
||||
if missing_behavior in ("fallback", "none") or not available:
|
||||
return None
|
||||
if missing_behavior == "wrap":
|
||||
return available[(requested - 1) % len(available)]
|
||||
if requested <= available[0]:
|
||||
return available[0]
|
||||
if requested >= available[-1]:
|
||||
return available[-1]
|
||||
lower = [value for value in available if value <= requested]
|
||||
return lower[-1] if lower else available[0]
|
||||
|
||||
|
||||
def input_selection(index: Any, index_base: str, missing_behavior: str, kwargs: dict[str, Any]) -> tuple[int, int | None, list[int]]:
|
||||
requested = requested_index(index, index_base)
|
||||
available = available_input_indices(kwargs)
|
||||
selected = resolved_input_index(requested, available, missing_behavior)
|
||||
return requested, selected, available
|
||||
|
||||
|
||||
def route_selection(index: Any, index_base: str, missing_behavior: str, max_outputs: int = MAX_SWITCH_INPUTS) -> tuple[int, int | None]:
|
||||
requested = requested_index(index, index_base)
|
||||
max_outputs = max(1, int(max_outputs))
|
||||
missing_behavior = normalize_missing_behavior(missing_behavior)
|
||||
if 1 <= requested <= max_outputs:
|
||||
return requested, requested
|
||||
if missing_behavior == "wrap":
|
||||
return requested, ((requested - 1) % max_outputs) + 1
|
||||
if missing_behavior == "clamp":
|
||||
return requested, min(max(requested, 1), max_outputs)
|
||||
return requested, None
|
||||
|
||||
|
||||
def input_status(requested: int, selected: int | None, used_fallback: bool, available: list[int]) -> str:
|
||||
available_text = ",".join(str(index) for index in available) or "none"
|
||||
if used_fallback:
|
||||
return f"requested=input_{requested}; selected=fallback; available={available_text}"
|
||||
if selected is None:
|
||||
return f"requested=input_{requested}; selected=none; available={available_text}"
|
||||
return f"requested=input_{requested}; selected=input_{selected}; available={available_text}"
|
||||
|
||||
|
||||
def route_status(requested: int, selected: int | None, max_outputs: int = MAX_SWITCH_INPUTS) -> str:
|
||||
selected_text = "none" if selected is None else f"output_{selected}"
|
||||
return f"requested=output_{requested}; selected={selected_text}; range=1-{max_outputs}"
|
||||
|
||||
|
||||
def lazy_inputs(index: Any, mode: str, index_base: str, missing_behavior: str, kwargs: dict[str, Any]) -> list[str]:
|
||||
mode = normalize_mode(mode)
|
||||
missing_behavior = normalize_missing_behavior(missing_behavior)
|
||||
if mode == "route_output":
|
||||
return ["route_value"] if "route_value" in kwargs else []
|
||||
requested, selected, _available = input_selection(index, index_base, missing_behavior, kwargs)
|
||||
selected_name = f"input_{selected}" if selected is not None else f"input_{requested}"
|
||||
if selected_name in kwargs:
|
||||
return [selected_name]
|
||||
if missing_behavior == "fallback" and "fallback" in kwargs:
|
||||
return ["fallback"]
|
||||
return []
|
||||
+18
-74
@@ -6,6 +6,11 @@ import random
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
try:
|
||||
from . import index_switch_policy
|
||||
except Exception: # Allows local imports outside ComfyUI package mode.
|
||||
import index_switch_policy
|
||||
|
||||
try:
|
||||
from comfy_execution.graph import ExecutionBlocker
|
||||
from comfy_execution.graph_utils import GraphBuilder, is_link
|
||||
@@ -41,16 +46,16 @@ except Exception:
|
||||
|
||||
MAX_LOOP_VALUES = 20
|
||||
MAX_CARRY_VALUES = MAX_LOOP_VALUES - 2
|
||||
MAX_SWITCH_INPUTS = 64
|
||||
MAX_SWITCH_INPUTS = index_switch_policy.MAX_SWITCH_INPUTS
|
||||
COLLECTION_MODES = ["auto_batch", "list", "image_batch", "latent_batch", "string_lines"]
|
||||
ACCUMULATOR_ACTIONS = ["append_variant", "replace_by_entry_id", "append", "clear_then_append", "clear", "read"]
|
||||
ACCUMULATOR_IMAGE_BATCH_MODES = ["same_size_only", "resize_to_first"]
|
||||
ACCUMULATOR_IMAGE_GROUPS = 4
|
||||
ACCUMULATOR_PREVIEW_VIEW_MODES = ["grid", "carousel"]
|
||||
ACCUMULATOR_PREVIEW_DELETE_ACTIONS = ["none", "delete_entry_id", "delete_index", "clear"]
|
||||
INDEX_SWITCH_MODES = ["pick_input", "route_output"]
|
||||
INDEX_SWITCH_BASES = ["one_based", "zero_based"]
|
||||
INDEX_SWITCH_MISSING_BEHAVIORS = ["fallback", "none", "clamp", "wrap"]
|
||||
INDEX_SWITCH_MODES = index_switch_policy.INDEX_SWITCH_MODES
|
||||
INDEX_SWITCH_BASES = index_switch_policy.INDEX_SWITCH_BASES
|
||||
INDEX_SWITCH_MISSING_BEHAVIORS = index_switch_policy.INDEX_SWITCH_MISSING_BEHAVIORS
|
||||
PREVIEW_TEXT_FORMATS = ["auto", "json", "repr", "str"]
|
||||
|
||||
_ACCUMULATOR_STORES: dict[str, list[dict[str, Any]]] = {}
|
||||
@@ -629,44 +634,6 @@ def append_collected_value(collection: Any, value: Any, mode: str = "auto_batch"
|
||||
return _as_list(collection) + [value]
|
||||
|
||||
|
||||
def _switch_available_indices(kwargs: dict[str, Any]) -> list[int]:
|
||||
indices = []
|
||||
for key in kwargs:
|
||||
match = re.match(r"^input_(\d+)$", str(key))
|
||||
if match:
|
||||
indices.append(int(match.group(1)))
|
||||
return sorted(set(indices))
|
||||
|
||||
|
||||
def _switch_requested_index(index: Any, index_base: str) -> int:
|
||||
requested = int(index)
|
||||
return requested + 1 if index_base == "zero_based" else requested
|
||||
|
||||
|
||||
def _switch_resolved_index(requested: int, available: list[int], missing_behavior: str) -> int | None:
|
||||
if requested in available:
|
||||
return requested
|
||||
if missing_behavior in ("fallback", "none") or not available:
|
||||
return None
|
||||
if missing_behavior == "wrap":
|
||||
return available[(requested - 1) % len(available)]
|
||||
if requested <= available[0]:
|
||||
return available[0]
|
||||
if requested >= available[-1]:
|
||||
return available[-1]
|
||||
lower = [value for value in available if value <= requested]
|
||||
return lower[-1] if lower else available[0]
|
||||
|
||||
|
||||
def _switch_status(requested: int, selected: int | None, used_fallback: bool, available: list[int]) -> str:
|
||||
available_text = ",".join(str(index) for index in available) or "none"
|
||||
if used_fallback:
|
||||
return f"requested=input_{requested}; selected=fallback; available={available_text}"
|
||||
if selected is None:
|
||||
return f"requested=input_{requested}; selected=none; available={available_text}"
|
||||
return f"requested=input_{requested}; selected=input_{selected}; available={available_text}"
|
||||
|
||||
|
||||
class SxCPWhileLoopStart:
|
||||
@classmethod
|
||||
def INPUT_TYPES(cls):
|
||||
@@ -923,50 +890,27 @@ class SxCPIndexSwitch:
|
||||
missing_behavior: str,
|
||||
kwargs: dict[str, Any],
|
||||
) -> tuple[int, int | None, list[int]]:
|
||||
index_base = index_base if index_base in INDEX_SWITCH_BASES else "one_based"
|
||||
missing_behavior = missing_behavior if missing_behavior in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback"
|
||||
requested = _switch_requested_index(index, index_base)
|
||||
available = _switch_available_indices(kwargs)
|
||||
selected = _switch_resolved_index(requested, available, missing_behavior)
|
||||
return requested, selected, available
|
||||
return index_switch_policy.input_selection(index, index_base, missing_behavior, kwargs)
|
||||
|
||||
def _route_selection(self, index: Any, index_base: str, missing_behavior: str) -> tuple[int, int | None]:
|
||||
index_base = index_base if index_base in INDEX_SWITCH_BASES else "one_based"
|
||||
missing_behavior = missing_behavior if missing_behavior in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback"
|
||||
requested = _switch_requested_index(index, index_base)
|
||||
if 1 <= requested <= MAX_SWITCH_INPUTS:
|
||||
return requested, requested
|
||||
if missing_behavior == "wrap":
|
||||
return requested, ((requested - 1) % MAX_SWITCH_INPUTS) + 1
|
||||
if missing_behavior == "clamp":
|
||||
return requested, min(max(requested, 1), MAX_SWITCH_INPUTS)
|
||||
return requested, None
|
||||
return index_switch_policy.route_selection(index, index_base, missing_behavior, MAX_SWITCH_INPUTS)
|
||||
|
||||
def _blocked_outputs(self) -> list[Any]:
|
||||
return [_execution_blocker() for _index in range(MAX_SWITCH_INPUTS)]
|
||||
|
||||
def check_lazy_status(self, index, mode, index_base, missing_behavior, **kwargs):
|
||||
mode = mode if mode in INDEX_SWITCH_MODES else "pick_input"
|
||||
if mode == "route_output":
|
||||
return ["route_value"] if "route_value" in kwargs else []
|
||||
requested, selected, _available = self._input_selection(index, index_base, missing_behavior, kwargs)
|
||||
selected_name = f"input_{selected}" if selected is not None else f"input_{requested}"
|
||||
if selected_name in kwargs:
|
||||
return [selected_name]
|
||||
if missing_behavior == "fallback" and "fallback" in kwargs:
|
||||
return ["fallback"]
|
||||
return []
|
||||
return index_switch_policy.lazy_inputs(index, mode, index_base, missing_behavior, kwargs)
|
||||
|
||||
def switch(self, index, mode, index_base, missing_behavior, **kwargs):
|
||||
mode = mode if mode in INDEX_SWITCH_MODES else "pick_input"
|
||||
missing_behavior = missing_behavior if missing_behavior in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback"
|
||||
mode = index_switch_policy.normalize_mode(mode)
|
||||
missing_behavior = index_switch_policy.normalize_missing_behavior(missing_behavior)
|
||||
if mode == "route_output":
|
||||
requested, selected = self._route_selection(index, index_base, missing_behavior)
|
||||
value = kwargs.get("route_value")
|
||||
outputs = self._blocked_outputs()
|
||||
if selected is not None and "route_value" in kwargs:
|
||||
outputs[selected - 1] = value
|
||||
status = f"mode=route_output; requested=output_{requested}; selected={'none' if selected is None else f'output_{selected}'}; range=1-{MAX_SWITCH_INPUTS}"
|
||||
status = f"mode=route_output; {index_switch_policy.route_status(requested, selected, MAX_SWITCH_INPUTS)}"
|
||||
selected_index = selected or 0
|
||||
return tuple([value if "route_value" in kwargs else None, selected_index, status] + outputs)
|
||||
|
||||
@@ -975,12 +919,12 @@ class SxCPIndexSwitch:
|
||||
selected_name = f"input_{selected}"
|
||||
if selected_name in kwargs:
|
||||
value = kwargs.get(selected_name)
|
||||
status = f"mode=pick_input; {_switch_status(requested, selected, False, available)}"
|
||||
status = f"mode=pick_input; {index_switch_policy.input_status(requested, selected, False, available)}"
|
||||
return tuple([value, selected, status] + self._blocked_outputs())
|
||||
if missing_behavior == "fallback" and "fallback" in kwargs:
|
||||
status = f"mode=pick_input; {_switch_status(requested, None, True, available)}"
|
||||
status = f"mode=pick_input; {index_switch_policy.input_status(requested, None, True, available)}"
|
||||
return tuple([kwargs.get("fallback"), 0, status] + self._blocked_outputs())
|
||||
status = f"mode=pick_input; {_switch_status(requested, None, False, available)}"
|
||||
status = f"mode=pick_input; {index_switch_policy.input_status(requested, None, False, available)}"
|
||||
return tuple([None, 0, status] + self._blocked_outputs())
|
||||
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@ import formatter_input # noqa: E402
|
||||
import hardcore_position_config # noqa: E402
|
||||
import __init__ as sxcp_nodes # noqa: E402
|
||||
import generation_profile_config # noqa: E402
|
||||
import index_switch_policy # noqa: E402
|
||||
import krea_cast # noqa: E402
|
||||
import krea_formatter # noqa: E402
|
||||
import location_config # noqa: E402
|
||||
@@ -2647,6 +2648,22 @@ def smoke_node_utility_registration() -> None:
|
||||
|
||||
|
||||
def smoke_server_route_payload_policy() -> None:
|
||||
requested, selected, available = index_switch_policy.input_selection(
|
||||
0,
|
||||
"zero_based",
|
||||
"fallback",
|
||||
{"input_1": "first"},
|
||||
)
|
||||
_expect((requested, selected, available) == (1, 1, [1]), "Index switch policy zero-based selection changed")
|
||||
_expect(
|
||||
index_switch_policy.route_selection(65, "one_based", "wrap") == (65, 1),
|
||||
"Index switch policy wrap routing changed",
|
||||
)
|
||||
_expect(
|
||||
index_switch_policy.lazy_inputs(2, "pick_input", "one_based", "fallback", {"input_2": "second"}) == ["input_2"],
|
||||
"Index switch policy lazy input selection changed",
|
||||
)
|
||||
|
||||
switch = loop_nodes.SxCPIndexSwitch()
|
||||
picked = switch.switch(
|
||||
2,
|
||||
|
||||
Reference in New Issue
Block a user