Remove old Streamlit UI files superseded by NiceGUI migration

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-27 22:17:38 +01:00
parent d8597f201a
commit 9a3f7b7b94
5 changed files with 0 additions and 1535 deletions

224
app.py
View File

@@ -1,224 +0,0 @@
import streamlit as st
from pathlib import Path
# --- Import Custom Modules ---
from utils import (
load_config, save_config, load_snippets, save_snippets,
load_json, save_json, generate_templates, DEFAULTS,
KEY_BATCH_DATA, KEY_SEQUENCE_NUMBER,
resolve_path_case_insensitive,
)
from tab_batch import render_batch_processor
from tab_timeline import render_timeline_tab
from tab_comfy import render_comfy_monitor
from tab_raw import render_raw_editor
# ==========================================
# 1. PAGE CONFIGURATION
# ==========================================
st.set_page_config(layout="wide", page_title="AI Settings Manager")
# ==========================================
# 2. SESSION STATE INITIALIZATION
# ==========================================
_SESSION_DEFAULTS = {
"snippets": load_snippets,
"loaded_file": lambda: None,
"last_mtime": lambda: 0,
"ui_reset_token": lambda: 0,
"active_tab_name": lambda: "🚀 Batch Processor",
}
if 'config' not in st.session_state:
st.session_state.config = load_config()
st.session_state.current_dir = Path(st.session_state.config.get("last_dir", Path.cwd()))
for key, factory in _SESSION_DEFAULTS.items():
if key not in st.session_state:
st.session_state[key] = factory()
# ==========================================
# 3. SIDEBAR (NAVIGATOR & TOOLS)
# ==========================================
with st.sidebar:
st.header("📂 Navigator")
# --- Path Navigator ---
# Sync widget to current_dir on first load or after external change
if "nav_path_input" not in st.session_state or st.session_state.get("_sync_nav_path"):
st.session_state.nav_path_input = str(st.session_state.current_dir)
st.session_state._sync_nav_path = False
def _on_path_change():
new_path = st.session_state.nav_path_input
p = resolve_path_case_insensitive(new_path)
if p is not None and p.is_dir():
st.session_state.current_dir = p
st.session_state.config['last_dir'] = str(p)
save_config(st.session_state.current_dir, st.session_state.config['favorites'])
st.session_state.loaded_file = None
# Always resync widget to canonical path form
st.session_state._sync_nav_path = True
st.text_input("Current Path", key="nav_path_input", on_change=_on_path_change)
# --- Favorites System ---
if st.button("📌 Pin Folder", use_container_width=True):
if str(st.session_state.current_dir) not in st.session_state.config['favorites']:
st.session_state.config['favorites'].append(str(st.session_state.current_dir))
save_config(st.session_state.current_dir, st.session_state.config['favorites'])
st.rerun()
favorites = st.session_state.config['favorites']
if favorites:
def _on_fav_jump():
sel = st.session_state._fav_radio
if sel != "Select..." and sel != str(st.session_state.current_dir):
st.session_state.current_dir = Path(sel)
st.session_state._sync_nav_path = True
st.radio(
"Jump to:",
["Select..."] + favorites,
index=0,
key="_fav_radio",
label_visibility="collapsed",
on_change=_on_fav_jump,
)
# Unpin buttons for each favorite
for fav in favorites:
fc1, fc2 = st.columns([4, 1])
fc1.caption(fav)
if fc2.button("", key=f"unpin_{fav}"):
st.session_state.config['favorites'].remove(fav)
save_config(st.session_state.current_dir, st.session_state.config['favorites'])
st.rerun()
st.markdown("---")
# --- Snippet Library ---
st.subheader("🧩 Snippet Library")
with st.expander("Add New Snippet"):
snip_name = st.text_input("Name", placeholder="e.g. Cinematic")
snip_content = st.text_area("Content", placeholder="4k, high quality...")
if st.button("Save Snippet"):
if snip_name and snip_content:
st.session_state.snippets[snip_name] = snip_content
save_snippets(st.session_state.snippets)
st.success(f"Saved '{snip_name}'")
st.rerun()
if st.session_state.snippets:
st.caption("Click to Append to Prompt:")
for name, content in st.session_state.snippets.items():
col_s1, col_s2 = st.columns([4, 1])
if col_s1.button(f" {name}", use_container_width=True):
st.rerun()
if col_s2.button("🗑️", key=f"del_snip_{name}"):
del st.session_state.snippets[name]
save_snippets(st.session_state.snippets)
st.rerun()
st.markdown("---")
# --- File List & Creation ---
json_files = sorted(list(st.session_state.current_dir.glob("*.json")))
json_files = [f for f in json_files if f.name != ".editor_config.json" and f.name != ".editor_snippets.json"]
if not json_files:
if st.button("Generate Templates"):
generate_templates(st.session_state.current_dir)
st.rerun()
with st.expander("Create New JSON"):
new_filename = st.text_input("Filename", placeholder="my_prompt_vace")
if st.button("Create"):
if not new_filename.endswith(".json"): new_filename += ".json"
path = st.session_state.current_dir / new_filename
first_item = DEFAULTS.copy()
first_item[KEY_SEQUENCE_NUMBER] = 1
data = {KEY_BATCH_DATA: [first_item]}
save_json(path, data)
st.rerun()
# --- File Selector ---
selected_file_name = None
if json_files:
file_names = [f.name for f in json_files]
if 'file_selector' not in st.session_state:
st.session_state.file_selector = file_names[0]
if st.session_state.file_selector not in file_names:
st.session_state.file_selector = file_names[0]
selected_file_name = st.radio("Select File", file_names, key="file_selector")
else:
st.info("No JSON files in this folder.")
if 'file_selector' in st.session_state:
del st.session_state.file_selector
st.session_state.loaded_file = None
# --- GLOBAL MONITOR TOGGLE (NEW) ---
st.markdown("---")
show_monitor = st.checkbox("Show Comfy Monitor", value=True)
# ==========================================
# 4. MAIN APP LOGIC
# ==========================================
if selected_file_name:
file_path = st.session_state.current_dir / selected_file_name
# --- FILE LOADING & AUTO-SWITCH LOGIC ---
if st.session_state.loaded_file != str(file_path):
data, mtime = load_json(file_path)
st.session_state.data_cache = data
st.session_state.last_mtime = mtime
st.session_state.loaded_file = str(file_path)
# Clear transient states
if 'restored_indicator' in st.session_state: del st.session_state.restored_indicator
# --- AUTO-SWITCH TAB LOGIC ---
st.session_state.active_tab_name = "🚀 Batch Processor"
else:
data = st.session_state.data_cache
st.title(f"Editing: {selected_file_name}")
# --- CONTROLLED NAVIGATION ---
# Removed "🔌 Comfy Monitor" from this list
tabs_list = [
"🚀 Batch Processor",
"🕒 Timeline",
"💻 Raw Editor"
]
if st.session_state.active_tab_name not in tabs_list:
st.session_state.active_tab_name = tabs_list[0]
current_tab = st.radio(
"Navigation",
tabs_list,
key="active_tab_name",
horizontal=True,
label_visibility="collapsed"
)
st.markdown("---")
# --- RENDER EDITOR TABS ---
if current_tab == "🚀 Batch Processor":
render_batch_processor(data, file_path, json_files, st.session_state.current_dir, selected_file_name)
elif current_tab == "🕒 Timeline":
render_timeline_tab(data, file_path)
elif current_tab == "💻 Raw Editor":
render_raw_editor(data, file_path)
# --- GLOBAL PERSISTENT MONITOR ---
if show_monitor:
st.markdown("---")
with st.expander("🔌 ComfyUI Monitor", expanded=True):
render_comfy_monitor()

View File

@@ -1,594 +0,0 @@
import streamlit as st
import random
import copy
from pathlib import Path
from utils import DEFAULTS, save_json, load_json, KEY_BATCH_DATA, KEY_HISTORY_TREE, KEY_PROMPT_HISTORY, KEY_SEQUENCE_NUMBER
from history_tree import HistoryTree
IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".gif"}
SUB_SEGMENT_MULTIPLIER = 1000
def is_subsegment(seq_num):
"""Return True if seq_num is a sub-segment (>= 1000)."""
return int(seq_num) >= SUB_SEGMENT_MULTIPLIER
def parent_of(seq_num):
"""Return the parent segment number (or self if already a parent)."""
seq_num = int(seq_num)
return seq_num // SUB_SEGMENT_MULTIPLIER if is_subsegment(seq_num) else seq_num
def sub_index_of(seq_num):
"""Return the sub-index (0 if parent)."""
seq_num = int(seq_num)
return seq_num % SUB_SEGMENT_MULTIPLIER if is_subsegment(seq_num) else 0
def format_seq_label(seq_num):
"""Return display label: 'Sequence #3' or 'Sub #2.1'."""
seq_num = int(seq_num)
if is_subsegment(seq_num):
return f"Sub #{parent_of(seq_num)}.{sub_index_of(seq_num)}"
return f"Sequence #{seq_num}"
def next_sub_segment_number(batch_list, parent_seq_num):
"""Find the next available sub-segment number under a parent."""
parent_seq_num = int(parent_seq_num)
max_sub = 0
for s in batch_list:
sn = int(s.get(KEY_SEQUENCE_NUMBER, 0))
if is_subsegment(sn) and parent_of(sn) == parent_seq_num:
max_sub = max(max_sub, sub_index_of(sn))
return parent_seq_num * SUB_SEGMENT_MULTIPLIER + max_sub + 1
def find_insert_position(batch_list, parent_index, parent_seq_num):
"""Find the insert position after the parent's last existing sub-segment."""
parent_seq_num = int(parent_seq_num)
pos = parent_index + 1
while pos < len(batch_list):
sn = int(batch_list[pos].get(KEY_SEQUENCE_NUMBER, 0))
if is_subsegment(sn) and parent_of(sn) == parent_seq_num:
pos += 1
else:
break
return pos
def _render_mass_update(batch_list, data, file_path, key_prefix):
"""Render the mass update UI section."""
with st.expander("🔄 Mass Update", expanded=False):
if len(batch_list) < 2:
st.info("Need at least 2 sequences for mass update.")
return
# Source sequence selector
source_idx = st.selectbox(
"Copy from sequence:",
range(len(batch_list)),
format_func=lambda i: format_seq_label(batch_list[i].get('sequence_number', i+1)),
key=f"{key_prefix}_mass_src"
)
source_seq = batch_list[source_idx]
# Field multi-select (exclude sequence_number)
available_keys = [k for k in source_seq.keys() if k != "sequence_number"]
selected_keys = st.multiselect("Fields to copy:", available_keys, key=f"{key_prefix}_mass_fields")
if not selected_keys:
return
# Target sequence checkboxes
st.write("Apply to:")
select_all = st.checkbox("Select All", key=f"{key_prefix}_mass_all")
target_indices = []
target_cols = st.columns(min(4, len(batch_list) - 1)) if len(batch_list) > 1 else [st]
col_idx = 0
for i, seq in enumerate(batch_list):
if i == source_idx:
continue
seq_num = seq.get("sequence_number", i + 1)
with target_cols[col_idx % len(target_cols)]:
checked = select_all or st.checkbox(format_seq_label(seq_num), key=f"{key_prefix}_mass_t{i}")
if checked:
target_indices.append(i)
col_idx += 1
# Preview
if target_indices and selected_keys:
with st.expander("Preview changes", expanded=True):
for key in selected_keys:
val = source_seq.get(key, "")
display_val = str(val)[:100] + "..." if len(str(val)) > 100 else str(val)
st.caption(f"**{key}**: {display_val}")
# Apply button
if st.button("Apply Changes", type="primary", key=f"{key_prefix}_mass_apply"):
for i in target_indices:
for key in selected_keys:
batch_list[i][key] = copy.deepcopy(source_seq.get(key))
# Save with history snapshot
data[KEY_BATCH_DATA] = batch_list
htree = HistoryTree(data.get(KEY_HISTORY_TREE, {}))
snapshot_payload = copy.deepcopy(data)
if KEY_HISTORY_TREE in snapshot_payload:
del snapshot_payload[KEY_HISTORY_TREE]
htree.commit(snapshot_payload, f"Mass update: {', '.join(selected_keys)}")
data[KEY_HISTORY_TREE] = htree.to_dict()
save_json(file_path, data)
st.session_state.data_cache = data
st.session_state.ui_reset_token += 1
st.toast(f"Updated {len(target_indices)} sequences", icon="")
st.rerun()
def create_batch_callback(original_filename, current_data, current_dir):
new_name = f"batch_{original_filename}"
new_path = current_dir / new_name
if new_path.exists():
st.toast(f"File {new_name} already exists!", icon="⚠️")
return
first_item = current_data.copy()
if KEY_PROMPT_HISTORY in first_item: del first_item[KEY_PROMPT_HISTORY]
if KEY_HISTORY_TREE in first_item: del first_item[KEY_HISTORY_TREE]
first_item[KEY_SEQUENCE_NUMBER] = 1
new_data = {
KEY_BATCH_DATA: [first_item],
KEY_HISTORY_TREE: {},
KEY_PROMPT_HISTORY: []
}
save_json(new_path, new_data)
st.toast(f"Created {new_name}", icon="")
st.session_state.file_selector = new_name
def render_batch_processor(data, file_path, json_files, current_dir, selected_file_name):
is_batch_file = KEY_BATCH_DATA in data or isinstance(data, list)
if not is_batch_file:
st.warning("This is a Single file. To use Batch mode, create a copy.")
st.button("✨ Create Batch Copy", on_click=create_batch_callback, args=(selected_file_name, data, current_dir))
return
if 'restored_indicator' in st.session_state and st.session_state.restored_indicator:
st.info(f"📍 Editing Restored Version: **{st.session_state.restored_indicator}**")
batch_list = data.get(KEY_BATCH_DATA, [])
# --- ADD NEW SEQUENCE AREA ---
st.subheader("Add New Sequence")
ac1, ac2 = st.columns(2)
with ac1:
file_options = [f.name for f in json_files]
d_idx = file_options.index(selected_file_name) if selected_file_name in file_options else 0
src_name = st.selectbox("Source File:", file_options, index=d_idx, key="batch_src_file")
src_data, _ = load_json(current_dir / src_name)
with ac2:
src_batch = src_data.get(KEY_BATCH_DATA, [])
if src_batch:
seq_opts = list(range(len(src_batch)))
sel_seq_idx = st.selectbox(
"Source Sequence:",
seq_opts,
format_func=lambda i: format_seq_label(src_batch[i].get(KEY_SEQUENCE_NUMBER, i + 1)),
key="batch_src_seq"
)
else:
st.caption("Single file (no sequences)")
sel_seq_idx = None
bc1, bc2 = st.columns(2)
def add_sequence(new_item):
max_seq = 0
for s in batch_list:
sn = int(s.get(KEY_SEQUENCE_NUMBER, 0))
if not is_subsegment(sn):
max_seq = max(max_seq, sn)
new_item[KEY_SEQUENCE_NUMBER] = max_seq + 1
for k in [KEY_PROMPT_HISTORY, KEY_HISTORY_TREE, "note", "loras"]:
if k in new_item: del new_item[k]
batch_list.append(new_item)
data[KEY_BATCH_DATA] = batch_list
save_json(file_path, data)
st.session_state.ui_reset_token += 1
st.rerun()
if bc1.button(" Add Empty", use_container_width=True):
add_sequence(DEFAULTS.copy())
if bc2.button(" From Source", use_container_width=True, help=f"Import from {src_name}"):
item = DEFAULTS.copy()
if src_batch and sel_seq_idx is not None:
item.update(src_batch[sel_seq_idx])
else:
item.update(src_data)
add_sequence(item)
# --- RENDER LIST ---
st.markdown("---")
info_col, reorder_col = st.columns([3, 1])
info_col.info(f"Batch contains {len(batch_list)} sequences.")
if reorder_col.button("🔢 Sort by Number", use_container_width=True, help="Reorder sequences by sequence number"):
batch_list.sort(key=lambda s: int(s.get(KEY_SEQUENCE_NUMBER, 0)))
data[KEY_BATCH_DATA] = batch_list
save_json(file_path, data)
st.session_state.ui_reset_token += 1
st.toast("Sorted by sequence number!", icon="🔢")
st.rerun()
# --- MASS UPDATE SECTION ---
ui_reset_token = st.session_state.get("ui_reset_token", 0)
_render_mass_update(batch_list, data, file_path, f"{selected_file_name}_v{ui_reset_token}")
# Updated LoRA keys to match new logic
lora_keys = ["lora 1 high", "lora 1 low", "lora 2 high", "lora 2 low", "lora 3 high", "lora 3 low"]
standard_keys = {
"general_prompt", "general_negative", "current_prompt", "negative", "prompt", "seed", "cfg",
"camera", "flf", KEY_SEQUENCE_NUMBER
}
standard_keys.update(lora_keys)
standard_keys.update([
"frame_to_skip", "end_frame", "transition", "vace_length",
"input_a_frames", "input_b_frames", "reference switch", "vace schedule",
"reference path", "video file path", "reference image path", "flf image path"
])
VACE_MODES = [
"End Extend", "Pre Extend", "Middle Extend", "Edge Extend",
"Join Extend", "Bidirectional Extend", "Frame Interpolation",
"Replace/Inpaint", "Video Inpaint", "Keyframe",
]
VACE_FORMULAS = [
"base + A", # 0 End Extend
"base + B", # 1 Pre Extend
"base + A + B", # 2 Middle Extend
"base + A + B", # 3 Edge Extend
"base + A + B", # 4 Join Extend
"base + A + B", # 5 Bidirectional
"(B-1) * step", # 6 Frame Interpolation
"snap(source)", # 7 Replace/Inpaint
"snap(source)", # 8 Video Inpaint
"base + A + B", # 9 Keyframe
]
for i, seq in enumerate(batch_list):
seq_num = seq.get(KEY_SEQUENCE_NUMBER, i+1)
prefix = f"{selected_file_name}_seq{i}_v{st.session_state.ui_reset_token}"
if is_subsegment(seq_num):
expander_label = f"🔗 ↳ Sub #{parent_of(seq_num)}.{sub_index_of(seq_num)} ({int(seq_num)})"
else:
expander_label = f"🎬 Sequence #{seq_num}"
with st.expander(expander_label, expanded=False):
# --- ACTION ROW ---
act_c1, act_c2, act_c3, act_c4 = st.columns([1.2, 1.8, 1.2, 0.5])
# 1. Copy Source
with act_c1:
if st.button(f"📥 Copy {src_name}", key=f"{prefix}_copy", use_container_width=True):
item = DEFAULTS.copy()
if src_batch and sel_seq_idx is not None:
item.update(src_batch[sel_seq_idx])
else:
item.update(src_data)
item[KEY_SEQUENCE_NUMBER] = seq_num
for k in [KEY_PROMPT_HISTORY, KEY_HISTORY_TREE]:
if k in item: del item[k]
batch_list[i] = item
data[KEY_BATCH_DATA] = batch_list
save_json(file_path, data)
st.session_state.ui_reset_token += 1
st.toast("Copied!", icon="📥")
st.rerun()
# 2. Cloning Tools
with act_c2:
cl_1, cl_2, cl_3 = st.columns(3)
if cl_1.button("👯 Next", key=f"{prefix}_c_next", help="Clone and insert below", use_container_width=True):
new_seq = copy.deepcopy(seq)
max_sn = 0
for s in batch_list:
sn = int(s.get(KEY_SEQUENCE_NUMBER, 0))
if not is_subsegment(sn):
max_sn = max(max_sn, sn)
new_seq[KEY_SEQUENCE_NUMBER] = max_sn + 1
if not is_subsegment(seq_num):
insert_pos = find_insert_position(batch_list, i, int(seq_num))
else:
insert_pos = i + 1
batch_list.insert(insert_pos, new_seq)
data[KEY_BATCH_DATA] = batch_list
save_json(file_path, data)
st.session_state.ui_reset_token += 1
st.toast("Cloned to Next!", icon="👯")
st.rerun()
if cl_2.button("⏬ End", key=f"{prefix}_c_end", help="Clone and add to bottom", use_container_width=True):
new_seq = copy.deepcopy(seq)
max_sn = 0
for s in batch_list:
sn = int(s.get(KEY_SEQUENCE_NUMBER, 0))
if not is_subsegment(sn):
max_sn = max(max_sn, sn)
new_seq[KEY_SEQUENCE_NUMBER] = max_sn + 1
batch_list.append(new_seq)
data[KEY_BATCH_DATA] = batch_list
save_json(file_path, data)
st.session_state.ui_reset_token += 1
st.toast("Cloned to End!", icon="")
st.rerun()
if cl_3.button("🔗 Sub", key=f"{prefix}_c_sub", help="Clone as sub-segment", use_container_width=True):
new_seq = copy.deepcopy(seq)
p_seq_num = parent_of(seq_num)
# Find the parent's index in batch_list
p_idx = i
if is_subsegment(seq_num):
for pi, ps in enumerate(batch_list):
if int(ps.get(KEY_SEQUENCE_NUMBER, 0)) == p_seq_num:
p_idx = pi
break
new_seq[KEY_SEQUENCE_NUMBER] = next_sub_segment_number(batch_list, p_seq_num)
insert_pos = find_insert_position(batch_list, p_idx, p_seq_num)
batch_list.insert(insert_pos, new_seq)
data[KEY_BATCH_DATA] = batch_list
save_json(file_path, data)
st.session_state.ui_reset_token += 1
st.toast(f"Created {format_seq_label(new_seq[KEY_SEQUENCE_NUMBER])}!", icon="🔗")
st.rerun()
# 3. Promote
with act_c3:
if st.button("↖️ Promote", key=f"{prefix}_prom", help="Save as Single File", use_container_width=True):
single_data = seq.copy()
single_data[KEY_PROMPT_HISTORY] = data.get(KEY_PROMPT_HISTORY, [])
single_data[KEY_HISTORY_TREE] = data.get(KEY_HISTORY_TREE, {})
if KEY_SEQUENCE_NUMBER in single_data: del single_data[KEY_SEQUENCE_NUMBER]
save_json(file_path, single_data)
st.session_state.data_cache = single_data
st.session_state.ui_reset_token += 1
st.toast("Converted to Single!", icon="")
st.rerun()
# 4. Remove
with act_c4:
if st.button("🗑️", key=f"{prefix}_del", use_container_width=True):
batch_list.pop(i)
data[KEY_BATCH_DATA] = batch_list
save_json(file_path, data)
st.session_state.ui_reset_token += 1
st.rerun()
st.markdown("---")
c1, c2 = st.columns([2, 1])
with c1:
seq["general_prompt"] = st.text_area("General Prompt", value=seq.get("general_prompt", ""), height=60, key=f"{prefix}_gp")
seq["general_negative"] = st.text_area("General Negative", value=seq.get("general_negative", ""), height=60, key=f"{prefix}_gn")
seq["current_prompt"] = st.text_area("Specific Prompt", value=seq.get("current_prompt", ""), height=300, key=f"{prefix}_sp")
seq["negative"] = st.text_area("Specific Negative", value=seq.get("negative", ""), height=60, key=f"{prefix}_sn")
with c2:
sn_label = f"Sequence Number (↳ Sub #{parent_of(seq_num)}.{sub_index_of(seq_num)})" if is_subsegment(seq_num) else "Sequence Number"
seq[KEY_SEQUENCE_NUMBER] = st.number_input(sn_label, value=int(seq_num), key=f"{prefix}_sn_val")
s_row1, s_row2 = st.columns([3, 1])
seed_key = f"{prefix}_seed"
with s_row2:
st.write("")
st.write("")
if st.button("🎲", key=f"{prefix}_rand"):
st.session_state[seed_key] = random.randint(0, 999999999999)
st.rerun()
with s_row1:
current_seed = st.session_state.get(seed_key, int(seq.get("seed", 0)))
val = st.number_input("Seed", value=current_seed, key=seed_key)
seq["seed"] = val
seq["cfg"] = st.number_input("CFG", value=float(seq.get("cfg", DEFAULTS["cfg"])), step=0.5, format="%.1f", key=f"{prefix}_cfg")
seq["camera"] = st.text_input("Camera", value=seq.get("camera", ""), key=f"{prefix}_cam")
seq["flf"] = st.text_input("FLF", value=str(seq.get("flf", DEFAULTS["flf"])), key=f"{prefix}_flf")
seq["end_frame"] = st.number_input("End Frame", value=int(seq.get("end_frame", 0)), key=f"{prefix}_ef")
seq["video file path"] = st.text_input("Video File Path", value=seq.get("video file path", ""), key=f"{prefix}_vid")
for img_label, img_key, img_suffix in [
("Reference Image Path", "reference image path", "rip"),
("Reference Path", "reference path", "rp"),
("FLF Image Path", "flf image path", "flfi"),
]:
img_col, prev_col = st.columns([5, 1])
seq[img_key] = img_col.text_input(img_label, value=seq.get(img_key, ""), key=f"{prefix}_{img_suffix}")
img_path = Path(seq[img_key]) if seq[img_key] else None
if img_path and img_path.exists() and img_path.suffix.lower() in IMAGE_EXTENSIONS:
with prev_col.popover("👁"):
st.image(str(img_path), use_container_width=True)
with st.expander("VACE Settings"):
fts_col, fts_btn = st.columns([3, 1])
saved_fts_key = f"{prefix}_fts_saved"
if saved_fts_key not in st.session_state:
st.session_state[saved_fts_key] = int(seq.get("frame_to_skip", 81))
old_fts = st.session_state[saved_fts_key]
seq["frame_to_skip"] = fts_col.number_input("Frame to Skip", value=old_fts, key=f"{prefix}_fts")
delta = int(seq["frame_to_skip"]) - old_fts
delta_label = f"Shift ↓ ({delta:+d})" if delta != 0 else "Shift ↓ (0)"
fts_btn.write("")
fts_btn.write("")
if fts_btn.button(delta_label, key=f"{prefix}_fts_shift", help="Apply delta to all following sequences", disabled=(delta == 0)):
if delta != 0:
shifted = 0
for j in range(i + 1, len(batch_list)):
batch_list[j]["frame_to_skip"] = int(batch_list[j].get("frame_to_skip", 81)) + delta
shifted += 1
data[KEY_BATCH_DATA] = batch_list
save_json(file_path, data)
st.session_state.ui_reset_token += 1
st.toast(f"Shifted {shifted} sequences by {delta:+d}", icon="")
st.rerun()
else:
st.toast("No change to shift", icon="")
seq["transition"] = st.text_input("Transition", value=str(seq.get("transition", "1-2")), key=f"{prefix}_trans")
vs_col, vs_label = st.columns([3, 1])
sched_val = int(seq.get("vace schedule", 1))
seq["vace schedule"] = vs_col.number_input("VACE Schedule", value=sched_val, min_value=0, max_value=len(VACE_MODES) - 1, key=f"{prefix}_vsc")
mode_idx = int(seq["vace schedule"])
vs_label.write("")
vs_label.write("")
vs_label.caption(VACE_MODES[mode_idx])
with st.popover("📋 Mode Reference"):
st.markdown(
"| # | Mode | Formula |\n"
"|:--|:-----|:--------|\n"
+ "\n".join(
f"| **{j}** | {VACE_MODES[j]} | `{VACE_FORMULAS[j]}` |"
for j in range(len(VACE_MODES))
)
+ "\n\n*All totals snapped to 4n+1 (1,5,9,…,49,…,81,…)*"
)
seq["input_a_frames"] = st.number_input("Input A Frames", value=int(seq.get("input_a_frames", 16)), key=f"{prefix}_ia")
seq["input_b_frames"] = st.number_input("Input B Frames", value=int(seq.get("input_b_frames", 16)), key=f"{prefix}_ib")
input_a = int(seq.get("input_a_frames", 16))
input_b = int(seq.get("input_b_frames", 16))
stored_total = int(seq.get("vace_length", 49))
# Reverse using same mode formula that was used to store
if mode_idx == 0:
base_length = max(stored_total - input_a, 1)
elif mode_idx == 1:
base_length = max(stored_total - input_b, 1)
else:
base_length = max(stored_total - input_a - input_b, 1)
vl_col, vl_out = st.columns([3, 1])
new_base = vl_col.number_input("VACE Length", value=base_length, min_value=1, key=f"{prefix}_vl")
if mode_idx == 0: # End Extend: base + A
raw_total = new_base + input_a
elif mode_idx == 1: # Pre Extend: base + B
raw_total = new_base + input_b
else: # Most modes: base + A + B
raw_total = new_base + input_a + input_b
# Snap to 4n+1 (1,5,9,13,...,81,...) to match VACE sampler
seq["vace_length"] = ((raw_total + 2) // 4) * 4 + 1
vl_out.metric("Output", seq["vace_length"])
seq["reference switch"] = st.number_input("Reference Switch", value=int(seq.get("reference switch", 1)), key=f"{prefix}_rsw")
# --- UPDATED: LoRA Settings with Tag Wrapping ---
with st.expander("💊 LoRA Settings"):
lc1, lc2, lc3 = st.columns(3)
# Helper to render the tag wrapper UI
def render_lora_col(col_obj, lora_idx):
with col_obj:
st.caption(f"**LoRA {lora_idx}**")
# --- HIGH ---
k_high = f"lora {lora_idx} high"
raw_h = str(seq.get(k_high, ""))
# Strip tags for display
disp_h = raw_h.replace("<lora:", "").replace(">", "")
st.write("High:")
rh1, rh2, rh3 = st.columns([0.25, 1, 0.1])
rh1.markdown("<div style='text-align: right; padding-top: 8px;'><code>&lt;lora:</code></div>", unsafe_allow_html=True)
val_h = rh2.text_input(f"L{lora_idx}H", value=disp_h, key=f"{prefix}_l{lora_idx}h", label_visibility="collapsed")
rh3.markdown("<div style='padding-top: 8px;'><code>&gt;</code></div>", unsafe_allow_html=True)
if val_h:
seq[k_high] = f"<lora:{val_h}>"
else:
seq[k_high] = ""
# --- LOW ---
k_low = f"lora {lora_idx} low"
raw_l = str(seq.get(k_low, ""))
# Strip tags for display
disp_l = raw_l.replace("<lora:", "").replace(">", "")
st.write("Low:")
rl1, rl2, rl3 = st.columns([0.25, 1, 0.1])
rl1.markdown("<div style='text-align: right; padding-top: 8px;'><code>&lt;lora:</code></div>", unsafe_allow_html=True)
val_l = rl2.text_input(f"L{lora_idx}L", value=disp_l, key=f"{prefix}_l{lora_idx}l", label_visibility="collapsed")
rl3.markdown("<div style='padding-top: 8px;'><code>&gt;</code></div>", unsafe_allow_html=True)
if val_l:
seq[k_low] = f"<lora:{val_l}>"
else:
seq[k_low] = ""
render_lora_col(lc1, 1)
render_lora_col(lc2, 2)
render_lora_col(lc3, 3)
# --- CUSTOM PARAMETERS ---
st.markdown("---")
st.caption("🔧 Custom Parameters")
custom_keys = [k for k in seq.keys() if k not in standard_keys]
keys_to_remove = []
if custom_keys:
for k in custom_keys:
ck1, ck2, ck3 = st.columns([1, 2, 0.5])
ck1.text_input("Key", value=k, disabled=True, key=f"{prefix}_ck_lbl_{k}", label_visibility="collapsed")
val = ck2.text_input("Value", value=str(seq[k]), key=f"{prefix}_cv_{k}", label_visibility="collapsed")
seq[k] = val
if ck3.button("🗑️", key=f"{prefix}_cdel_{k}"):
keys_to_remove.append(k)
with st.expander(" Add Parameter"):
nk_col, nv_col = st.columns(2)
new_k = nk_col.text_input("Key", key=f"{prefix}_new_k")
new_v = nv_col.text_input("Value", key=f"{prefix}_new_v")
if st.button("Add", key=f"{prefix}_add_cust"):
if new_k and new_k not in seq:
seq[new_k] = new_v
save_json(file_path, data)
st.session_state.ui_reset_token += 1
st.rerun()
if keys_to_remove:
for k in keys_to_remove:
del seq[k]
save_json(file_path, data)
st.session_state.ui_reset_token += 1
st.rerun()
st.markdown("---")
# --- SAVE ACTIONS WITH HISTORY COMMIT ---
col_save, col_note = st.columns([1, 2])
with col_note:
commit_msg = st.text_input("Change Note (Optional)", placeholder="e.g. Added sequence 3")
with col_save:
if st.button("💾 Save & Snap", use_container_width=True):
data[KEY_BATCH_DATA] = batch_list
tree_data = data.get(KEY_HISTORY_TREE, {})
htree = HistoryTree(tree_data)
snapshot_payload = copy.deepcopy(data)
if KEY_HISTORY_TREE in snapshot_payload: del snapshot_payload[KEY_HISTORY_TREE]
htree.commit(snapshot_payload, note=commit_msg if commit_msg else "Batch Update")
data[KEY_HISTORY_TREE] = htree.to_dict()
save_json(file_path, data)
if 'restored_indicator' in st.session_state:
del st.session_state.restored_indicator
st.toast("Batch Saved & Snapshot Created!", icon="🚀")
st.rerun()

View File

@@ -1,249 +0,0 @@
import streamlit as st
import requests
from PIL import Image
from io import BytesIO
import urllib.parse
import html
import time # <--- NEW IMPORT
from utils import save_config
def render_single_instance(instance_config, index, all_instances, timeout_minutes):
url = instance_config.get("url", "http://127.0.0.1:8188")
name = instance_config.get("name", f"Server {index+1}")
COMFY_URL = url.rstrip("/")
# --- TIMEOUT LOGIC ---
# Generate unique keys for session state
toggle_key = f"live_toggle_{index}"
start_time_key = f"live_start_{index}"
# Check if we need to auto-close
if st.session_state.get(toggle_key, False) and timeout_minutes > 0:
start_time = st.session_state.get(start_time_key, 0)
elapsed = time.time() - start_time
if elapsed > (timeout_minutes * 60):
st.session_state[toggle_key] = False
# We don't need st.rerun() here because the fragment loop will pick up the state change on the next pass
# but an explicit rerun makes it snappy.
st.rerun()
c_head, c_set = st.columns([3, 1])
c_head.markdown(f"### 🔌 {name}")
with c_set.popover("⚙️ Settings"):
st.caption("Press Update to apply changes!")
new_name = st.text_input("Name", value=name, key=f"name_{index}")
new_url = st.text_input("URL", value=url, key=f"url_{index}")
if new_url != url:
st.warning("⚠️ Unsaved URL! Click Update below.")
if st.button("💾 Update & Save", key=f"save_{index}", type="primary"):
all_instances[index]["name"] = new_name
all_instances[index]["url"] = new_url
st.session_state.config["comfy_instances"] = all_instances
save_config(
st.session_state.current_dir,
st.session_state.config['favorites'],
st.session_state.config
)
st.toast("Server config saved!", icon="💾")
st.rerun()
st.divider()
if st.button("🗑️ Remove Server", key=f"del_{index}"):
all_instances.pop(index)
st.session_state.config["comfy_instances"] = all_instances
save_config(
st.session_state.current_dir,
st.session_state.config['favorites'],
st.session_state.config
)
st.rerun()
# --- 1. STATUS DASHBOARD ---
with st.expander("📊 Server Status", expanded=True):
col1, col2, col3, col4 = st.columns([1, 1, 1, 1])
try:
res = requests.get(f"{COMFY_URL}/queue", timeout=1.5)
queue_data = res.json()
running_cnt = len(queue_data.get("queue_running", []))
pending_cnt = len(queue_data.get("queue_pending", []))
col1.metric("Status", "🟢 Online" if running_cnt > 0 else "💤 Idle")
col2.metric("Pending", pending_cnt)
col3.metric("Running", running_cnt)
if col4.button("🔄 Check Img", key=f"refresh_{index}", use_container_width=True):
st.session_state[f"force_img_refresh_{index}"] = True
except Exception:
col1.metric("Status", "🔴 Offline")
col2.metric("Pending", "-")
col3.metric("Running", "-")
st.error(f"Could not connect to API at {COMFY_URL}")
# --- 2. LIVE VIEW (VIA REMOTE BROWSER) ---
st.write("")
c_label, c_ctrl = st.columns([1, 2])
c_label.subheader("📺 Live View")
# Capture the toggle interaction to set start time
def on_toggle_change():
if st.session_state[toggle_key]:
st.session_state[start_time_key] = time.time()
enable_preview = c_ctrl.checkbox(
"Enable Live Preview",
value=False,
key=toggle_key,
on_change=on_toggle_change
)
if enable_preview:
# Display Countdown if timeout is active
if timeout_minutes > 0:
elapsed = time.time() - st.session_state.get(start_time_key, time.time())
remaining = (timeout_minutes * 60) - elapsed
st.caption(f"⏱️ Auto-off in: **{int(remaining)}s**")
# Height Slider
iframe_h = st.slider(
"Height (px)",
min_value=600, max_value=2500, value=1000, step=50,
key=f"h_slider_{index}"
)
# Get Configured Viewer URL
viewer_base = st.session_state.config.get("viewer_url", "")
final_src = viewer_base.strip()
# Validate URL scheme before embedding
parsed = urllib.parse.urlparse(final_src)
if final_src and parsed.scheme in ("http", "https"):
safe_src = html.escape(final_src, quote=True)
st.info(f"Viewing via Remote Browser: `{final_src}`")
st.markdown(
f"""
<iframe src="{safe_src}" width="100%" height="{iframe_h}px"
style="border: 2px solid #666; border-radius: 8px; box-shadow: 0 4px 6px rgba(0,0,0,0.3);">
</iframe>
""",
unsafe_allow_html=True
)
else:
st.warning("No valid viewer URL configured. Set one in Monitor Settings below.")
else:
st.info("Live Preview is disabled.")
st.markdown("---")
# --- 3. LATEST OUTPUT ---
if st.session_state.get(f"force_img_refresh_{index}", False):
st.caption("🖼️ Most Recent Output")
try:
hist_res = requests.get(f"{COMFY_URL}/history", timeout=2)
history = hist_res.json()
if history:
last_prompt_id = list(history.keys())[-1]
outputs = history[last_prompt_id].get("outputs", {})
found_img = None
for node_id, node_output in outputs.items():
if "images" in node_output:
for img_info in node_output["images"]:
if img_info["type"] == "output":
found_img = img_info
break
if found_img: break
if found_img:
img_name = found_img['filename']
folder = found_img['subfolder']
img_type = found_img['type']
img_url = f"{COMFY_URL}/view?filename={img_name}&subfolder={folder}&type={img_type}"
img_res = requests.get(img_url)
image = Image.open(BytesIO(img_res.content))
st.image(image, caption=f"Last Output: {img_name}")
else:
st.warning("Last run had no image output.")
else:
st.info("No history found.")
st.session_state[f"force_img_refresh_{index}"] = False
except Exception as e:
st.error(f"Error fetching image: {e}")
# Check for fragment support (Streamlit 1.37+)
if hasattr(st, "fragment"):
# This decorator ensures this function re-runs every 10 seconds automatically
# allowing it to catch the timeout even if you are away from the keyboard.
@st.fragment(run_every=300)
def _monitor_fragment():
_render_content()
else:
# Fallback for older Streamlit versions (Won't auto-refresh while idle)
def _monitor_fragment():
_render_content()
def _render_content():
# --- GLOBAL SETTINGS FOR MONITOR ---
with st.expander("🔧 Monitor Settings", expanded=False):
c_set1, c_set2 = st.columns(2)
current_viewer = st.session_state.config.get("viewer_url", "")
new_viewer = c_set1.text_input("Remote Browser URL", value=current_viewer, help="e.g., http://localhost:5800")
# New Timeout Slider
current_timeout = st.session_state.config.get("monitor_timeout", 0)
new_timeout = c_set2.slider("Live Preview Timeout (Minutes)", 0, 60, value=current_timeout, help="0 = Always On. Sets how long the preview stays open before auto-closing.")
if st.button("💾 Save Monitor Settings"):
st.session_state.config["viewer_url"] = new_viewer
st.session_state.config["monitor_timeout"] = new_timeout
save_config(
st.session_state.current_dir,
st.session_state.config['favorites'],
st.session_state.config
)
st.success("Settings saved!")
st.rerun()
# --- INSTANCE MANAGEMENT ---
if "comfy_instances" not in st.session_state.config:
st.session_state.config["comfy_instances"] = [
{"name": "Main Server", "url": "http://192.168.1.100:8188"}
]
instances = st.session_state.config["comfy_instances"]
tab_names = [i["name"] for i in instances] + [" Add Server"]
tabs = st.tabs(tab_names)
timeout_val = st.session_state.config.get("monitor_timeout", 0)
for i, tab in enumerate(tabs[:-1]):
with tab:
render_single_instance(instances[i], i, instances, timeout_val)
with tabs[-1]:
st.header("Add New ComfyUI Instance")
with st.form("add_server_form"):
new_name = st.text_input("Server Name", placeholder="e.g. Render Node 2")
new_url = st.text_input("URL", placeholder="http://192.168.1.50:8188")
if st.form_submit_button("Add Instance"):
if new_name and new_url:
instances.append({"name": new_name, "url": new_url})
st.session_state.config["comfy_instances"] = instances
save_config(
st.session_state.current_dir,
st.session_state.config['favorites'],
st.session_state.config
)
st.success("Server Added!")
st.rerun()
else:
st.error("Please fill in both Name and URL.")
def render_comfy_monitor():
# We call the wrapper which decides if it's a fragment or not
_monitor_fragment()

View File

@@ -1,78 +0,0 @@
import streamlit as st
import json
import copy
from utils import save_json, get_file_mtime, KEY_HISTORY_TREE, KEY_PROMPT_HISTORY
def render_raw_editor(data, file_path):
st.subheader(f"💻 Raw Editor: {file_path.name}")
# Toggle to hide massive history objects
# This is crucial because history trees can get huge and make the text area laggy.
col_ctrl, col_info = st.columns([1, 2])
with col_ctrl:
hide_history = st.checkbox(
"Hide History (Safe Mode)",
value=True,
help="Hides 'history_tree' and 'prompt_history' to keep the editor fast and prevent accidental deletion of version control."
)
# Prepare display data
if hide_history:
display_data = copy.deepcopy(data)
# Safely remove heavy keys for the view only
if KEY_HISTORY_TREE in display_data: del display_data[KEY_HISTORY_TREE]
if KEY_PROMPT_HISTORY in display_data: del display_data[KEY_PROMPT_HISTORY]
else:
display_data = data
# Convert to string
# ensure_ascii=False ensures emojis and special chars render correctly
try:
json_str = json.dumps(display_data, indent=4, ensure_ascii=False)
except Exception as e:
st.error(f"Error serializing JSON: {e}")
json_str = "{}"
# The Text Editor
# We use ui_reset_token in the key to force the text area to reload content on save
new_json_str = st.text_area(
"JSON Content",
value=json_str,
height=650,
key=f"raw_edit_{file_path.name}_{st.session_state.ui_reset_token}"
)
st.markdown("---")
if st.button("💾 Save Raw Changes", type="primary", use_container_width=True):
try:
# 1. Parse the text back to JSON
input_data = json.loads(new_json_str)
# 2. If we were in Safe Mode, we must merge the hidden history back in
if hide_history:
if KEY_HISTORY_TREE in data:
input_data[KEY_HISTORY_TREE] = data[KEY_HISTORY_TREE]
if KEY_PROMPT_HISTORY in data:
input_data[KEY_PROMPT_HISTORY] = data[KEY_PROMPT_HISTORY]
# 3. Save to Disk
save_json(file_path, input_data)
# 4. Update Session State
# We clear and update the existing dictionary object so other tabs see the changes
data.clear()
data.update(input_data)
# 5. Update Metadata to prevent conflict warnings
st.session_state.last_mtime = get_file_mtime(file_path)
st.session_state.ui_reset_token += 1
st.toast("Raw JSON Saved Successfully!", icon="")
st.rerun()
except json.JSONDecodeError as e:
st.error(f"❌ Invalid JSON Syntax: {e}")
st.error("Please fix the formatting errors above before saving.")
except Exception as e:
st.error(f"❌ Unexpected Error: {e}")

View File

@@ -1,390 +0,0 @@
import streamlit as st
import copy
import time
from history_tree import HistoryTree
from utils import save_json, KEY_BATCH_DATA, KEY_HISTORY_TREE
try:
from streamlit_agraph import agraph, Node, Edge, Config
AGRAPH_AVAILABLE = True
except ImportError:
AGRAPH_AVAILABLE = False
def render_timeline_tab(data, file_path):
tree_data = data.get(KEY_HISTORY_TREE, {})
if not tree_data:
st.info("No history timeline exists. Make some changes in the Editor first!")
return
htree = HistoryTree(tree_data)
# --- Initialize selection state ---
if "timeline_selected_nodes" not in st.session_state:
st.session_state.timeline_selected_nodes = set()
if 'restored_indicator' in st.session_state and st.session_state.restored_indicator:
st.info(f"📍 Editing Restored Version: **{st.session_state.restored_indicator}**")
# --- VIEW SWITCHER + SELECTION MODE ---
c_title, c_view, c_toggle = st.columns([2, 1, 0.6])
c_title.subheader("🕰️ Version History")
view_mode = c_view.radio(
"View Mode",
["🌳 Horizontal", "🌲 Vertical", "📜 Linear Log"],
horizontal=True,
label_visibility="collapsed"
)
selection_mode = c_toggle.toggle("Select to Delete", key="timeline_selection_mode")
if not selection_mode:
st.session_state.timeline_selected_nodes = set()
# --- Build sorted node list (shared by all views) ---
all_nodes = list(htree.nodes.values())
all_nodes.sort(key=lambda x: x["timestamp"], reverse=True)
# --- MULTISELECT PICKER (shown when selection mode is on) ---
if selection_mode:
def _fmt_node_option(nid):
n = htree.nodes[nid]
ts = time.strftime('%b %d %H:%M', time.localtime(n['timestamp']))
note = n.get('note', 'Step')
head = " (HEAD)" if nid == htree.head_id else ""
return f"{note}{ts} ({nid[:6]}){head}"
all_ids = [n["id"] for n in all_nodes]
current_selection = [nid for nid in all_ids if nid in st.session_state.timeline_selected_nodes]
picked = st.multiselect(
"Select nodes to delete:",
options=all_ids,
default=current_selection,
format_func=_fmt_node_option,
)
st.session_state.timeline_selected_nodes = set(picked)
c_all, c_none, _ = st.columns([1, 1, 4])
if c_all.button("Select All", use_container_width=True):
st.session_state.timeline_selected_nodes = set(all_ids)
st.rerun()
if c_none.button("Deselect All", use_container_width=True):
st.session_state.timeline_selected_nodes = set()
st.rerun()
# --- RENDER GRAPH VIEWS ---
if view_mode in ["🌳 Horizontal", "🌲 Vertical"]:
direction = "LR" if view_mode == "🌳 Horizontal" else "TB"
if AGRAPH_AVAILABLE:
# Interactive graph with streamlit-agraph
selected_set = st.session_state.timeline_selected_nodes if selection_mode else set()
clicked_node = _render_interactive_graph(htree, direction, selected_set)
if clicked_node and clicked_node in htree.nodes:
if selection_mode:
# Toggle node in selection set
if clicked_node in st.session_state.timeline_selected_nodes:
st.session_state.timeline_selected_nodes.discard(clicked_node)
else:
st.session_state.timeline_selected_nodes.add(clicked_node)
st.rerun()
else:
node = htree.nodes[clicked_node]
if clicked_node != htree.head_id:
_restore_node(data, node, htree, file_path)
else:
# Fallback to static graphviz
try:
graph_dot = htree.generate_graph(direction=direction)
if direction == "LR":
st.graphviz_chart(graph_dot, use_container_width=True)
else:
_, col_center, _ = st.columns([1, 2, 1])
with col_center:
st.graphviz_chart(graph_dot, use_container_width=True)
except Exception as e:
st.error(f"Graph Error: {e}")
st.caption("💡 Install `streamlit-agraph` for interactive click-to-restore")
# --- RENDER LINEAR LOG VIEW ---
elif view_mode == "📜 Linear Log":
st.caption("A simple chronological list of all snapshots.")
for n in all_nodes:
is_head = (n["id"] == htree.head_id)
with st.container():
if selection_mode:
c0, c1, c2, c3 = st.columns([0.3, 0.5, 4, 1])
with c0:
is_selected = n["id"] in st.session_state.timeline_selected_nodes
if st.checkbox("", value=is_selected, key=f"log_sel_{n['id']}", label_visibility="collapsed"):
st.session_state.timeline_selected_nodes.add(n["id"])
else:
st.session_state.timeline_selected_nodes.discard(n["id"])
else:
c1, c2, c3 = st.columns([0.5, 4, 1])
with c1:
st.markdown("### 📍" if is_head else "### ⚫")
with c2:
note_txt = n.get('note', 'Step')
ts = time.strftime('%b %d %H:%M', time.localtime(n['timestamp']))
if is_head:
st.markdown(f"**{note_txt}** (Current)")
else:
st.write(f"**{note_txt}**")
st.caption(f"ID: {n['id'][:6]}{ts}")
with c3:
if not is_head and not selection_mode:
if st.button("", key=f"log_rst_{n['id']}", help="Restore this version"):
_restore_node(data, n, htree, file_path)
st.divider()
# --- BATCH DELETE UI ---
if selection_mode and st.session_state.timeline_selected_nodes:
# Prune any selected IDs that no longer exist in the tree
valid_selected = st.session_state.timeline_selected_nodes & set(htree.nodes.keys())
st.session_state.timeline_selected_nodes = valid_selected
count = len(valid_selected)
if count > 0:
st.warning(f"**{count}** node{'s' if count != 1 else ''} selected for deletion.")
if st.button(f"🗑️ Delete {count} Node{'s' if count != 1 else ''}", type="primary"):
# Backup
if "history_tree_backup" not in data:
data["history_tree_backup"] = []
data["history_tree_backup"].append(copy.deepcopy(htree.to_dict()))
# Delete all selected nodes
for nid in valid_selected:
if nid in htree.nodes:
del htree.nodes[nid]
# Clean up branch tips
for b, tip in list(htree.branches.items()):
if tip in valid_selected:
del htree.branches[b]
# Reassign HEAD if deleted
if htree.head_id in valid_selected:
if htree.nodes:
fallback = sorted(htree.nodes.values(), key=lambda x: x["timestamp"])[-1]
htree.head_id = fallback["id"]
else:
htree.head_id = None
# Save and reset
data[KEY_HISTORY_TREE] = htree.to_dict()
save_json(file_path, data)
st.session_state.timeline_selected_nodes = set()
st.toast(f"Deleted {count} node{'s' if count != 1 else ''}!", icon="🗑️")
st.rerun()
st.markdown("---")
# --- NODE SELECTOR ---
col_sel, col_act = st.columns([3, 1])
def fmt_node(n):
ts = time.strftime('%b %d %H:%M', time.localtime(n['timestamp']))
return f"{n.get('note', 'Step')}{ts} ({n['id'][:6]})"
with col_sel:
current_idx = 0
for i, n in enumerate(all_nodes):
if n["id"] == htree.head_id:
current_idx = i
break
selected_node = st.selectbox(
"Select Version to Manage:",
all_nodes,
format_func=fmt_node,
index=current_idx
)
if not selected_node:
return
node_data = selected_node["data"]
# --- RESTORE ---
with col_act:
st.write(""); st.write("")
if st.button("⏪ Restore Version", type="primary", use_container_width=True):
_restore_node(data, selected_node, htree, file_path)
# --- RENAME ---
rn_col1, rn_col2 = st.columns([3, 1])
new_label = rn_col1.text_input("Rename Label", value=selected_node.get("note", ""))
if rn_col2.button("Update Label"):
selected_node["note"] = new_label
data[KEY_HISTORY_TREE] = htree.to_dict()
save_json(file_path, data)
st.rerun()
# --- DANGER ZONE ---
st.markdown("---")
with st.expander("⚠️ Danger Zone (Delete)"):
st.warning("Deleting a node cannot be undone.")
if st.button("🗑️ Delete This Node", type="primary"):
if selected_node['id'] in htree.nodes:
if "history_tree_backup" not in data:
data["history_tree_backup"] = []
data["history_tree_backup"].append(copy.deepcopy(htree.to_dict()))
del htree.nodes[selected_node['id']]
for b, tip in list(htree.branches.items()):
if tip == selected_node['id']:
del htree.branches[b]
if htree.head_id == selected_node['id']:
if htree.nodes:
fallback = sorted(htree.nodes.values(), key=lambda x: x["timestamp"])[-1]
htree.head_id = fallback["id"]
else:
htree.head_id = None
data[KEY_HISTORY_TREE] = htree.to_dict()
save_json(file_path, data)
st.toast("Node Deleted", icon="🗑️")
st.rerun()
# --- DATA PREVIEW ---
st.markdown("---")
with st.expander("🔍 Data Preview", expanded=False):
batch_list = node_data.get(KEY_BATCH_DATA, [])
if batch_list and isinstance(batch_list, list) and len(batch_list) > 0:
st.info(f"📚 This snapshot contains {len(batch_list)} sequences.")
for i, seq_data in enumerate(batch_list):
seq_num = seq_data.get("sequence_number", i + 1)
with st.expander(f"🎬 Sequence #{seq_num}", expanded=(i == 0)):
prefix = f"p_{selected_node['id']}_s{i}"
_render_preview_fields(seq_data, prefix)
else:
prefix = f"p_{selected_node['id']}_single"
_render_preview_fields(node_data, prefix)
def _render_interactive_graph(htree, direction, selected_nodes=None):
"""Render an interactive graph using streamlit-agraph. Returns clicked node id."""
if selected_nodes is None:
selected_nodes = set()
# Build reverse lookup: branch tip -> branch name(s)
tip_to_branches = {}
for b_name, tip_id in htree.branches.items():
if tip_id:
tip_to_branches.setdefault(tip_id, []).append(b_name)
sorted_nodes_list = sorted(htree.nodes.values(), key=lambda x: x["timestamp"])
nodes = []
edges = []
for n in sorted_nodes_list:
nid = n["id"]
full_note = n.get('note', 'Step')
display_note = (full_note[:20] + '..') if len(full_note) > 20 else full_note
ts = time.strftime('%b %d %H:%M', time.localtime(n['timestamp']))
# Branch label
branch_label = ""
if nid in tip_to_branches:
branch_label = f"\n[{', '.join(tip_to_branches[nid])}]"
label = f"{display_note}\n{ts}{branch_label}"
# Colors - selected nodes override to red
if nid in selected_nodes:
color = "#ff5555" # Selected for deletion - red
elif nid == htree.head_id:
color = "#ffdd44" # Current head - bright yellow
elif nid in htree.branches.values():
color = "#66dd66" # Branch tip - bright green
else:
color = "#aaccff" # Normal - light blue
nodes.append(Node(
id=nid,
label=label,
size=20,
color=color,
font={"size": 10, "color": "#ffffff"}
))
if n["parent"] and n["parent"] in htree.nodes:
edges.append(Edge(source=n["parent"], target=nid, color="#888888"))
# Config based on direction
is_horizontal = direction == "LR"
config = Config(
width="100%",
height=400 if is_horizontal else 600,
directed=True,
hierarchical=True,
physics=False,
nodeHighlightBehavior=True,
highlightColor="#ffcc00",
collapsible=False,
layout={
"hierarchical": {
"enabled": True,
"direction": "LR" if is_horizontal else "UD",
"sortMethod": "directed",
"levelSeparation": 150 if is_horizontal else 80,
"nodeSpacing": 100 if is_horizontal else 60,
}
}
)
return agraph(nodes=nodes, edges=edges, config=config)
def _restore_node(data, node, htree, file_path):
"""Restore a history node as the current version."""
node_data = node["data"]
if KEY_BATCH_DATA not in node_data and KEY_BATCH_DATA in data:
del data[KEY_BATCH_DATA]
data.update(node_data)
htree.head_id = node['id']
data[KEY_HISTORY_TREE] = htree.to_dict()
save_json(file_path, data)
st.session_state.ui_reset_token += 1
label = f"{node.get('note')} ({node['id'][:4]})"
st.session_state.restored_indicator = label
st.toast("Restored!", icon="🔄")
st.rerun()
def _render_preview_fields(item_data, prefix):
"""Render a read-only preview of prompts, settings, and LoRAs."""
# Prompts
p_col1, p_col2 = st.columns(2)
with p_col1:
st.text_area("General Positive", value=item_data.get("general_prompt", ""), height=80, disabled=True, key=f"{prefix}_gp")
val_sp = item_data.get("current_prompt", "") or item_data.get("prompt", "")
st.text_area("Specific Positive", value=val_sp, height=80, disabled=True, key=f"{prefix}_sp")
with p_col2:
st.text_area("General Negative", value=item_data.get("general_negative", ""), height=80, disabled=True, key=f"{prefix}_gn")
st.text_area("Specific Negative", value=item_data.get("negative", ""), height=80, disabled=True, key=f"{prefix}_sn")
# Settings
s_col1, s_col2, s_col3 = st.columns(3)
s_col1.text_input("Camera", value=str(item_data.get("camera", "static")), disabled=True, key=f"{prefix}_cam")
s_col2.text_input("FLF", value=str(item_data.get("flf", "0.0")), disabled=True, key=f"{prefix}_flf")
s_col3.text_input("Seed", value=str(item_data.get("seed", "-1")), disabled=True, key=f"{prefix}_seed")
# LoRAs
with st.expander("💊 LoRA Configuration", expanded=False):
l1, l2, l3 = st.columns(3)
with l1:
st.text_input("L1 Name", value=item_data.get("lora 1 high", ""), disabled=True, key=f"{prefix}_l1h")
st.text_input("L1 Str", value=str(item_data.get("lora 1 low", "")), disabled=True, key=f"{prefix}_l1l")
with l2:
st.text_input("L2 Name", value=item_data.get("lora 2 high", ""), disabled=True, key=f"{prefix}_l2h")
st.text_input("L2 Str", value=str(item_data.get("lora 2 low", "")), disabled=True, key=f"{prefix}_l2l")
with l3:
st.text_input("L3 Name", value=item_data.get("lora 3 high", ""), disabled=True, key=f"{prefix}_l3h")
st.text_input("L3 Str", value=str(item_data.get("lora 3 low", "")), disabled=True, key=f"{prefix}_l3l")
# VACE
vace_keys = ["frame_to_skip", "vace schedule", "video file path"]
if any(k in item_data for k in vace_keys):
with st.expander("🎞️ VACE / I2V Settings", expanded=False):
v1, v2, v3 = st.columns(3)
v1.text_input("Skip Frames", value=str(item_data.get("frame_to_skip", "")), disabled=True, key=f"{prefix}_fts")
v2.text_input("Schedule", value=str(item_data.get("vace schedule", "")), disabled=True, key=f"{prefix}_vsc")
v3.text_input("Video Path", value=str(item_data.get("video file path", "")), disabled=True, key=f"{prefix}_vid")