feat: gate_bus blocking choice waiter
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,35 @@
|
|||||||
|
"""Blocking choice bus for the Image Gate node. Stdlib only — no comfy/torch."""
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
class GateCancelled(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class GateBus:
|
||||||
|
messages = {} # node_id(str) -> chosen int (1-based)
|
||||||
|
masks = {} # node_id(str) -> PNG bytes
|
||||||
|
cancelled = False
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def arm(cls, node_id):
|
||||||
|
cls.messages.pop(str(node_id), None)
|
||||||
|
cls.masks.pop(str(node_id), None)
|
||||||
|
cls.cancelled = False
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def put(cls, node_id, message):
|
||||||
|
if message == "__cancel__":
|
||||||
|
cls.cancelled = True
|
||||||
|
else:
|
||||||
|
cls.messages[str(node_id)] = int(message)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def wait(cls, node_id, period=0.1):
|
||||||
|
sid = str(node_id)
|
||||||
|
while sid not in cls.messages:
|
||||||
|
if cls.cancelled:
|
||||||
|
cls.cancelled = False
|
||||||
|
raise GateCancelled()
|
||||||
|
time.sleep(period)
|
||||||
|
return cls.messages.pop(sid)
|
||||||
@@ -0,0 +1,28 @@
|
|||||||
|
# tests/test_gate_bus.py
|
||||||
|
import pytest
|
||||||
|
from gates import gate_bus as gb
|
||||||
|
|
||||||
|
def test_put_and_wait_returns_choice():
|
||||||
|
gb.GateBus.arm("7")
|
||||||
|
gb.GateBus.put("7", "3")
|
||||||
|
assert gb.GateBus.wait("7") == 3
|
||||||
|
|
||||||
|
def test_wait_consumes_message():
|
||||||
|
gb.GateBus.arm("7")
|
||||||
|
gb.GateBus.put("7", "2")
|
||||||
|
gb.GateBus.wait("7")
|
||||||
|
assert "7" not in gb.GateBus.messages
|
||||||
|
|
||||||
|
def test_cancel_raises_and_resets():
|
||||||
|
gb.GateBus.arm("7")
|
||||||
|
gb.GateBus.put("7", "__cancel__")
|
||||||
|
with pytest.raises(gb.GateCancelled):
|
||||||
|
gb.GateBus.wait("7")
|
||||||
|
assert gb.GateBus.cancelled is False # reset after raising
|
||||||
|
|
||||||
|
def test_arm_clears_stale_state():
|
||||||
|
gb.GateBus.put("1", "5")
|
||||||
|
gb.GateBus.cancelled = True
|
||||||
|
gb.GateBus.arm("1")
|
||||||
|
assert "1" not in gb.GateBus.messages
|
||||||
|
assert gb.GateBus.cancelled is False
|
||||||
Reference in New Issue
Block a user