Migrate web UI from Streamlit to NiceGUI
Replace the Streamlit-based UI (app.py + tab_*.py) with an event-driven NiceGUI implementation. This eliminates 135 session_state accesses, 35 st.rerun() calls, and the ui_reset_token hack. Key changes: - Add main.py as NiceGUI entry point with sidebar, tabs, and file navigation - Add state.py with AppState dataclass replacing st.session_state - Add tab_batch_ng.py (batch processor with blur-binding, VACE calc) - Add tab_timeline_ng.py (history tree with graphviz, batch delete) - Add tab_raw_ng.py (raw JSON editor) - Add tab_comfy_ng.py (ComfyUI monitor with polling timer) - Remove Streamlit dependency from utils.py (st.error → logger.error) - Remove Streamlit mock from tests/test_utils.py Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
272
main.py
Normal file
272
main.py
Normal file
@@ -0,0 +1,272 @@
|
||||
from pathlib import Path
|
||||
|
||||
from nicegui import ui
|
||||
|
||||
from state import AppState
|
||||
from utils import (
|
||||
load_config, save_config, load_snippets, save_snippets,
|
||||
load_json, save_json, generate_templates, DEFAULTS,
|
||||
KEY_BATCH_DATA, KEY_SEQUENCE_NUMBER,
|
||||
resolve_path_case_insensitive,
|
||||
)
|
||||
from tab_batch_ng import render_batch_processor
|
||||
from tab_timeline_ng import render_timeline_tab
|
||||
from tab_raw_ng import render_raw_editor
|
||||
from tab_comfy_ng import render_comfy_monitor
|
||||
|
||||
|
||||
@ui.page('/')
|
||||
def index():
|
||||
config = load_config()
|
||||
state = AppState(
|
||||
config=config,
|
||||
current_dir=Path(config.get('last_dir', str(Path.cwd()))),
|
||||
snippets=load_snippets(),
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Define helpers FIRST (before sidebar, which needs them)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@ui.refreshable
|
||||
def render_main_content():
|
||||
if not state.file_path or not state.file_path.exists():
|
||||
ui.label('Select a file from the sidebar to begin.').classes(
|
||||
'text-subtitle1 q-pa-lg')
|
||||
return
|
||||
|
||||
ui.label(f'Editing: {state.file_path.name}').classes('text-h5 q-mb-md')
|
||||
|
||||
with ui.tabs().classes('w-full') as tabs:
|
||||
ui.tab('batch', label='Batch Processor')
|
||||
ui.tab('timeline', label='Timeline')
|
||||
ui.tab('raw', label='Raw Editor')
|
||||
|
||||
with ui.tab_panels(tabs, value='batch').classes('w-full'):
|
||||
with ui.tab_panel('batch'):
|
||||
render_batch_processor(state)
|
||||
with ui.tab_panel('timeline'):
|
||||
render_timeline_tab(state)
|
||||
with ui.tab_panel('raw'):
|
||||
render_raw_editor(state)
|
||||
|
||||
if state.show_comfy_monitor:
|
||||
ui.separator()
|
||||
with ui.expansion('ComfyUI Monitor', icon='dns').classes('w-full'):
|
||||
render_comfy_monitor(state)
|
||||
|
||||
def load_file(file_name: str):
|
||||
"""Load a JSON file and refresh the main content."""
|
||||
fp = state.current_dir / file_name
|
||||
if state.loaded_file == str(fp):
|
||||
return
|
||||
data, mtime = load_json(fp)
|
||||
state.data_cache = data
|
||||
state.last_mtime = mtime
|
||||
state.loaded_file = str(fp)
|
||||
state.file_path = fp
|
||||
state.restored_indicator = None
|
||||
if state._main_rendered:
|
||||
render_main_content.refresh()
|
||||
|
||||
# Attach helpers to state so sidebar can call them
|
||||
state._load_file = load_file
|
||||
state._render_main = render_main_content
|
||||
state._main_rendered = False
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Sidebar (rendered AFTER helpers are attached)
|
||||
# ------------------------------------------------------------------
|
||||
with ui.left_drawer().classes('q-pa-md').style('width: 350px'):
|
||||
render_sidebar(state)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Main content area
|
||||
# ------------------------------------------------------------------
|
||||
render_main_content()
|
||||
state._main_rendered = True
|
||||
|
||||
|
||||
# ======================================================================
|
||||
# Sidebar
|
||||
# ======================================================================
|
||||
|
||||
def render_sidebar(state: AppState):
|
||||
ui.label('Navigator').classes('text-h6')
|
||||
|
||||
# --- Path input ---
|
||||
path_input = ui.input(
|
||||
'Current Path',
|
||||
value=str(state.current_dir),
|
||||
).classes('w-full')
|
||||
|
||||
def on_path_enter():
|
||||
p = resolve_path_case_insensitive(path_input.value)
|
||||
if p is not None and p.is_dir():
|
||||
state.current_dir = p
|
||||
state.config['last_dir'] = str(p)
|
||||
save_config(state.current_dir, state.config['favorites'])
|
||||
state.loaded_file = None
|
||||
state.file_path = None
|
||||
path_input.set_value(str(p))
|
||||
render_file_list.refresh()
|
||||
# Auto-load inside render_file_list already refreshed main content
|
||||
# if files exist; only refresh here for the empty-directory case.
|
||||
if not state.loaded_file:
|
||||
state._render_main.refresh()
|
||||
|
||||
path_input.on('keydown.enter', lambda _: on_path_enter())
|
||||
|
||||
# --- Pin / Unpin ---
|
||||
def pin_folder():
|
||||
d = str(state.current_dir)
|
||||
if d not in state.config['favorites']:
|
||||
state.config['favorites'].append(d)
|
||||
save_config(state.current_dir, state.config['favorites'])
|
||||
render_favorites.refresh()
|
||||
|
||||
ui.button('Pin Folder', icon='push_pin', on_click=pin_folder).classes('w-full')
|
||||
|
||||
@ui.refreshable
|
||||
def render_favorites():
|
||||
for fav in list(state.config['favorites']):
|
||||
with ui.row().classes('w-full items-center'):
|
||||
ui.button(
|
||||
fav,
|
||||
on_click=lambda f=fav: _jump_to(f),
|
||||
).props('flat dense').classes('col')
|
||||
ui.button(
|
||||
icon='close',
|
||||
on_click=lambda f=fav: _unpin(f),
|
||||
).props('flat dense color=negative')
|
||||
|
||||
def _jump_to(fav: str):
|
||||
state.current_dir = Path(fav)
|
||||
state.config['last_dir'] = fav
|
||||
save_config(state.current_dir, state.config['favorites'])
|
||||
state.loaded_file = None
|
||||
state.file_path = None
|
||||
path_input.set_value(fav)
|
||||
render_file_list.refresh()
|
||||
if not state.loaded_file:
|
||||
state._render_main.refresh()
|
||||
|
||||
def _unpin(fav: str):
|
||||
if fav in state.config['favorites']:
|
||||
state.config['favorites'].remove(fav)
|
||||
save_config(state.current_dir, state.config['favorites'])
|
||||
render_favorites.refresh()
|
||||
|
||||
render_favorites()
|
||||
|
||||
ui.separator()
|
||||
|
||||
# --- Snippet Library ---
|
||||
ui.label('Snippet Library').classes('text-subtitle1 q-mt-md')
|
||||
|
||||
with ui.expansion('Add New Snippet'):
|
||||
snip_name_input = ui.input('Name', placeholder='e.g. Cinematic').classes('w-full')
|
||||
snip_content_input = ui.textarea('Content', placeholder='4k, high quality...').classes('w-full')
|
||||
|
||||
def save_snippet():
|
||||
name = snip_name_input.value
|
||||
content = snip_content_input.value
|
||||
if name and content:
|
||||
state.snippets[name] = content
|
||||
save_snippets(state.snippets)
|
||||
snip_name_input.set_value('')
|
||||
snip_content_input.set_value('')
|
||||
ui.notify(f"Saved '{name}'")
|
||||
render_snippet_list.refresh()
|
||||
|
||||
ui.button('Save Snippet', on_click=save_snippet).classes('w-full')
|
||||
|
||||
@ui.refreshable
|
||||
def render_snippet_list():
|
||||
if not state.snippets:
|
||||
return
|
||||
ui.label('Click to copy snippet text:').classes('text-caption')
|
||||
for name, content in list(state.snippets.items()):
|
||||
with ui.row().classes('w-full items-center'):
|
||||
async def copy_snippet(c=content):
|
||||
await ui.run_javascript(
|
||||
f'navigator.clipboard.writeText({c!r})', timeout=3.0)
|
||||
ui.notify('Copied to clipboard')
|
||||
|
||||
ui.button(
|
||||
f'{name}',
|
||||
on_click=copy_snippet,
|
||||
).props('flat dense').classes('col')
|
||||
ui.button(
|
||||
icon='delete',
|
||||
on_click=lambda n=name: _del_snippet(n),
|
||||
).props('flat dense color=negative')
|
||||
|
||||
def _del_snippet(name: str):
|
||||
if name in state.snippets:
|
||||
del state.snippets[name]
|
||||
save_snippets(state.snippets)
|
||||
render_snippet_list.refresh()
|
||||
|
||||
render_snippet_list()
|
||||
|
||||
ui.separator()
|
||||
|
||||
# --- File List ---
|
||||
@ui.refreshable
|
||||
def render_file_list():
|
||||
json_files = sorted(state.current_dir.glob('*.json'))
|
||||
json_files = [f for f in json_files if f.name not in ('.editor_config.json', '.editor_snippets.json')]
|
||||
|
||||
if not json_files:
|
||||
ui.label('No JSON files in this folder.').classes('text-caption')
|
||||
ui.button('Generate Templates', on_click=lambda: _gen_templates()).classes('w-full')
|
||||
return
|
||||
|
||||
with ui.expansion('Create New JSON'):
|
||||
new_fn_input = ui.input('Filename', placeholder='my_prompt_vace').classes('w-full')
|
||||
|
||||
def create_new():
|
||||
fn = new_fn_input.value
|
||||
if not fn:
|
||||
return
|
||||
if not fn.endswith('.json'):
|
||||
fn += '.json'
|
||||
path = state.current_dir / fn
|
||||
first_item = DEFAULTS.copy()
|
||||
first_item[KEY_SEQUENCE_NUMBER] = 1
|
||||
save_json(path, {KEY_BATCH_DATA: [first_item]})
|
||||
new_fn_input.set_value('')
|
||||
render_file_list.refresh()
|
||||
|
||||
ui.button('Create', on_click=create_new).classes('w-full')
|
||||
|
||||
ui.label('Select File').classes('text-subtitle2 q-mt-sm')
|
||||
file_names = [f.name for f in json_files]
|
||||
ui.radio(
|
||||
file_names,
|
||||
value=file_names[0] if file_names else None,
|
||||
on_change=lambda e: state._load_file(e.value) if e.value else None,
|
||||
).classes('w-full')
|
||||
|
||||
# Auto-load first file if nothing loaded yet
|
||||
if file_names and not state.loaded_file:
|
||||
state._load_file(file_names[0])
|
||||
|
||||
def _gen_templates():
|
||||
generate_templates(state.current_dir)
|
||||
render_file_list.refresh()
|
||||
|
||||
render_file_list()
|
||||
|
||||
ui.separator()
|
||||
|
||||
# --- Comfy Monitor toggle ---
|
||||
def on_monitor_toggle(e):
|
||||
state.show_comfy_monitor = e.value
|
||||
state._render_main.refresh()
|
||||
|
||||
ui.checkbox('Show Comfy Monitor', value=True, on_change=on_monitor_toggle)
|
||||
|
||||
|
||||
ui.run(title='AI Settings Manager', port=8080, reload=True)
|
||||
17
state.py
Normal file
17
state.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
@dataclass
|
||||
class AppState:
|
||||
config: dict
|
||||
current_dir: Path
|
||||
loaded_file: str | None = None
|
||||
last_mtime: float = 0
|
||||
data_cache: dict = field(default_factory=dict)
|
||||
snippets: dict = field(default_factory=dict)
|
||||
file_path: Path | None = None
|
||||
restored_indicator: str | None = None
|
||||
timeline_selected_nodes: set = field(default_factory=set)
|
||||
live_toggles: dict = field(default_factory=dict)
|
||||
show_comfy_monitor: bool = True
|
||||
700
tab_batch_ng.py
Normal file
700
tab_batch_ng.py
Normal file
@@ -0,0 +1,700 @@
|
||||
import copy
|
||||
import random
|
||||
from pathlib import Path
|
||||
|
||||
from nicegui import ui
|
||||
|
||||
from state import AppState
|
||||
from utils import (
|
||||
DEFAULTS, save_json, load_json,
|
||||
KEY_BATCH_DATA, KEY_HISTORY_TREE, KEY_PROMPT_HISTORY, KEY_SEQUENCE_NUMBER,
|
||||
)
|
||||
from history_tree import HistoryTree
|
||||
|
||||
IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.webp', '.bmp', '.gif'}
|
||||
SUB_SEGMENT_MULTIPLIER = 1000
|
||||
|
||||
VACE_MODES = [
|
||||
'End Extend', 'Pre Extend', 'Middle Extend', 'Edge Extend',
|
||||
'Join Extend', 'Bidirectional Extend', 'Frame Interpolation',
|
||||
'Replace/Inpaint', 'Video Inpaint', 'Keyframe',
|
||||
]
|
||||
VACE_FORMULAS = [
|
||||
'base + A', 'base + B', 'base + A + B', 'base + A + B',
|
||||
'base + A + B', 'base + A + B', '(B-1) * step',
|
||||
'snap(source)', 'snap(source)', 'base + A + B',
|
||||
]
|
||||
|
||||
|
||||
# --- Sub-segment helpers (same as original) ---
|
||||
|
||||
def is_subsegment(seq_num):
|
||||
return int(seq_num) >= SUB_SEGMENT_MULTIPLIER
|
||||
|
||||
def parent_of(seq_num):
|
||||
seq_num = int(seq_num)
|
||||
return seq_num // SUB_SEGMENT_MULTIPLIER if is_subsegment(seq_num) else seq_num
|
||||
|
||||
def sub_index_of(seq_num):
|
||||
seq_num = int(seq_num)
|
||||
return seq_num % SUB_SEGMENT_MULTIPLIER if is_subsegment(seq_num) else 0
|
||||
|
||||
def format_seq_label(seq_num):
|
||||
seq_num = int(seq_num)
|
||||
if is_subsegment(seq_num):
|
||||
return f'Sub #{parent_of(seq_num)}.{sub_index_of(seq_num)}'
|
||||
return f'Sequence #{seq_num}'
|
||||
|
||||
def next_sub_segment_number(batch_list, parent_seq_num):
|
||||
parent_seq_num = int(parent_seq_num)
|
||||
max_sub = 0
|
||||
for s in batch_list:
|
||||
sn = int(s.get(KEY_SEQUENCE_NUMBER, 0))
|
||||
if is_subsegment(sn) and parent_of(sn) == parent_seq_num:
|
||||
max_sub = max(max_sub, sub_index_of(sn))
|
||||
return parent_seq_num * SUB_SEGMENT_MULTIPLIER + max_sub + 1
|
||||
|
||||
def find_insert_position(batch_list, parent_index, parent_seq_num):
|
||||
parent_seq_num = int(parent_seq_num)
|
||||
pos = parent_index + 1
|
||||
while pos < len(batch_list):
|
||||
sn = int(batch_list[pos].get(KEY_SEQUENCE_NUMBER, 0))
|
||||
if is_subsegment(sn) and parent_of(sn) == parent_seq_num:
|
||||
pos += 1
|
||||
else:
|
||||
break
|
||||
return pos
|
||||
|
||||
|
||||
# --- Helper for repetitive dict-bound inputs ---
|
||||
|
||||
def dict_input(element_fn, label, seq, key, **kwargs):
|
||||
"""Create an input element bound to seq[key] via blur event."""
|
||||
val = seq.get(key, '')
|
||||
if isinstance(val, (int, float)):
|
||||
val = str(val) if element_fn != ui.number else val
|
||||
el = element_fn(label, value=val, **kwargs)
|
||||
el.on('blur', lambda e, k=key: seq.__setitem__(k, e.sender.value))
|
||||
return el
|
||||
|
||||
|
||||
def dict_number(label, seq, key, **kwargs):
|
||||
"""Number input bound to seq[key] via blur."""
|
||||
val = seq.get(key, 0)
|
||||
try:
|
||||
# Try float first to handle "1.5" strings, then check if it's a clean int
|
||||
fval = float(val)
|
||||
val = int(fval) if fval == int(fval) else fval
|
||||
except (ValueError, TypeError):
|
||||
val = 0
|
||||
el = ui.number(label, value=val, **kwargs)
|
||||
el.on('blur', lambda e, k=key: seq.__setitem__(
|
||||
k, e.sender.value if e.sender.value is not None else 0))
|
||||
return el
|
||||
|
||||
|
||||
def dict_textarea(label, seq, key, **kwargs):
|
||||
"""Textarea bound to seq[key] via blur."""
|
||||
el = ui.textarea(label, value=seq.get(key, ''), **kwargs)
|
||||
el.on('blur', lambda e, k=key: seq.__setitem__(k, e.sender.value))
|
||||
return el
|
||||
|
||||
|
||||
# ======================================================================
|
||||
# Main render function
|
||||
# ======================================================================
|
||||
|
||||
def render_batch_processor(state: AppState):
|
||||
data = state.data_cache
|
||||
file_path = state.file_path
|
||||
is_batch_file = KEY_BATCH_DATA in data or isinstance(data, list)
|
||||
|
||||
if not is_batch_file:
|
||||
ui.label('This is a Single file. To use Batch mode, create a copy.').classes(
|
||||
'text-warning')
|
||||
|
||||
def create_batch():
|
||||
new_name = f'batch_{file_path.name}'
|
||||
new_path = file_path.parent / new_name
|
||||
if new_path.exists():
|
||||
ui.notify(f'File {new_name} already exists!', type='warning')
|
||||
return
|
||||
first_item = data.copy()
|
||||
first_item.pop(KEY_PROMPT_HISTORY, None)
|
||||
first_item.pop(KEY_HISTORY_TREE, None)
|
||||
first_item[KEY_SEQUENCE_NUMBER] = 1
|
||||
new_data = {KEY_BATCH_DATA: [first_item], KEY_HISTORY_TREE: {},
|
||||
KEY_PROMPT_HISTORY: []}
|
||||
save_json(new_path, new_data)
|
||||
ui.notify(f'Created {new_name}', type='positive')
|
||||
|
||||
ui.button('Create Batch Copy', icon='content_copy', on_click=create_batch)
|
||||
return
|
||||
|
||||
if state.restored_indicator:
|
||||
ui.label(f'Editing Restored Version: {state.restored_indicator}').classes(
|
||||
'text-info q-pa-sm')
|
||||
|
||||
batch_list = data.get(KEY_BATCH_DATA, [])
|
||||
|
||||
# Source file data for importing
|
||||
json_files = sorted(state.current_dir.glob('*.json'))
|
||||
json_files = [f for f in json_files if f.name not in (
|
||||
'.editor_config.json', '.editor_snippets.json')]
|
||||
file_options = {f.name: f.name for f in json_files}
|
||||
|
||||
src_file_select = ui.select(
|
||||
file_options,
|
||||
value=file_path.name,
|
||||
label='Source File:',
|
||||
).classes('w-64')
|
||||
|
||||
src_seq_select = ui.select([], label='Source Sequence:').classes('w-64')
|
||||
|
||||
# Track loaded source data
|
||||
_src_cache = {'data': None, 'batch': [], 'name': None}
|
||||
|
||||
def _update_src():
|
||||
name = src_file_select.value
|
||||
if name and name != _src_cache['name']:
|
||||
src_data, _ = load_json(state.current_dir / name)
|
||||
_src_cache['data'] = src_data
|
||||
_src_cache['batch'] = src_data.get(KEY_BATCH_DATA, [])
|
||||
_src_cache['name'] = name
|
||||
if _src_cache['batch']:
|
||||
opts = {i: format_seq_label(s.get(KEY_SEQUENCE_NUMBER, i+1))
|
||||
for i, s in enumerate(_src_cache['batch'])}
|
||||
src_seq_select.options = opts
|
||||
src_seq_select.set_value(0)
|
||||
else:
|
||||
src_seq_select.options = {}
|
||||
|
||||
src_file_select.on_value_change(lambda _: _update_src())
|
||||
_update_src()
|
||||
|
||||
# --- Add New Sequence ---
|
||||
ui.label('Add New Sequence').classes('text-subtitle1 q-mt-md')
|
||||
|
||||
def _add_sequence(new_item):
|
||||
max_seq = 0
|
||||
for s in batch_list:
|
||||
sn = int(s.get(KEY_SEQUENCE_NUMBER, 0))
|
||||
if not is_subsegment(sn):
|
||||
max_seq = max(max_seq, sn)
|
||||
new_item[KEY_SEQUENCE_NUMBER] = max_seq + 1
|
||||
for k in [KEY_PROMPT_HISTORY, KEY_HISTORY_TREE, 'note', 'loras']:
|
||||
new_item.pop(k, None)
|
||||
batch_list.append(new_item)
|
||||
data[KEY_BATCH_DATA] = batch_list
|
||||
save_json(file_path, data)
|
||||
render_sequence_list.refresh()
|
||||
|
||||
with ui.row():
|
||||
def add_empty():
|
||||
_add_sequence(DEFAULTS.copy())
|
||||
|
||||
def add_from_source():
|
||||
item = DEFAULTS.copy()
|
||||
src_batch = _src_cache['batch']
|
||||
sel_idx = src_seq_select.value
|
||||
if src_batch and sel_idx is not None:
|
||||
item.update(src_batch[int(sel_idx)])
|
||||
elif _src_cache['data']:
|
||||
item.update(_src_cache['data'])
|
||||
_add_sequence(item)
|
||||
|
||||
ui.button('Add Empty', icon='add', on_click=add_empty)
|
||||
ui.button('From Source', icon='file_download', on_click=add_from_source)
|
||||
|
||||
ui.separator()
|
||||
|
||||
# --- Mass Update ---
|
||||
_render_mass_update(batch_list, data, file_path, state)
|
||||
|
||||
# --- Standard / LoRA / VACE key sets ---
|
||||
lora_keys = ['lora 1 high', 'lora 1 low', 'lora 2 high', 'lora 2 low',
|
||||
'lora 3 high', 'lora 3 low']
|
||||
standard_keys = {
|
||||
'general_prompt', 'general_negative', 'current_prompt', 'negative', 'prompt',
|
||||
'seed', 'cfg', 'camera', 'flf', KEY_SEQUENCE_NUMBER,
|
||||
'frame_to_skip', 'end_frame', 'transition', 'vace_length',
|
||||
'input_a_frames', 'input_b_frames', 'reference switch', 'vace schedule',
|
||||
'reference path', 'video file path', 'reference image path', 'flf image path',
|
||||
}
|
||||
standard_keys.update(lora_keys)
|
||||
|
||||
def sort_by_number():
|
||||
batch_list.sort(key=lambda s: int(s.get(KEY_SEQUENCE_NUMBER, 0)))
|
||||
data[KEY_BATCH_DATA] = batch_list
|
||||
save_json(file_path, data)
|
||||
ui.notify('Sorted by sequence number!', type='positive')
|
||||
render_sequence_list.refresh()
|
||||
|
||||
# --- Sequence list (count label + cards inside refreshable) ---
|
||||
@ui.refreshable
|
||||
def render_sequence_list():
|
||||
with ui.row().classes('w-full items-center'):
|
||||
ui.label(f'Batch contains {len(batch_list)} sequences.')
|
||||
ui.button('Sort by Number', icon='sort', on_click=sort_by_number).props('flat')
|
||||
|
||||
for i, seq in enumerate(batch_list):
|
||||
_render_sequence_card(
|
||||
i, seq, batch_list, data, file_path, state,
|
||||
_src_cache, src_seq_select,
|
||||
standard_keys, render_sequence_list,
|
||||
)
|
||||
|
||||
render_sequence_list()
|
||||
|
||||
ui.separator()
|
||||
|
||||
# --- Save & Snap ---
|
||||
with ui.row().classes('w-full items-end q-gutter-md'):
|
||||
commit_input = ui.input('Change Note (Optional)',
|
||||
placeholder='e.g. Added sequence 3').classes('col')
|
||||
|
||||
def save_and_snap():
|
||||
data[KEY_BATCH_DATA] = batch_list
|
||||
tree_data = data.get(KEY_HISTORY_TREE, {})
|
||||
htree = HistoryTree(tree_data)
|
||||
snapshot_payload = copy.deepcopy(data)
|
||||
snapshot_payload.pop(KEY_HISTORY_TREE, None)
|
||||
note = commit_input.value if commit_input.value else 'Batch Update'
|
||||
htree.commit(snapshot_payload, note=note)
|
||||
data[KEY_HISTORY_TREE] = htree.to_dict()
|
||||
save_json(file_path, data)
|
||||
state.restored_indicator = None
|
||||
commit_input.set_value('')
|
||||
ui.notify('Batch Saved & Snapshot Created!', type='positive')
|
||||
|
||||
ui.button('Save & Snap', icon='save', on_click=save_and_snap).props('color=primary')
|
||||
|
||||
|
||||
# ======================================================================
|
||||
# Single sequence card
|
||||
# ======================================================================
|
||||
|
||||
def _render_sequence_card(i, seq, batch_list, data, file_path, state,
|
||||
src_cache, src_seq_select, standard_keys,
|
||||
refresh_list):
|
||||
seq_num = seq.get(KEY_SEQUENCE_NUMBER, i + 1)
|
||||
|
||||
if is_subsegment(seq_num):
|
||||
label = f'Sub #{parent_of(seq_num)}.{sub_index_of(seq_num)} ({int(seq_num)})'
|
||||
else:
|
||||
label = f'Sequence #{seq_num}'
|
||||
|
||||
with ui.expansion(label, icon='movie').classes('w-full'):
|
||||
# --- Action row ---
|
||||
with ui.row().classes('w-full q-gutter-sm'):
|
||||
# Copy from source
|
||||
def copy_source(idx=i, sn=seq_num):
|
||||
item = DEFAULTS.copy()
|
||||
src_batch = src_cache['batch']
|
||||
sel_idx = src_seq_select.value
|
||||
if src_batch and sel_idx is not None:
|
||||
item.update(src_batch[int(sel_idx)])
|
||||
elif src_cache['data']:
|
||||
item.update(src_cache['data'])
|
||||
item[KEY_SEQUENCE_NUMBER] = sn
|
||||
item.pop(KEY_PROMPT_HISTORY, None)
|
||||
item.pop(KEY_HISTORY_TREE, None)
|
||||
batch_list[idx] = item
|
||||
data[KEY_BATCH_DATA] = batch_list
|
||||
save_json(file_path, data)
|
||||
ui.notify('Copied!', type='positive')
|
||||
refresh_list.refresh()
|
||||
|
||||
ui.button('Copy Src', icon='file_download', on_click=copy_source).props('dense')
|
||||
|
||||
# Clone Next
|
||||
def clone_next(idx=i, sn=seq_num, s=seq):
|
||||
new_seq = copy.deepcopy(s)
|
||||
max_sn = max(
|
||||
(int(x.get(KEY_SEQUENCE_NUMBER, 0))
|
||||
for x in batch_list if not is_subsegment(x.get(KEY_SEQUENCE_NUMBER, 0))),
|
||||
default=0)
|
||||
new_seq[KEY_SEQUENCE_NUMBER] = max_sn + 1
|
||||
if not is_subsegment(sn):
|
||||
pos = find_insert_position(batch_list, idx, int(sn))
|
||||
else:
|
||||
pos = idx + 1
|
||||
batch_list.insert(pos, new_seq)
|
||||
data[KEY_BATCH_DATA] = batch_list
|
||||
save_json(file_path, data)
|
||||
ui.notify('Cloned to Next!', type='positive')
|
||||
refresh_list.refresh()
|
||||
|
||||
ui.button('Clone Next', icon='content_copy', on_click=clone_next).props('dense')
|
||||
|
||||
# Clone End
|
||||
def clone_end(s=seq):
|
||||
new_seq = copy.deepcopy(s)
|
||||
max_sn = max(
|
||||
(int(x.get(KEY_SEQUENCE_NUMBER, 0))
|
||||
for x in batch_list if not is_subsegment(x.get(KEY_SEQUENCE_NUMBER, 0))),
|
||||
default=0)
|
||||
new_seq[KEY_SEQUENCE_NUMBER] = max_sn + 1
|
||||
batch_list.append(new_seq)
|
||||
data[KEY_BATCH_DATA] = batch_list
|
||||
save_json(file_path, data)
|
||||
ui.notify('Cloned to End!', type='positive')
|
||||
refresh_list.refresh()
|
||||
|
||||
ui.button('Clone End', icon='vertical_align_bottom', on_click=clone_end).props('dense')
|
||||
|
||||
# Clone Sub
|
||||
def clone_sub(idx=i, sn=seq_num, s=seq):
|
||||
new_seq = copy.deepcopy(s)
|
||||
p_seq = parent_of(sn)
|
||||
p_idx = idx
|
||||
if is_subsegment(sn):
|
||||
for pi, ps in enumerate(batch_list):
|
||||
if int(ps.get(KEY_SEQUENCE_NUMBER, 0)) == p_seq:
|
||||
p_idx = pi
|
||||
break
|
||||
new_seq[KEY_SEQUENCE_NUMBER] = next_sub_segment_number(batch_list, p_seq)
|
||||
pos = find_insert_position(batch_list, p_idx, p_seq)
|
||||
batch_list.insert(pos, new_seq)
|
||||
data[KEY_BATCH_DATA] = batch_list
|
||||
save_json(file_path, data)
|
||||
ui.notify(f'Created {format_seq_label(new_seq[KEY_SEQUENCE_NUMBER])}!',
|
||||
type='positive')
|
||||
refresh_list.refresh()
|
||||
|
||||
ui.button('Clone Sub', icon='link', on_click=clone_sub).props('dense')
|
||||
|
||||
# Promote
|
||||
def promote(idx=i, s=seq):
|
||||
single_data = s.copy()
|
||||
single_data[KEY_PROMPT_HISTORY] = data.get(KEY_PROMPT_HISTORY, [])
|
||||
single_data[KEY_HISTORY_TREE] = data.get(KEY_HISTORY_TREE, {})
|
||||
single_data.pop(KEY_SEQUENCE_NUMBER, None)
|
||||
save_json(file_path, single_data)
|
||||
state.data_cache = single_data
|
||||
ui.notify('Converted to Single!', type='positive')
|
||||
# Full refresh so batch tab re-enters render_batch_processor
|
||||
# and sees the file is now single (no KEY_BATCH_DATA)
|
||||
state._render_main.refresh()
|
||||
|
||||
ui.button('Promote', icon='north_west', on_click=promote).props('dense')
|
||||
|
||||
# Delete
|
||||
def delete(idx=i):
|
||||
batch_list.pop(idx)
|
||||
data[KEY_BATCH_DATA] = batch_list
|
||||
save_json(file_path, data)
|
||||
refresh_list.refresh()
|
||||
|
||||
ui.button(icon='delete', on_click=delete).props('dense color=negative')
|
||||
|
||||
ui.separator()
|
||||
|
||||
# --- Prompts + Settings ---
|
||||
with ui.row().classes('w-full q-gutter-md'):
|
||||
# Left column: prompts
|
||||
with ui.column().classes('col-8'):
|
||||
dict_textarea('General Prompt', seq, 'general_prompt').classes(
|
||||
'w-full').props('outlined rows=2')
|
||||
dict_textarea('General Negative', seq, 'general_negative').classes(
|
||||
'w-full').props('outlined rows=2')
|
||||
dict_textarea('Specific Prompt', seq, 'current_prompt').classes(
|
||||
'w-full').props('outlined rows=10')
|
||||
dict_textarea('Specific Negative', seq, 'negative').classes(
|
||||
'w-full').props('outlined rows=2')
|
||||
|
||||
# Right column: settings
|
||||
with ui.column().classes('col-4'):
|
||||
# Sequence number
|
||||
sn_label = (
|
||||
f'Seq Number (Sub #{parent_of(seq_num)}.{sub_index_of(seq_num)})'
|
||||
if is_subsegment(seq_num) else 'Sequence Number'
|
||||
)
|
||||
sn_input = dict_number(sn_label, seq, KEY_SEQUENCE_NUMBER)
|
||||
sn_input.props('outlined')
|
||||
|
||||
# Seed + randomize
|
||||
with ui.row().classes('w-full items-end'):
|
||||
seed_input = dict_number('Seed', seq, 'seed').classes('col').props('outlined')
|
||||
|
||||
def randomize_seed(si=seed_input, s=seq):
|
||||
new_seed = random.randint(0, 999999999999)
|
||||
si.set_value(new_seed)
|
||||
s['seed'] = new_seed
|
||||
|
||||
ui.button(icon='casino', on_click=randomize_seed).props('flat')
|
||||
|
||||
# CFG
|
||||
cfg_val = float(seq.get('cfg', DEFAULTS['cfg']))
|
||||
cfg_input = ui.number('CFG', value=cfg_val, step=0.5,
|
||||
format='%.1f').props('outlined')
|
||||
cfg_input.on('blur', lambda e: seq.__setitem__(
|
||||
'cfg', e.sender.value if e.sender.value is not None else DEFAULTS['cfg']))
|
||||
|
||||
dict_input(ui.input, 'Camera', seq, 'camera').props('outlined')
|
||||
dict_input(ui.input, 'FLF', seq, 'flf').props('outlined')
|
||||
dict_number('End Frame', seq, 'end_frame').props('outlined')
|
||||
dict_input(ui.input, 'Video File Path', seq, 'video file path').props('outlined')
|
||||
|
||||
# Image paths with preview
|
||||
for img_label, img_key in [
|
||||
('Reference Image Path', 'reference image path'),
|
||||
('Reference Path', 'reference path'),
|
||||
('FLF Image Path', 'flf image path'),
|
||||
]:
|
||||
with ui.row().classes('w-full items-center'):
|
||||
inp = dict_input(ui.input, img_label, seq, img_key).classes(
|
||||
'col').props('outlined')
|
||||
img_path = Path(seq.get(img_key, '')) if seq.get(img_key) else None
|
||||
if (img_path and img_path.exists() and
|
||||
img_path.suffix.lower() in IMAGE_EXTENSIONS):
|
||||
with ui.dialog() as dlg, ui.card():
|
||||
ui.image(str(img_path)).classes('w-full')
|
||||
ui.button(icon='visibility', on_click=dlg.open).props('flat dense')
|
||||
|
||||
# VACE Settings
|
||||
with ui.expansion('VACE Settings', icon='settings').classes('w-full'):
|
||||
_render_vace_settings(i, seq, batch_list, data, file_path, refresh_list)
|
||||
|
||||
# --- LoRA Settings ---
|
||||
with ui.expansion('LoRA Settings', icon='style').classes('w-full'):
|
||||
with ui.row().classes('w-full q-gutter-md'):
|
||||
for lora_idx in range(1, 4):
|
||||
with ui.column().classes('col'):
|
||||
ui.label(f'LoRA {lora_idx}').classes('text-subtitle2')
|
||||
for tier, tier_label in [('high', 'High'), ('low', 'Low')]:
|
||||
k = f'lora {lora_idx} {tier}'
|
||||
raw = str(seq.get(k, ''))
|
||||
disp = raw.replace('<lora:', '').replace('>', '')
|
||||
|
||||
with ui.row().classes('w-full items-center'):
|
||||
ui.label('<lora:').classes('text-caption font-mono')
|
||||
lora_input = ui.input(
|
||||
f'L{lora_idx} {tier_label}',
|
||||
value=disp,
|
||||
).classes('col').props('outlined dense')
|
||||
ui.label('>').classes('text-caption font-mono')
|
||||
|
||||
def on_lora_blur(e, key=k):
|
||||
v = e.sender.value
|
||||
seq[key] = f'<lora:{v}>' if v else ''
|
||||
|
||||
lora_input.on('blur', on_lora_blur)
|
||||
|
||||
# --- Custom Parameters ---
|
||||
ui.separator()
|
||||
ui.label('Custom Parameters').classes('text-caption')
|
||||
|
||||
custom_keys = [k for k in seq.keys() if k not in standard_keys]
|
||||
if custom_keys:
|
||||
for k in custom_keys:
|
||||
with ui.row().classes('w-full items-center'):
|
||||
ui.input('Key', value=k).props('readonly outlined dense').classes('w-32')
|
||||
val_input = ui.input('Value', value=str(seq[k])).props(
|
||||
'outlined dense').classes('col')
|
||||
val_input.on('blur', lambda e, key=k: seq.__setitem__(key, e.sender.value))
|
||||
|
||||
def del_custom(key=k):
|
||||
del seq[key]
|
||||
save_json(file_path, data)
|
||||
refresh_list.refresh()
|
||||
|
||||
ui.button(icon='delete', on_click=del_custom).props('flat dense color=negative')
|
||||
|
||||
with ui.expansion('Add Parameter', icon='add').classes('w-full'):
|
||||
new_k_input = ui.input('Key').props('outlined dense')
|
||||
new_v_input = ui.input('Value').props('outlined dense')
|
||||
|
||||
def add_param():
|
||||
k = new_k_input.value
|
||||
v = new_v_input.value
|
||||
if k and k not in seq:
|
||||
seq[k] = v
|
||||
save_json(file_path, data)
|
||||
new_k_input.set_value('')
|
||||
new_v_input.set_value('')
|
||||
refresh_list.refresh()
|
||||
|
||||
ui.button('Add', on_click=add_param).props('flat')
|
||||
|
||||
|
||||
# ======================================================================
|
||||
# VACE Settings sub-section
|
||||
# ======================================================================
|
||||
|
||||
def _render_vace_settings(i, seq, batch_list, data, file_path, refresh_list):
|
||||
# Frame to Skip + shift
|
||||
with ui.row().classes('w-full items-end'):
|
||||
fts_input = dict_number('Frame to Skip', seq, 'frame_to_skip').classes('col').props(
|
||||
'outlined')
|
||||
|
||||
# Capture original at render time; blur updates seq before click fires
|
||||
_original_fts = int(seq.get('frame_to_skip', 81))
|
||||
|
||||
def shift_fts(idx=i, orig=_original_fts):
|
||||
new_fts = int(fts_input.value) if fts_input.value is not None else orig
|
||||
delta = new_fts - orig
|
||||
if delta == 0:
|
||||
ui.notify('No change to shift', type='info')
|
||||
return
|
||||
shifted = 0
|
||||
for j in range(idx + 1, len(batch_list)):
|
||||
batch_list[j]['frame_to_skip'] = int(
|
||||
batch_list[j].get('frame_to_skip', 81)) + delta
|
||||
shifted += 1
|
||||
data[KEY_BATCH_DATA] = batch_list
|
||||
save_json(file_path, data)
|
||||
ui.notify(f'Shifted {shifted} sequences by {delta:+d}', type='positive')
|
||||
refresh_list.refresh()
|
||||
|
||||
ui.button('Shift', icon='arrow_downward', on_click=shift_fts).props('dense')
|
||||
|
||||
dict_input(ui.input, 'Transition', seq, 'transition').props('outlined')
|
||||
|
||||
# VACE Schedule
|
||||
sched_val = max(0, min(int(seq.get('vace schedule', 1)), len(VACE_MODES) - 1))
|
||||
with ui.row().classes('w-full items-center'):
|
||||
vs_input = ui.number('VACE Schedule', value=sched_val, min=0,
|
||||
max=len(VACE_MODES) - 1).classes('col').props('outlined')
|
||||
vs_input.on('blur', lambda e: seq.__setitem__(
|
||||
'vace schedule', int(e.sender.value) if e.sender.value is not None else 0))
|
||||
mode_label = ui.label(VACE_MODES[sched_val]).classes('text-caption')
|
||||
|
||||
def update_mode_label(e):
|
||||
idx = int(e.sender.value) if e.sender.value is not None else 0
|
||||
idx = max(0, min(idx, len(VACE_MODES) - 1))
|
||||
mode_label.set_text(VACE_MODES[idx])
|
||||
|
||||
vs_input.on('update:model-value', update_mode_label)
|
||||
|
||||
# Mode reference
|
||||
with ui.dialog() as ref_dlg, ui.card():
|
||||
table_md = (
|
||||
'| # | Mode | Formula |\n|:--|:-----|:--------|\n'
|
||||
+ '\n'.join(
|
||||
f'| **{j}** | {VACE_MODES[j]} | `{VACE_FORMULAS[j]}` |'
|
||||
for j in range(len(VACE_MODES)))
|
||||
+ '\n\n*All totals snapped to 4n+1 (1,5,9,...,49,...,81,...)*'
|
||||
)
|
||||
ui.markdown(table_md)
|
||||
ui.button('Mode Reference', icon='help', on_click=ref_dlg.open).props('flat dense')
|
||||
|
||||
# Input A / B frames
|
||||
ia_input = dict_number('Input A Frames', seq, 'input_a_frames').props('outlined')
|
||||
ib_input = dict_number('Input B Frames', seq, 'input_b_frames').props('outlined')
|
||||
|
||||
# VACE Length + output calculation
|
||||
input_a = int(seq.get('input_a_frames', 16))
|
||||
input_b = int(seq.get('input_b_frames', 16))
|
||||
stored_total = int(seq.get('vace_length', 49))
|
||||
mode_idx = int(seq.get('vace schedule', 1))
|
||||
|
||||
if mode_idx == 0:
|
||||
base_length = max(stored_total - input_a, 1)
|
||||
elif mode_idx == 1:
|
||||
base_length = max(stored_total - input_b, 1)
|
||||
else:
|
||||
base_length = max(stored_total - input_a - input_b, 1)
|
||||
|
||||
with ui.row().classes('w-full items-center'):
|
||||
vl_input = ui.number('VACE Length', value=base_length, min=1).classes('col').props(
|
||||
'outlined')
|
||||
output_label = ui.label(f'Output: {stored_total}').classes('text-bold')
|
||||
|
||||
def recalc_vace(*_args):
|
||||
mi = int(vs_input.value) if vs_input.value is not None else 0
|
||||
ia = int(ia_input.value) if ia_input.value is not None else 16
|
||||
ib = int(ib_input.value) if ib_input.value is not None else 16
|
||||
nb = int(vl_input.value) if vl_input.value is not None else 1
|
||||
|
||||
if mi == 0:
|
||||
raw = nb + ia
|
||||
elif mi == 1:
|
||||
raw = nb + ib
|
||||
else:
|
||||
raw = nb + ia + ib
|
||||
|
||||
snapped = ((raw + 2) // 4) * 4 + 1
|
||||
seq['vace_length'] = snapped
|
||||
output_label.set_text(f'Output: {snapped}')
|
||||
|
||||
for inp in (vs_input, ia_input, ib_input, vl_input):
|
||||
inp.on('update:model-value', recalc_vace)
|
||||
|
||||
dict_number('Reference Switch', seq, 'reference switch').props('outlined')
|
||||
|
||||
|
||||
# ======================================================================
|
||||
# Mass Update
|
||||
# ======================================================================
|
||||
|
||||
def _render_mass_update(batch_list, data, file_path, state: AppState):
|
||||
with ui.expansion('Mass Update', icon='sync').classes('w-full'):
|
||||
if len(batch_list) < 2:
|
||||
ui.label('Need at least 2 sequences for mass update.').classes('text-caption')
|
||||
return
|
||||
|
||||
source_options = {i: format_seq_label(s.get(KEY_SEQUENCE_NUMBER, i+1))
|
||||
for i, s in enumerate(batch_list)}
|
||||
source_select = ui.select(source_options, value=0,
|
||||
label='Copy from sequence:').classes('w-full')
|
||||
|
||||
field_select = ui.select([], multiple=True,
|
||||
label='Fields to copy:').classes('w-full')
|
||||
|
||||
def update_fields(_=None):
|
||||
idx = source_select.value
|
||||
if idx is not None and 0 <= idx < len(batch_list):
|
||||
src = batch_list[idx]
|
||||
keys = [k for k in src.keys() if k != 'sequence_number']
|
||||
field_select.options = keys
|
||||
|
||||
source_select.on_value_change(update_fields)
|
||||
update_fields()
|
||||
|
||||
ui.label('Apply to:').classes('text-subtitle2 q-mt-md')
|
||||
select_all_cb = ui.checkbox('Select All')
|
||||
target_checks = {}
|
||||
for idx, s in enumerate(batch_list):
|
||||
sn = s.get(KEY_SEQUENCE_NUMBER, idx + 1)
|
||||
cb = ui.checkbox(format_seq_label(sn))
|
||||
target_checks[idx] = cb
|
||||
|
||||
def on_select_all(e):
|
||||
for cb in target_checks.values():
|
||||
cb.set_value(e.value)
|
||||
|
||||
select_all_cb.on_value_change(on_select_all)
|
||||
|
||||
def apply_mass_update():
|
||||
src_idx = source_select.value
|
||||
if src_idx is None or src_idx >= len(batch_list):
|
||||
ui.notify('Source sequence no longer exists', type='warning')
|
||||
return
|
||||
selected_keys = field_select.value or []
|
||||
if not selected_keys:
|
||||
ui.notify('No fields selected', type='warning')
|
||||
return
|
||||
|
||||
source_seq = batch_list[src_idx]
|
||||
targets = [idx for idx, cb in target_checks.items()
|
||||
if cb.value and idx != src_idx and idx < len(batch_list)]
|
||||
if not targets:
|
||||
ui.notify('No target sequences selected', type='warning')
|
||||
return
|
||||
|
||||
for idx in targets:
|
||||
for key in selected_keys:
|
||||
batch_list[idx][key] = copy.deepcopy(source_seq.get(key))
|
||||
|
||||
data[KEY_BATCH_DATA] = batch_list
|
||||
htree = HistoryTree(data.get(KEY_HISTORY_TREE, {}))
|
||||
snapshot = copy.deepcopy(data)
|
||||
snapshot.pop(KEY_HISTORY_TREE, None)
|
||||
htree.commit(snapshot, f"Mass update: {', '.join(selected_keys)}")
|
||||
data[KEY_HISTORY_TREE] = htree.to_dict()
|
||||
save_json(file_path, data)
|
||||
ui.notify(f'Updated {len(targets)} sequences', type='positive')
|
||||
|
||||
ui.button('Apply Changes', icon='check', on_click=apply_mass_update).props(
|
||||
'color=primary')
|
||||
241
tab_comfy_ng.py
Normal file
241
tab_comfy_ng.py
Normal file
@@ -0,0 +1,241 @@
|
||||
import html
|
||||
import time
|
||||
import urllib.parse
|
||||
|
||||
import requests
|
||||
from nicegui import ui
|
||||
|
||||
from state import AppState
|
||||
from utils import save_config
|
||||
|
||||
|
||||
def render_comfy_monitor(state: AppState):
|
||||
config = state.config
|
||||
|
||||
# --- Global Monitor Settings ---
|
||||
with ui.expansion('Monitor Settings', icon='settings').classes('w-full'):
|
||||
with ui.row().classes('w-full items-end'):
|
||||
viewer_input = ui.input(
|
||||
'Remote Browser URL',
|
||||
value=config.get('viewer_url', ''),
|
||||
placeholder='e.g., http://localhost:5800',
|
||||
).classes('col')
|
||||
timeout_slider = ui.slider(
|
||||
min=0, max=60, step=1,
|
||||
value=config.get('monitor_timeout', 0),
|
||||
).classes('col')
|
||||
ui.label().bind_text_from(timeout_slider, 'value',
|
||||
backward=lambda v: f'Timeout: {v} min')
|
||||
|
||||
def save_monitor_settings():
|
||||
config['viewer_url'] = viewer_input.value
|
||||
config['monitor_timeout'] = int(timeout_slider.value)
|
||||
save_config(state.current_dir, config['favorites'], config)
|
||||
ui.notify('Monitor settings saved!', type='positive')
|
||||
|
||||
ui.button('Save Monitor Settings', icon='save', on_click=save_monitor_settings)
|
||||
|
||||
# --- Instance Management ---
|
||||
if 'comfy_instances' not in config:
|
||||
config['comfy_instances'] = [
|
||||
{'name': 'Main Server', 'url': 'http://192.168.1.100:8188'}
|
||||
]
|
||||
|
||||
instances = config['comfy_instances']
|
||||
|
||||
@ui.refreshable
|
||||
def render_instance_tabs():
|
||||
if not instances:
|
||||
ui.label('No servers configured. Add one below.')
|
||||
|
||||
for idx, inst in enumerate(instances):
|
||||
with ui.expansion(inst.get('name', f'Server {idx+1}'), icon='dns').classes('w-full'):
|
||||
_render_single_instance(state, inst, idx, instances, render_instance_tabs)
|
||||
|
||||
# Add server section
|
||||
ui.separator()
|
||||
ui.label('Add New Server').classes('text-subtitle1')
|
||||
with ui.row().classes('w-full items-end'):
|
||||
new_name = ui.input('Server Name', placeholder='e.g. Render Node 2').classes('col')
|
||||
new_url = ui.input('URL', placeholder='http://192.168.1.50:8188').classes('col')
|
||||
|
||||
def add_instance():
|
||||
if new_name.value and new_url.value:
|
||||
instances.append({'name': new_name.value, 'url': new_url.value})
|
||||
config['comfy_instances'] = instances
|
||||
save_config(state.current_dir, config['favorites'], config)
|
||||
ui.notify('Server Added!', type='positive')
|
||||
new_name.set_value('')
|
||||
new_url.set_value('')
|
||||
render_instance_tabs.refresh()
|
||||
else:
|
||||
ui.notify('Please fill in both Name and URL.', type='warning')
|
||||
|
||||
ui.button('Add Instance', icon='add', on_click=add_instance)
|
||||
|
||||
render_instance_tabs()
|
||||
|
||||
# --- Auto-poll timer (every 300s) ---
|
||||
def poll_all():
|
||||
# Timeout checks for live toggles
|
||||
timeout_val = config.get('monitor_timeout', 0)
|
||||
if timeout_val > 0:
|
||||
for key, start_time in list(state.live_toggles.items()):
|
||||
if start_time and (time.time() - start_time) > (timeout_val * 60):
|
||||
state.live_toggles[key] = None
|
||||
|
||||
ui.timer(300, poll_all)
|
||||
|
||||
|
||||
def _render_single_instance(state: AppState, instance_config: dict, index: int,
|
||||
all_instances: list, refresh_fn):
|
||||
config = state.config
|
||||
url = instance_config.get('url', 'http://127.0.0.1:8188')
|
||||
name = instance_config.get('name', f'Server {index+1}')
|
||||
comfy_url = url.rstrip('/')
|
||||
|
||||
# --- Settings popover ---
|
||||
with ui.expansion('Settings', icon='settings'):
|
||||
name_input = ui.input('Name', value=name).classes('w-full')
|
||||
url_input = ui.input('URL', value=url).classes('w-full')
|
||||
|
||||
def update_server():
|
||||
all_instances[index]['name'] = name_input.value
|
||||
all_instances[index]['url'] = url_input.value
|
||||
config['comfy_instances'] = all_instances
|
||||
save_config(state.current_dir, config['favorites'], config)
|
||||
ui.notify('Server config saved!', type='positive')
|
||||
refresh_fn.refresh()
|
||||
|
||||
def remove_server():
|
||||
all_instances.pop(index)
|
||||
config['comfy_instances'] = all_instances
|
||||
save_config(state.current_dir, config['favorites'], config)
|
||||
ui.notify('Server removed', type='info')
|
||||
refresh_fn.refresh()
|
||||
|
||||
ui.button('Update & Save', icon='save', on_click=update_server).props('color=primary')
|
||||
ui.button('Remove Server', icon='delete', on_click=remove_server).props('color=negative')
|
||||
|
||||
# --- Status Dashboard ---
|
||||
status_container = ui.row().classes('w-full items-center q-gutter-md')
|
||||
|
||||
def refresh_status():
|
||||
status_container.clear()
|
||||
with status_container:
|
||||
try:
|
||||
res = requests.get(f'{comfy_url}/queue', timeout=1.5)
|
||||
queue_data = res.json()
|
||||
running_cnt = len(queue_data.get('queue_running', []))
|
||||
pending_cnt = len(queue_data.get('queue_pending', []))
|
||||
|
||||
with ui.card().classes('q-pa-sm'):
|
||||
ui.label('Status')
|
||||
ui.label('Online' if running_cnt > 0 else 'Idle').classes(
|
||||
'text-positive' if running_cnt > 0 else 'text-grey')
|
||||
with ui.card().classes('q-pa-sm'):
|
||||
ui.label('Pending')
|
||||
ui.label(str(pending_cnt))
|
||||
with ui.card().classes('q-pa-sm'):
|
||||
ui.label('Running')
|
||||
ui.label(str(running_cnt))
|
||||
except Exception:
|
||||
with ui.card().classes('q-pa-sm'):
|
||||
ui.label('Status')
|
||||
ui.label('Offline').classes('text-negative')
|
||||
ui.label(f'Could not connect to {comfy_url}').classes('text-negative')
|
||||
|
||||
refresh_status()
|
||||
ui.button('Refresh Status', icon='refresh', on_click=refresh_status).props('flat dense')
|
||||
|
||||
# --- Live View ---
|
||||
ui.label('Live View').classes('text-subtitle1 q-mt-md')
|
||||
toggle_key = f'live_toggle_{index}'
|
||||
|
||||
live_checkbox = ui.checkbox('Enable Live Preview', value=False)
|
||||
|
||||
@ui.refreshable
|
||||
def render_live_view():
|
||||
if not live_checkbox.value:
|
||||
ui.label('Live Preview is disabled.').classes('text-caption')
|
||||
return
|
||||
|
||||
# Record start time
|
||||
if toggle_key not in state.live_toggles or state.live_toggles.get(toggle_key) is None:
|
||||
state.live_toggles[toggle_key] = time.time()
|
||||
|
||||
timeout_val = config.get('monitor_timeout', 0)
|
||||
if timeout_val > 0:
|
||||
start = state.live_toggles.get(toggle_key, time.time())
|
||||
remaining = (timeout_val * 60) - (time.time() - start)
|
||||
if remaining <= 0:
|
||||
live_checkbox.set_value(False)
|
||||
state.live_toggles[toggle_key] = None
|
||||
ui.label('Preview timed out.').classes('text-caption')
|
||||
return
|
||||
ui.label(f'Auto-off in: {int(remaining)}s').classes('text-caption')
|
||||
|
||||
iframe_h = ui.slider(min=600, max=2500, step=50, value=1000).classes('w-full')
|
||||
ui.label().bind_text_from(iframe_h, 'value', backward=lambda v: f'Height: {v}px')
|
||||
|
||||
viewer_base = config.get('viewer_url', '').strip()
|
||||
parsed = urllib.parse.urlparse(viewer_base)
|
||||
if viewer_base and parsed.scheme in ('http', 'https'):
|
||||
safe_src = html.escape(viewer_base, quote=True)
|
||||
ui.label(f'Viewing: {viewer_base}').classes('text-caption')
|
||||
|
||||
iframe_container = ui.column().classes('w-full')
|
||||
|
||||
def update_iframe():
|
||||
iframe_container.clear()
|
||||
with iframe_container:
|
||||
ui.html(
|
||||
f'<iframe src="{safe_src}" width="100%" height="{int(iframe_h.value)}px"'
|
||||
f' style="border: 2px solid #666; border-radius: 8px;"></iframe>'
|
||||
)
|
||||
|
||||
iframe_h.on_value_change(lambda _: update_iframe())
|
||||
update_iframe()
|
||||
else:
|
||||
ui.label('No valid viewer URL configured.').classes('text-warning')
|
||||
|
||||
live_checkbox.on_value_change(lambda _: render_live_view.refresh())
|
||||
render_live_view()
|
||||
|
||||
# --- Latest Output ---
|
||||
ui.label('Latest Output').classes('text-subtitle1 q-mt-md')
|
||||
img_container = ui.column().classes('w-full')
|
||||
|
||||
def check_image():
|
||||
img_container.clear()
|
||||
with img_container:
|
||||
try:
|
||||
hist_res = requests.get(f'{comfy_url}/history', timeout=2)
|
||||
history = hist_res.json()
|
||||
if not history:
|
||||
ui.label('No history found.').classes('text-caption')
|
||||
return
|
||||
last_prompt_id = list(history.keys())[-1]
|
||||
outputs = history[last_prompt_id].get('outputs', {})
|
||||
found_img = None
|
||||
for node_output in outputs.values():
|
||||
if 'images' in node_output:
|
||||
for img_info in node_output['images']:
|
||||
if img_info['type'] == 'output':
|
||||
found_img = img_info
|
||||
break
|
||||
if found_img:
|
||||
break
|
||||
if found_img:
|
||||
img_name = found_img['filename']
|
||||
folder = found_img['subfolder']
|
||||
img_type = found_img['type']
|
||||
img_url = f'{comfy_url}/view?filename={img_name}&subfolder={folder}&type={img_type}'
|
||||
ui.image(img_url).classes('w-full').style('max-width: 600px')
|
||||
ui.label(f'Last Output: {img_name}').classes('text-caption')
|
||||
else:
|
||||
ui.label('Last run had no image output.').classes('text-caption')
|
||||
except Exception as e:
|
||||
ui.label(f'Error fetching image: {e}').classes('text-negative')
|
||||
|
||||
ui.button('Check Latest Image', icon='image', on_click=check_image).props('flat')
|
||||
74
tab_raw_ng.py
Normal file
74
tab_raw_ng.py
Normal file
@@ -0,0 +1,74 @@
|
||||
import copy
|
||||
import json
|
||||
|
||||
from nicegui import ui
|
||||
|
||||
from state import AppState
|
||||
from utils import save_json, get_file_mtime, KEY_HISTORY_TREE, KEY_PROMPT_HISTORY
|
||||
|
||||
|
||||
def render_raw_editor(state: AppState):
|
||||
data = state.data_cache
|
||||
file_path = state.file_path
|
||||
|
||||
ui.label(f'Raw Editor: {file_path.name}').classes('text-h6')
|
||||
|
||||
hide_history = ui.checkbox(
|
||||
'Hide History (Safe Mode)',
|
||||
value=True,
|
||||
)
|
||||
|
||||
@ui.refreshable
|
||||
def render_editor():
|
||||
# Prepare display data
|
||||
if hide_history.value:
|
||||
display_data = copy.deepcopy(data)
|
||||
display_data.pop(KEY_HISTORY_TREE, None)
|
||||
display_data.pop(KEY_PROMPT_HISTORY, None)
|
||||
else:
|
||||
display_data = data
|
||||
|
||||
try:
|
||||
json_str = json.dumps(display_data, indent=4, ensure_ascii=False)
|
||||
except Exception as e:
|
||||
ui.notify(f'Error serializing JSON: {e}', type='negative')
|
||||
json_str = '{}'
|
||||
|
||||
text_area = ui.textarea(
|
||||
'JSON Content',
|
||||
value=json_str,
|
||||
).classes('w-full font-mono').props('outlined rows=30')
|
||||
|
||||
ui.separator()
|
||||
|
||||
def do_save():
|
||||
try:
|
||||
input_data = json.loads(text_area.value)
|
||||
|
||||
# Merge hidden history back in if safe mode
|
||||
if hide_history.value:
|
||||
if KEY_HISTORY_TREE in data:
|
||||
input_data[KEY_HISTORY_TREE] = data[KEY_HISTORY_TREE]
|
||||
if KEY_PROMPT_HISTORY in data:
|
||||
input_data[KEY_PROMPT_HISTORY] = data[KEY_PROMPT_HISTORY]
|
||||
|
||||
save_json(file_path, input_data)
|
||||
|
||||
data.clear()
|
||||
data.update(input_data)
|
||||
state.last_mtime = get_file_mtime(file_path)
|
||||
|
||||
ui.notify('Raw JSON Saved Successfully!', type='positive')
|
||||
render_editor.refresh()
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
ui.notify(f'Invalid JSON Syntax: {e}', type='negative')
|
||||
except Exception as e:
|
||||
ui.notify(f'Unexpected Error: {e}', type='negative')
|
||||
|
||||
ui.button('Save Raw Changes', icon='save', on_click=do_save).props(
|
||||
'color=primary'
|
||||
).classes('w-full')
|
||||
|
||||
hide_history.on_value_change(lambda _: render_editor.refresh())
|
||||
render_editor()
|
||||
353
tab_timeline_ng.py
Normal file
353
tab_timeline_ng.py
Normal file
@@ -0,0 +1,353 @@
|
||||
import copy
|
||||
import time
|
||||
|
||||
from nicegui import ui
|
||||
|
||||
from state import AppState
|
||||
from history_tree import HistoryTree
|
||||
from utils import save_json, KEY_BATCH_DATA, KEY_HISTORY_TREE
|
||||
|
||||
|
||||
def render_timeline_tab(state: AppState):
|
||||
data = state.data_cache
|
||||
file_path = state.file_path
|
||||
|
||||
tree_data = data.get(KEY_HISTORY_TREE, {})
|
||||
if not tree_data:
|
||||
ui.label('No history timeline exists. Make some changes in the Editor first!').classes(
|
||||
'text-subtitle1 q-pa-md')
|
||||
return
|
||||
|
||||
htree = HistoryTree(tree_data)
|
||||
|
||||
if state.restored_indicator:
|
||||
ui.label(f'Editing Restored Version: {state.restored_indicator}').classes(
|
||||
'text-info q-pa-sm')
|
||||
|
||||
# --- View mode + Selection toggle ---
|
||||
with ui.row().classes('w-full items-center q-gutter-md'):
|
||||
ui.label('Version History').classes('text-h6 col')
|
||||
view_mode = ui.toggle(
|
||||
['Horizontal', 'Vertical', 'Linear Log'],
|
||||
value='Horizontal',
|
||||
)
|
||||
selection_mode = ui.switch('Select to Delete')
|
||||
|
||||
@ui.refreshable
|
||||
def render_timeline():
|
||||
# Rebuild node list inside refreshable so it's current after deletes
|
||||
all_nodes = sorted(htree.nodes.values(), key=lambda x: x['timestamp'], reverse=True)
|
||||
selected_nodes = state.timeline_selected_nodes if selection_mode.value else set()
|
||||
|
||||
# --- Selection picker ---
|
||||
if selection_mode.value:
|
||||
all_ids = [n['id'] for n in all_nodes]
|
||||
|
||||
def fmt_option(nid):
|
||||
n = htree.nodes[nid]
|
||||
ts = time.strftime('%b %d %H:%M', time.localtime(n['timestamp']))
|
||||
note = n.get('note', 'Step')
|
||||
head = ' (HEAD)' if nid == htree.head_id else ''
|
||||
return f'{note} - {ts} ({nid[:6]}){head}'
|
||||
|
||||
options = {nid: fmt_option(nid) for nid in all_ids}
|
||||
|
||||
def on_selection_change(e):
|
||||
state.timeline_selected_nodes = set(e.value) if e.value else set()
|
||||
|
||||
ui.select(
|
||||
options,
|
||||
value=list(state.timeline_selected_nodes),
|
||||
multiple=True,
|
||||
label='Select nodes to delete:',
|
||||
on_change=on_selection_change,
|
||||
).classes('w-full')
|
||||
|
||||
with ui.row():
|
||||
def select_all():
|
||||
state.timeline_selected_nodes = set(all_ids)
|
||||
render_timeline.refresh()
|
||||
def deselect_all():
|
||||
state.timeline_selected_nodes = set()
|
||||
render_timeline.refresh()
|
||||
ui.button('Select All', on_click=select_all).props('flat dense')
|
||||
ui.button('Deselect All', on_click=deselect_all).props('flat dense')
|
||||
|
||||
# --- Graph views ---
|
||||
mode = view_mode.value
|
||||
if mode in ('Horizontal', 'Vertical'):
|
||||
direction = 'LR' if mode == 'Horizontal' else 'TB'
|
||||
try:
|
||||
graph_dot = htree.generate_graph(direction=direction)
|
||||
_render_graphviz(graph_dot)
|
||||
except Exception as e:
|
||||
ui.label(f'Graph Error: {e}').classes('text-negative')
|
||||
|
||||
# --- Linear Log view ---
|
||||
elif mode == 'Linear Log':
|
||||
ui.label('Chronological list of all snapshots.').classes('text-caption')
|
||||
for n in all_nodes:
|
||||
is_head = n['id'] == htree.head_id
|
||||
is_selected = n['id'] in selected_nodes
|
||||
|
||||
with ui.card().classes(
|
||||
'w-full q-mb-sm' +
|
||||
(' bg-yellow-1' if is_head else '') +
|
||||
(' bg-red-1' if is_selected else '')
|
||||
):
|
||||
with ui.row().classes('w-full items-center'):
|
||||
if selection_mode.value:
|
||||
ui.checkbox(
|
||||
'',
|
||||
value=is_selected,
|
||||
on_change=lambda e, nid=n['id']: _toggle_select(
|
||||
nid, e.value),
|
||||
)
|
||||
|
||||
icon = 'location_on' if is_head else 'circle'
|
||||
ui.icon(icon).classes(
|
||||
'text-primary' if is_head else 'text-grey')
|
||||
|
||||
with ui.column().classes('col'):
|
||||
note = n.get('note', 'Step')
|
||||
ts = time.strftime('%b %d %H:%M',
|
||||
time.localtime(n['timestamp']))
|
||||
label = f'{note} (Current)' if is_head else note
|
||||
ui.label(label).classes('text-bold')
|
||||
ui.label(
|
||||
f'ID: {n["id"][:6]} - {ts}').classes('text-caption')
|
||||
|
||||
if not is_head and not selection_mode.value:
|
||||
ui.button(
|
||||
'Restore',
|
||||
icon='restore',
|
||||
on_click=lambda node=n: _restore_and_refresh(node),
|
||||
).props('flat dense color=primary')
|
||||
|
||||
# --- Batch Delete ---
|
||||
if selection_mode.value and state.timeline_selected_nodes:
|
||||
valid = state.timeline_selected_nodes & set(htree.nodes.keys())
|
||||
state.timeline_selected_nodes = valid
|
||||
count = len(valid)
|
||||
if count > 0:
|
||||
ui.label(
|
||||
f'{count} node{"s" if count != 1 else ""} selected for deletion.'
|
||||
).classes('text-warning q-mt-md')
|
||||
|
||||
def do_batch_delete():
|
||||
if 'history_tree_backup' not in data:
|
||||
data['history_tree_backup'] = []
|
||||
data['history_tree_backup'].append(copy.deepcopy(htree.to_dict()))
|
||||
for nid in valid:
|
||||
if nid in htree.nodes:
|
||||
del htree.nodes[nid]
|
||||
for b, tip in list(htree.branches.items()):
|
||||
if tip in valid:
|
||||
del htree.branches[b]
|
||||
if htree.head_id in valid:
|
||||
if htree.nodes:
|
||||
fallback = sorted(htree.nodes.values(),
|
||||
key=lambda x: x['timestamp'])[-1]
|
||||
htree.head_id = fallback['id']
|
||||
else:
|
||||
htree.head_id = None
|
||||
data[KEY_HISTORY_TREE] = htree.to_dict()
|
||||
save_json(file_path, data)
|
||||
state.timeline_selected_nodes = set()
|
||||
ui.notify(
|
||||
f'Deleted {count} node{"s" if count != 1 else ""}!',
|
||||
type='positive')
|
||||
render_timeline.refresh()
|
||||
|
||||
ui.button(
|
||||
f'Delete {count} Node{"s" if count != 1 else ""}',
|
||||
icon='delete',
|
||||
on_click=do_batch_delete,
|
||||
).props('color=negative')
|
||||
|
||||
ui.separator()
|
||||
|
||||
# --- Node selector + actions ---
|
||||
ui.label('Manage Version').classes('text-subtitle1 q-mt-md')
|
||||
|
||||
def fmt_node(n):
|
||||
ts = time.strftime('%b %d %H:%M', time.localtime(n['timestamp']))
|
||||
return f'{n.get("note", "Step")} - {ts} ({n["id"][:6]})'
|
||||
|
||||
node_options = {n['id']: fmt_node(n) for n in all_nodes}
|
||||
current_id = htree.head_id if htree.head_id in node_options else (
|
||||
all_nodes[0]['id'] if all_nodes else None)
|
||||
|
||||
selected_node_id = ui.select(
|
||||
node_options,
|
||||
value=current_id,
|
||||
label='Select Version to Manage:',
|
||||
).classes('w-full')
|
||||
|
||||
with ui.row().classes('w-full items-end q-gutter-md'):
|
||||
def restore_selected():
|
||||
nid = selected_node_id.value
|
||||
if nid and nid in htree.nodes:
|
||||
_restore_and_refresh(htree.nodes[nid])
|
||||
|
||||
ui.button('Restore Version', icon='restore',
|
||||
on_click=restore_selected).props('color=primary')
|
||||
|
||||
# Rename
|
||||
with ui.row().classes('w-full items-end q-gutter-md'):
|
||||
rename_input = ui.input('Rename Label').classes('col')
|
||||
|
||||
def rename_node():
|
||||
nid = selected_node_id.value
|
||||
if nid and nid in htree.nodes and rename_input.value:
|
||||
htree.nodes[nid]['note'] = rename_input.value
|
||||
data[KEY_HISTORY_TREE] = htree.to_dict()
|
||||
save_json(file_path, data)
|
||||
ui.notify('Label updated', type='positive')
|
||||
render_timeline.refresh()
|
||||
|
||||
ui.button('Update Label', on_click=rename_node).props('flat')
|
||||
|
||||
# Danger zone
|
||||
with ui.expansion('Danger Zone (Delete)', icon='warning').classes('w-full q-mt-md'):
|
||||
ui.label('Deleting a node cannot be undone.').classes('text-warning')
|
||||
|
||||
def delete_selected():
|
||||
nid = selected_node_id.value
|
||||
if nid and nid in htree.nodes:
|
||||
if 'history_tree_backup' not in data:
|
||||
data['history_tree_backup'] = []
|
||||
data['history_tree_backup'].append(
|
||||
copy.deepcopy(htree.to_dict()))
|
||||
del htree.nodes[nid]
|
||||
for b, tip in list(htree.branches.items()):
|
||||
if tip == nid:
|
||||
del htree.branches[b]
|
||||
if htree.head_id == nid:
|
||||
if htree.nodes:
|
||||
fallback = sorted(htree.nodes.values(),
|
||||
key=lambda x: x['timestamp'])[-1]
|
||||
htree.head_id = fallback['id']
|
||||
else:
|
||||
htree.head_id = None
|
||||
data[KEY_HISTORY_TREE] = htree.to_dict()
|
||||
save_json(file_path, data)
|
||||
ui.notify('Node Deleted', type='positive')
|
||||
render_timeline.refresh()
|
||||
|
||||
ui.button('Delete This Node', icon='delete',
|
||||
on_click=delete_selected).props('color=negative')
|
||||
|
||||
# Data preview
|
||||
ui.separator()
|
||||
with ui.expansion('Data Preview', icon='preview').classes('w-full'):
|
||||
@ui.refreshable
|
||||
def render_preview():
|
||||
_render_data_preview(selected_node_id, htree)
|
||||
selected_node_id.on_value_change(lambda _: render_preview.refresh())
|
||||
render_preview()
|
||||
|
||||
def _toggle_select(nid, checked):
|
||||
if checked:
|
||||
state.timeline_selected_nodes.add(nid)
|
||||
else:
|
||||
state.timeline_selected_nodes.discard(nid)
|
||||
render_timeline.refresh()
|
||||
|
||||
def _restore_and_refresh(node):
|
||||
_restore_node(data, node, htree, file_path, state)
|
||||
# Refresh all tabs (batch, raw, timeline) so they pick up the restored data
|
||||
state._render_main.refresh()
|
||||
|
||||
view_mode.on_value_change(lambda _: render_timeline.refresh())
|
||||
selection_mode.on_value_change(lambda _: render_timeline.refresh())
|
||||
render_timeline()
|
||||
|
||||
|
||||
def _render_graphviz(dot_source: str):
|
||||
"""Render graphviz DOT source as SVG using ui.html."""
|
||||
try:
|
||||
import graphviz
|
||||
src = graphviz.Source(dot_source)
|
||||
svg = src.pipe(format='svg').decode('utf-8')
|
||||
ui.html(f'<div style="overflow-x: auto;">{svg}</div>')
|
||||
except ImportError:
|
||||
ui.label('Install graphviz Python package for graph rendering.').classes('text-warning')
|
||||
ui.code(dot_source).classes('w-full')
|
||||
except Exception as e:
|
||||
ui.label(f'Graph rendering error: {e}').classes('text-negative')
|
||||
|
||||
|
||||
def _restore_node(data, node, htree, file_path, state: AppState):
|
||||
"""Restore a history node as the current version."""
|
||||
node_data = node['data']
|
||||
if KEY_BATCH_DATA not in node_data and KEY_BATCH_DATA in data:
|
||||
del data[KEY_BATCH_DATA]
|
||||
data.update(node_data)
|
||||
htree.head_id = node['id']
|
||||
data[KEY_HISTORY_TREE] = htree.to_dict()
|
||||
save_json(file_path, data)
|
||||
label = f"{node.get('note', 'Step')} ({node['id'][:4]})"
|
||||
state.restored_indicator = label
|
||||
ui.notify('Restored!', type='positive')
|
||||
|
||||
|
||||
def _render_data_preview(selected_node_id, htree):
|
||||
"""Render a read-only preview of the selected node's data."""
|
||||
nid = selected_node_id.value
|
||||
if not nid or nid not in htree.nodes:
|
||||
ui.label('No node selected.').classes('text-caption')
|
||||
return
|
||||
|
||||
node_data = htree.nodes[nid]['data']
|
||||
batch_list = node_data.get(KEY_BATCH_DATA, [])
|
||||
|
||||
if batch_list and isinstance(batch_list, list) and len(batch_list) > 0:
|
||||
ui.label(f'This snapshot contains {len(batch_list)} sequences.').classes('text-caption')
|
||||
for i, seq_data in enumerate(batch_list):
|
||||
seq_num = seq_data.get('sequence_number', i + 1)
|
||||
with ui.expansion(f'Sequence #{seq_num}', value=(i == 0)):
|
||||
_render_preview_fields(seq_data)
|
||||
else:
|
||||
_render_preview_fields(node_data)
|
||||
|
||||
|
||||
def _render_preview_fields(item_data: dict):
|
||||
"""Render read-only preview of prompts, settings, LoRAs."""
|
||||
with ui.grid(columns=2).classes('w-full'):
|
||||
ui.textarea('General Positive',
|
||||
value=item_data.get('general_prompt', '')).props('readonly outlined rows=3')
|
||||
ui.textarea('General Negative',
|
||||
value=item_data.get('general_negative', '')).props('readonly outlined rows=3')
|
||||
val_sp = item_data.get('current_prompt', '') or item_data.get('prompt', '')
|
||||
ui.textarea('Specific Positive',
|
||||
value=val_sp).props('readonly outlined rows=3')
|
||||
ui.textarea('Specific Negative',
|
||||
value=item_data.get('negative', '')).props('readonly outlined rows=3')
|
||||
|
||||
with ui.row().classes('w-full q-gutter-md'):
|
||||
ui.input('Camera', value=str(item_data.get('camera', 'static'))).props('readonly outlined')
|
||||
ui.input('FLF', value=str(item_data.get('flf', '0.0'))).props('readonly outlined')
|
||||
ui.input('Seed', value=str(item_data.get('seed', '-1'))).props('readonly outlined')
|
||||
|
||||
with ui.expansion('LoRA Configuration'):
|
||||
with ui.row().classes('w-full q-gutter-md'):
|
||||
for lora_idx in range(1, 4):
|
||||
with ui.column():
|
||||
ui.input(f'L{lora_idx} Name',
|
||||
value=item_data.get(f'lora {lora_idx} high', '')).props(
|
||||
'readonly outlined dense')
|
||||
ui.input(f'L{lora_idx} Str',
|
||||
value=str(item_data.get(f'lora {lora_idx} low', ''))).props(
|
||||
'readonly outlined dense')
|
||||
|
||||
vace_keys = ['frame_to_skip', 'vace schedule', 'video file path']
|
||||
if any(k in item_data for k in vace_keys):
|
||||
with ui.expansion('VACE / I2V Settings'):
|
||||
with ui.row().classes('w-full q-gutter-md'):
|
||||
ui.input('Skip Frames',
|
||||
value=str(item_data.get('frame_to_skip', ''))).props('readonly outlined')
|
||||
ui.input('Schedule',
|
||||
value=str(item_data.get('vace schedule', ''))).props('readonly outlined')
|
||||
ui.input('Video Path',
|
||||
value=str(item_data.get('video file path', ''))).props('readonly outlined')
|
||||
@@ -1,15 +1,8 @@
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Mock streamlit before importing utils
|
||||
import sys
|
||||
from unittest.mock import MagicMock
|
||||
sys.modules.setdefault("streamlit", MagicMock())
|
||||
|
||||
from utils import load_json, save_json, get_file_mtime, ALLOWED_BASE_DIR, DEFAULTS, resolve_path_case_insensitive
|
||||
|
||||
|
||||
|
||||
4
utils.py
4
utils.py
@@ -5,8 +5,6 @@ import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import streamlit as st
|
||||
|
||||
# --- Magic String Keys ---
|
||||
KEY_BATCH_DATA = "batch_data"
|
||||
KEY_HISTORY_TREE = "history_tree"
|
||||
@@ -145,7 +143,7 @@ def load_json(path: str | Path) -> tuple[dict[str, Any], float]:
|
||||
data = json.load(f)
|
||||
return data, path.stat().st_mtime
|
||||
except Exception as e:
|
||||
st.error(f"Error loading JSON: {e}")
|
||||
logger.error(f"Error loading JSON: {e}")
|
||||
return DEFAULTS.copy(), 0
|
||||
|
||||
def save_json(path: str | Path, data: dict[str, Any]) -> None:
|
||||
|
||||
Reference in New Issue
Block a user