diff --git a/docs/prompt-architecture-improvement-plan.md b/docs/prompt-architecture-improvement-plan.md index c00c471..706395e 100644 --- a/docs/prompt-architecture-improvement-plan.md +++ b/docs/prompt-architecture-improvement-plan.md @@ -416,6 +416,10 @@ Already isolated: by `__init__.py`. - generation profile, advanced filter, and ethnicity list utility nodes live in `node_profile_filter.py`, with registration maps imported by `__init__.py`. +- index-switch constants, index-base normalization, missing-input behavior, + route-output selection, status text, and lazy-input selection live in + `index_switch_policy.py`; `loop_nodes.py` keeps the ComfyUI node wrapper and + accumulator/loop runtime logic. - profile-save and accumulator server payload handling lives in `server_routes.py`; `__init__.py` only wires those pure handlers to ComfyUI JSON responses, and `tools/prompt_smoke.py` covers the handlers without diff --git a/docs/prompt-pool-routing-map.md b/docs/prompt-pool-routing-map.md index e03c927..542974b 100644 --- a/docs/prompt-pool-routing-map.md +++ b/docs/prompt-pool-routing-map.md @@ -722,7 +722,7 @@ These do not own prompt pool wording, but they affect execution and review: | Node family | Files | Purpose | | --- | --- | --- | | Loop nodes | `loop_nodes.py`, `web/loop_slots.js` | While/for loop execution and carry values. | -| Index switch | `loop_nodes.py`, `web/index_switch_slots.js` | Multi-input to selected output, and selected input to multi-output routing. | +| Index switch | `loop_nodes.py`, `index_switch_policy.py`, `web/index_switch_slots.js` | Multi-input to selected output, and selected input to multi-output routing. Pure index-base, missing-input, route-output, status, and lazy-input policy lives in `index_switch_policy.py`. | | Accumulator | `loop_nodes.py`, `web/accumulator_preview.js` | Stores generated values/images during workflow execution and previews/reorders/deletes them. | | Persistent text preview | `loop_nodes.py`, `web/preview_any_text.js` | Stores any value as text and keeps it after workflow reload. | | Builder node wrappers | `node_builder.py`, imported by `__init__.py` | Direct prompt builder and config-driven prompt builder ComfyUI declarations. | diff --git a/index_switch_policy.py b/index_switch_policy.py new file mode 100644 index 0000000..51263b9 --- /dev/null +++ b/index_switch_policy.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +import re +from typing import Any + + +MAX_SWITCH_INPUTS = 64 +INDEX_SWITCH_MODES = ["pick_input", "route_output"] +INDEX_SWITCH_BASES = ["one_based", "zero_based"] +INDEX_SWITCH_MISSING_BEHAVIORS = ["fallback", "none", "clamp", "wrap"] + + +def normalize_index_base(value: Any) -> str: + return value if value in INDEX_SWITCH_BASES else "one_based" + + +def normalize_missing_behavior(value: Any) -> str: + return value if value in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback" + + +def normalize_mode(value: Any) -> str: + return value if value in INDEX_SWITCH_MODES else "pick_input" + + +def available_input_indices(kwargs: dict[str, Any]) -> list[int]: + indices = [] + for key in kwargs: + match = re.match(r"^input_(\d+)$", str(key)) + if match: + indices.append(int(match.group(1))) + return sorted(set(indices)) + + +def requested_index(index: Any, index_base: str) -> int: + requested = int(index) + return requested + 1 if normalize_index_base(index_base) == "zero_based" else requested + + +def resolved_input_index(requested: int, available: list[int], missing_behavior: str) -> int | None: + missing_behavior = normalize_missing_behavior(missing_behavior) + if requested in available: + return requested + if missing_behavior in ("fallback", "none") or not available: + return None + if missing_behavior == "wrap": + return available[(requested - 1) % len(available)] + if requested <= available[0]: + return available[0] + if requested >= available[-1]: + return available[-1] + lower = [value for value in available if value <= requested] + return lower[-1] if lower else available[0] + + +def input_selection(index: Any, index_base: str, missing_behavior: str, kwargs: dict[str, Any]) -> tuple[int, int | None, list[int]]: + requested = requested_index(index, index_base) + available = available_input_indices(kwargs) + selected = resolved_input_index(requested, available, missing_behavior) + return requested, selected, available + + +def route_selection(index: Any, index_base: str, missing_behavior: str, max_outputs: int = MAX_SWITCH_INPUTS) -> tuple[int, int | None]: + requested = requested_index(index, index_base) + max_outputs = max(1, int(max_outputs)) + missing_behavior = normalize_missing_behavior(missing_behavior) + if 1 <= requested <= max_outputs: + return requested, requested + if missing_behavior == "wrap": + return requested, ((requested - 1) % max_outputs) + 1 + if missing_behavior == "clamp": + return requested, min(max(requested, 1), max_outputs) + return requested, None + + +def input_status(requested: int, selected: int | None, used_fallback: bool, available: list[int]) -> str: + available_text = ",".join(str(index) for index in available) or "none" + if used_fallback: + return f"requested=input_{requested}; selected=fallback; available={available_text}" + if selected is None: + return f"requested=input_{requested}; selected=none; available={available_text}" + return f"requested=input_{requested}; selected=input_{selected}; available={available_text}" + + +def route_status(requested: int, selected: int | None, max_outputs: int = MAX_SWITCH_INPUTS) -> str: + selected_text = "none" if selected is None else f"output_{selected}" + return f"requested=output_{requested}; selected={selected_text}; range=1-{max_outputs}" + + +def lazy_inputs(index: Any, mode: str, index_base: str, missing_behavior: str, kwargs: dict[str, Any]) -> list[str]: + mode = normalize_mode(mode) + missing_behavior = normalize_missing_behavior(missing_behavior) + if mode == "route_output": + return ["route_value"] if "route_value" in kwargs else [] + requested, selected, _available = input_selection(index, index_base, missing_behavior, kwargs) + selected_name = f"input_{selected}" if selected is not None else f"input_{requested}" + if selected_name in kwargs: + return [selected_name] + if missing_behavior == "fallback" and "fallback" in kwargs: + return ["fallback"] + return [] diff --git a/loop_nodes.py b/loop_nodes.py index 4328b1e..4edaf69 100644 --- a/loop_nodes.py +++ b/loop_nodes.py @@ -6,6 +6,11 @@ import random import re from typing import Any +try: + from . import index_switch_policy +except Exception: # Allows local imports outside ComfyUI package mode. + import index_switch_policy + try: from comfy_execution.graph import ExecutionBlocker from comfy_execution.graph_utils import GraphBuilder, is_link @@ -41,16 +46,16 @@ except Exception: MAX_LOOP_VALUES = 20 MAX_CARRY_VALUES = MAX_LOOP_VALUES - 2 -MAX_SWITCH_INPUTS = 64 +MAX_SWITCH_INPUTS = index_switch_policy.MAX_SWITCH_INPUTS COLLECTION_MODES = ["auto_batch", "list", "image_batch", "latent_batch", "string_lines"] ACCUMULATOR_ACTIONS = ["append_variant", "replace_by_entry_id", "append", "clear_then_append", "clear", "read"] ACCUMULATOR_IMAGE_BATCH_MODES = ["same_size_only", "resize_to_first"] ACCUMULATOR_IMAGE_GROUPS = 4 ACCUMULATOR_PREVIEW_VIEW_MODES = ["grid", "carousel"] ACCUMULATOR_PREVIEW_DELETE_ACTIONS = ["none", "delete_entry_id", "delete_index", "clear"] -INDEX_SWITCH_MODES = ["pick_input", "route_output"] -INDEX_SWITCH_BASES = ["one_based", "zero_based"] -INDEX_SWITCH_MISSING_BEHAVIORS = ["fallback", "none", "clamp", "wrap"] +INDEX_SWITCH_MODES = index_switch_policy.INDEX_SWITCH_MODES +INDEX_SWITCH_BASES = index_switch_policy.INDEX_SWITCH_BASES +INDEX_SWITCH_MISSING_BEHAVIORS = index_switch_policy.INDEX_SWITCH_MISSING_BEHAVIORS PREVIEW_TEXT_FORMATS = ["auto", "json", "repr", "str"] _ACCUMULATOR_STORES: dict[str, list[dict[str, Any]]] = {} @@ -629,44 +634,6 @@ def append_collected_value(collection: Any, value: Any, mode: str = "auto_batch" return _as_list(collection) + [value] -def _switch_available_indices(kwargs: dict[str, Any]) -> list[int]: - indices = [] - for key in kwargs: - match = re.match(r"^input_(\d+)$", str(key)) - if match: - indices.append(int(match.group(1))) - return sorted(set(indices)) - - -def _switch_requested_index(index: Any, index_base: str) -> int: - requested = int(index) - return requested + 1 if index_base == "zero_based" else requested - - -def _switch_resolved_index(requested: int, available: list[int], missing_behavior: str) -> int | None: - if requested in available: - return requested - if missing_behavior in ("fallback", "none") or not available: - return None - if missing_behavior == "wrap": - return available[(requested - 1) % len(available)] - if requested <= available[0]: - return available[0] - if requested >= available[-1]: - return available[-1] - lower = [value for value in available if value <= requested] - return lower[-1] if lower else available[0] - - -def _switch_status(requested: int, selected: int | None, used_fallback: bool, available: list[int]) -> str: - available_text = ",".join(str(index) for index in available) or "none" - if used_fallback: - return f"requested=input_{requested}; selected=fallback; available={available_text}" - if selected is None: - return f"requested=input_{requested}; selected=none; available={available_text}" - return f"requested=input_{requested}; selected=input_{selected}; available={available_text}" - - class SxCPWhileLoopStart: @classmethod def INPUT_TYPES(cls): @@ -923,50 +890,27 @@ class SxCPIndexSwitch: missing_behavior: str, kwargs: dict[str, Any], ) -> tuple[int, int | None, list[int]]: - index_base = index_base if index_base in INDEX_SWITCH_BASES else "one_based" - missing_behavior = missing_behavior if missing_behavior in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback" - requested = _switch_requested_index(index, index_base) - available = _switch_available_indices(kwargs) - selected = _switch_resolved_index(requested, available, missing_behavior) - return requested, selected, available + return index_switch_policy.input_selection(index, index_base, missing_behavior, kwargs) def _route_selection(self, index: Any, index_base: str, missing_behavior: str) -> tuple[int, int | None]: - index_base = index_base if index_base in INDEX_SWITCH_BASES else "one_based" - missing_behavior = missing_behavior if missing_behavior in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback" - requested = _switch_requested_index(index, index_base) - if 1 <= requested <= MAX_SWITCH_INPUTS: - return requested, requested - if missing_behavior == "wrap": - return requested, ((requested - 1) % MAX_SWITCH_INPUTS) + 1 - if missing_behavior == "clamp": - return requested, min(max(requested, 1), MAX_SWITCH_INPUTS) - return requested, None + return index_switch_policy.route_selection(index, index_base, missing_behavior, MAX_SWITCH_INPUTS) def _blocked_outputs(self) -> list[Any]: return [_execution_blocker() for _index in range(MAX_SWITCH_INPUTS)] def check_lazy_status(self, index, mode, index_base, missing_behavior, **kwargs): - mode = mode if mode in INDEX_SWITCH_MODES else "pick_input" - if mode == "route_output": - return ["route_value"] if "route_value" in kwargs else [] - requested, selected, _available = self._input_selection(index, index_base, missing_behavior, kwargs) - selected_name = f"input_{selected}" if selected is not None else f"input_{requested}" - if selected_name in kwargs: - return [selected_name] - if missing_behavior == "fallback" and "fallback" in kwargs: - return ["fallback"] - return [] + return index_switch_policy.lazy_inputs(index, mode, index_base, missing_behavior, kwargs) def switch(self, index, mode, index_base, missing_behavior, **kwargs): - mode = mode if mode in INDEX_SWITCH_MODES else "pick_input" - missing_behavior = missing_behavior if missing_behavior in INDEX_SWITCH_MISSING_BEHAVIORS else "fallback" + mode = index_switch_policy.normalize_mode(mode) + missing_behavior = index_switch_policy.normalize_missing_behavior(missing_behavior) if mode == "route_output": requested, selected = self._route_selection(index, index_base, missing_behavior) value = kwargs.get("route_value") outputs = self._blocked_outputs() if selected is not None and "route_value" in kwargs: outputs[selected - 1] = value - status = f"mode=route_output; requested=output_{requested}; selected={'none' if selected is None else f'output_{selected}'}; range=1-{MAX_SWITCH_INPUTS}" + status = f"mode=route_output; {index_switch_policy.route_status(requested, selected, MAX_SWITCH_INPUTS)}" selected_index = selected or 0 return tuple([value if "route_value" in kwargs else None, selected_index, status] + outputs) @@ -975,12 +919,12 @@ class SxCPIndexSwitch: selected_name = f"input_{selected}" if selected_name in kwargs: value = kwargs.get(selected_name) - status = f"mode=pick_input; {_switch_status(requested, selected, False, available)}" + status = f"mode=pick_input; {index_switch_policy.input_status(requested, selected, False, available)}" return tuple([value, selected, status] + self._blocked_outputs()) if missing_behavior == "fallback" and "fallback" in kwargs: - status = f"mode=pick_input; {_switch_status(requested, None, True, available)}" + status = f"mode=pick_input; {index_switch_policy.input_status(requested, None, True, available)}" return tuple([kwargs.get("fallback"), 0, status] + self._blocked_outputs()) - status = f"mode=pick_input; {_switch_status(requested, None, False, available)}" + status = f"mode=pick_input; {index_switch_policy.input_status(requested, None, False, available)}" return tuple([None, 0, status] + self._blocked_outputs()) diff --git a/tools/prompt_smoke.py b/tools/prompt_smoke.py index 762e29b..0eafd0a 100644 --- a/tools/prompt_smoke.py +++ b/tools/prompt_smoke.py @@ -36,6 +36,7 @@ import formatter_input # noqa: E402 import hardcore_position_config # noqa: E402 import __init__ as sxcp_nodes # noqa: E402 import generation_profile_config # noqa: E402 +import index_switch_policy # noqa: E402 import krea_cast # noqa: E402 import krea_formatter # noqa: E402 import location_config # noqa: E402 @@ -2647,6 +2648,22 @@ def smoke_node_utility_registration() -> None: def smoke_server_route_payload_policy() -> None: + requested, selected, available = index_switch_policy.input_selection( + 0, + "zero_based", + "fallback", + {"input_1": "first"}, + ) + _expect((requested, selected, available) == (1, 1, [1]), "Index switch policy zero-based selection changed") + _expect( + index_switch_policy.route_selection(65, "one_based", "wrap") == (65, 1), + "Index switch policy wrap routing changed", + ) + _expect( + index_switch_policy.lazy_inputs(2, "pick_input", "one_based", "fallback", {"input_2": "second"}) == ["input_2"], + "Index switch policy lazy input selection changed", + ) + switch = loop_nodes.SxCPIndexSwitch() picked = switch.switch( 2,