From 9a3f7b7b94bb6b92cee63a7755e8a770592e7272 Mon Sep 17 00:00:00 2001 From: Ethanfel Date: Fri, 27 Feb 2026 22:17:38 +0100 Subject: [PATCH] Remove old Streamlit UI files superseded by NiceGUI migration Co-Authored-By: Claude Opus 4.6 --- app.py | 224 ------------------ tab_batch.py | 594 ------------------------------------------------ tab_comfy.py | 249 -------------------- tab_raw.py | 78 ------- tab_timeline.py | 390 ------------------------------- 5 files changed, 1535 deletions(-) delete mode 100644 app.py delete mode 100644 tab_batch.py delete mode 100644 tab_comfy.py delete mode 100644 tab_raw.py delete mode 100644 tab_timeline.py diff --git a/app.py b/app.py deleted file mode 100644 index 60cf05a..0000000 --- a/app.py +++ /dev/null @@ -1,224 +0,0 @@ -import streamlit as st -from pathlib import Path - -# --- Import Custom Modules --- -from utils import ( - load_config, save_config, load_snippets, save_snippets, - load_json, save_json, generate_templates, DEFAULTS, - KEY_BATCH_DATA, KEY_SEQUENCE_NUMBER, - resolve_path_case_insensitive, -) -from tab_batch import render_batch_processor -from tab_timeline import render_timeline_tab -from tab_comfy import render_comfy_monitor -from tab_raw import render_raw_editor - -# ========================================== -# 1. PAGE CONFIGURATION -# ========================================== -st.set_page_config(layout="wide", page_title="AI Settings Manager") - -# ========================================== -# 2. SESSION STATE INITIALIZATION -# ========================================== -_SESSION_DEFAULTS = { - "snippets": load_snippets, - "loaded_file": lambda: None, - "last_mtime": lambda: 0, - "ui_reset_token": lambda: 0, - "active_tab_name": lambda: "🚀 Batch Processor", -} - -if 'config' not in st.session_state: - st.session_state.config = load_config() - st.session_state.current_dir = Path(st.session_state.config.get("last_dir", Path.cwd())) - -for key, factory in _SESSION_DEFAULTS.items(): - if key not in st.session_state: - st.session_state[key] = factory() - -# ========================================== -# 3. SIDEBAR (NAVIGATOR & TOOLS) -# ========================================== -with st.sidebar: - st.header("📂 Navigator") - - # --- Path Navigator --- - # Sync widget to current_dir on first load or after external change - if "nav_path_input" not in st.session_state or st.session_state.get("_sync_nav_path"): - st.session_state.nav_path_input = str(st.session_state.current_dir) - st.session_state._sync_nav_path = False - - def _on_path_change(): - new_path = st.session_state.nav_path_input - p = resolve_path_case_insensitive(new_path) - if p is not None and p.is_dir(): - st.session_state.current_dir = p - st.session_state.config['last_dir'] = str(p) - save_config(st.session_state.current_dir, st.session_state.config['favorites']) - st.session_state.loaded_file = None - # Always resync widget to canonical path form - st.session_state._sync_nav_path = True - - st.text_input("Current Path", key="nav_path_input", on_change=_on_path_change) - - # --- Favorites System --- - if st.button("📌 Pin Folder", use_container_width=True): - if str(st.session_state.current_dir) not in st.session_state.config['favorites']: - st.session_state.config['favorites'].append(str(st.session_state.current_dir)) - save_config(st.session_state.current_dir, st.session_state.config['favorites']) - st.rerun() - - favorites = st.session_state.config['favorites'] - if favorites: - def _on_fav_jump(): - sel = st.session_state._fav_radio - if sel != "Select..." and sel != str(st.session_state.current_dir): - st.session_state.current_dir = Path(sel) - st.session_state._sync_nav_path = True - - st.radio( - "Jump to:", - ["Select..."] + favorites, - index=0, - key="_fav_radio", - label_visibility="collapsed", - on_change=_on_fav_jump, - ) - - # Unpin buttons for each favorite - for fav in favorites: - fc1, fc2 = st.columns([4, 1]) - fc1.caption(fav) - if fc2.button("❌", key=f"unpin_{fav}"): - st.session_state.config['favorites'].remove(fav) - save_config(st.session_state.current_dir, st.session_state.config['favorites']) - st.rerun() - - st.markdown("---") - - # --- Snippet Library --- - st.subheader("🧩 Snippet Library") - with st.expander("Add New Snippet"): - snip_name = st.text_input("Name", placeholder="e.g. Cinematic") - snip_content = st.text_area("Content", placeholder="4k, high quality...") - if st.button("Save Snippet"): - if snip_name and snip_content: - st.session_state.snippets[snip_name] = snip_content - save_snippets(st.session_state.snippets) - st.success(f"Saved '{snip_name}'") - st.rerun() - - if st.session_state.snippets: - st.caption("Click to Append to Prompt:") - for name, content in st.session_state.snippets.items(): - col_s1, col_s2 = st.columns([4, 1]) - if col_s1.button(f"➕ {name}", use_container_width=True): - st.rerun() - if col_s2.button("đŸ—‘ī¸", key=f"del_snip_{name}"): - del st.session_state.snippets[name] - save_snippets(st.session_state.snippets) - st.rerun() - - st.markdown("---") - - # --- File List & Creation --- - json_files = sorted(list(st.session_state.current_dir.glob("*.json"))) - json_files = [f for f in json_files if f.name != ".editor_config.json" and f.name != ".editor_snippets.json"] - - if not json_files: - if st.button("Generate Templates"): - generate_templates(st.session_state.current_dir) - st.rerun() - - with st.expander("Create New JSON"): - new_filename = st.text_input("Filename", placeholder="my_prompt_vace") - if st.button("Create"): - if not new_filename.endswith(".json"): new_filename += ".json" - path = st.session_state.current_dir / new_filename - first_item = DEFAULTS.copy() - first_item[KEY_SEQUENCE_NUMBER] = 1 - data = {KEY_BATCH_DATA: [first_item]} - save_json(path, data) - st.rerun() - - # --- File Selector --- - selected_file_name = None - if json_files: - file_names = [f.name for f in json_files] - if 'file_selector' not in st.session_state: - st.session_state.file_selector = file_names[0] - if st.session_state.file_selector not in file_names: - st.session_state.file_selector = file_names[0] - - selected_file_name = st.radio("Select File", file_names, key="file_selector") - else: - st.info("No JSON files in this folder.") - if 'file_selector' in st.session_state: - del st.session_state.file_selector - st.session_state.loaded_file = None - - # --- GLOBAL MONITOR TOGGLE (NEW) --- - st.markdown("---") - show_monitor = st.checkbox("Show Comfy Monitor", value=True) - -# ========================================== -# 4. MAIN APP LOGIC -# ========================================== -if selected_file_name: - file_path = st.session_state.current_dir / selected_file_name - - # --- FILE LOADING & AUTO-SWITCH LOGIC --- - if st.session_state.loaded_file != str(file_path): - data, mtime = load_json(file_path) - st.session_state.data_cache = data - st.session_state.last_mtime = mtime - st.session_state.loaded_file = str(file_path) - - # Clear transient states - if 'restored_indicator' in st.session_state: del st.session_state.restored_indicator - - # --- AUTO-SWITCH TAB LOGIC --- - st.session_state.active_tab_name = "🚀 Batch Processor" - - else: - data = st.session_state.data_cache - - st.title(f"Editing: {selected_file_name}") - - # --- CONTROLLED NAVIGATION --- - # Removed "🔌 Comfy Monitor" from this list - tabs_list = [ - "🚀 Batch Processor", - "🕒 Timeline", - "đŸ’ģ Raw Editor" - ] - - if st.session_state.active_tab_name not in tabs_list: - st.session_state.active_tab_name = tabs_list[0] - - current_tab = st.radio( - "Navigation", - tabs_list, - key="active_tab_name", - horizontal=True, - label_visibility="collapsed" - ) - - st.markdown("---") - - # --- RENDER EDITOR TABS --- - if current_tab == "🚀 Batch Processor": - render_batch_processor(data, file_path, json_files, st.session_state.current_dir, selected_file_name) - - elif current_tab == "🕒 Timeline": - render_timeline_tab(data, file_path) - - elif current_tab == "đŸ’ģ Raw Editor": - render_raw_editor(data, file_path) - - # --- GLOBAL PERSISTENT MONITOR --- - if show_monitor: - st.markdown("---") - with st.expander("🔌 ComfyUI Monitor", expanded=True): - render_comfy_monitor() \ No newline at end of file diff --git a/tab_batch.py b/tab_batch.py deleted file mode 100644 index ed5628e..0000000 --- a/tab_batch.py +++ /dev/null @@ -1,594 +0,0 @@ -import streamlit as st -import random -import copy -from pathlib import Path -from utils import DEFAULTS, save_json, load_json, KEY_BATCH_DATA, KEY_HISTORY_TREE, KEY_PROMPT_HISTORY, KEY_SEQUENCE_NUMBER -from history_tree import HistoryTree - -IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".gif"} - -SUB_SEGMENT_MULTIPLIER = 1000 - -def is_subsegment(seq_num): - """Return True if seq_num is a sub-segment (>= 1000).""" - return int(seq_num) >= SUB_SEGMENT_MULTIPLIER - -def parent_of(seq_num): - """Return the parent segment number (or self if already a parent).""" - seq_num = int(seq_num) - return seq_num // SUB_SEGMENT_MULTIPLIER if is_subsegment(seq_num) else seq_num - -def sub_index_of(seq_num): - """Return the sub-index (0 if parent).""" - seq_num = int(seq_num) - return seq_num % SUB_SEGMENT_MULTIPLIER if is_subsegment(seq_num) else 0 - -def format_seq_label(seq_num): - """Return display label: 'Sequence #3' or 'Sub #2.1'.""" - seq_num = int(seq_num) - if is_subsegment(seq_num): - return f"Sub #{parent_of(seq_num)}.{sub_index_of(seq_num)}" - return f"Sequence #{seq_num}" - -def next_sub_segment_number(batch_list, parent_seq_num): - """Find the next available sub-segment number under a parent.""" - parent_seq_num = int(parent_seq_num) - max_sub = 0 - for s in batch_list: - sn = int(s.get(KEY_SEQUENCE_NUMBER, 0)) - if is_subsegment(sn) and parent_of(sn) == parent_seq_num: - max_sub = max(max_sub, sub_index_of(sn)) - return parent_seq_num * SUB_SEGMENT_MULTIPLIER + max_sub + 1 - -def find_insert_position(batch_list, parent_index, parent_seq_num): - """Find the insert position after the parent's last existing sub-segment.""" - parent_seq_num = int(parent_seq_num) - pos = parent_index + 1 - while pos < len(batch_list): - sn = int(batch_list[pos].get(KEY_SEQUENCE_NUMBER, 0)) - if is_subsegment(sn) and parent_of(sn) == parent_seq_num: - pos += 1 - else: - break - return pos - -def _render_mass_update(batch_list, data, file_path, key_prefix): - """Render the mass update UI section.""" - with st.expander("🔄 Mass Update", expanded=False): - if len(batch_list) < 2: - st.info("Need at least 2 sequences for mass update.") - return - - # Source sequence selector - source_idx = st.selectbox( - "Copy from sequence:", - range(len(batch_list)), - format_func=lambda i: format_seq_label(batch_list[i].get('sequence_number', i+1)), - key=f"{key_prefix}_mass_src" - ) - source_seq = batch_list[source_idx] - - # Field multi-select (exclude sequence_number) - available_keys = [k for k in source_seq.keys() if k != "sequence_number"] - selected_keys = st.multiselect("Fields to copy:", available_keys, key=f"{key_prefix}_mass_fields") - - if not selected_keys: - return - - # Target sequence checkboxes - st.write("Apply to:") - select_all = st.checkbox("Select All", key=f"{key_prefix}_mass_all") - - target_indices = [] - target_cols = st.columns(min(4, len(batch_list) - 1)) if len(batch_list) > 1 else [st] - col_idx = 0 - for i, seq in enumerate(batch_list): - if i == source_idx: - continue - seq_num = seq.get("sequence_number", i + 1) - with target_cols[col_idx % len(target_cols)]: - checked = select_all or st.checkbox(format_seq_label(seq_num), key=f"{key_prefix}_mass_t{i}") - if checked: - target_indices.append(i) - col_idx += 1 - - # Preview - if target_indices and selected_keys: - with st.expander("Preview changes", expanded=True): - for key in selected_keys: - val = source_seq.get(key, "") - display_val = str(val)[:100] + "..." if len(str(val)) > 100 else str(val) - st.caption(f"**{key}**: {display_val}") - - # Apply button - if st.button("Apply Changes", type="primary", key=f"{key_prefix}_mass_apply"): - for i in target_indices: - for key in selected_keys: - batch_list[i][key] = copy.deepcopy(source_seq.get(key)) - - # Save with history snapshot - data[KEY_BATCH_DATA] = batch_list - htree = HistoryTree(data.get(KEY_HISTORY_TREE, {})) - snapshot_payload = copy.deepcopy(data) - if KEY_HISTORY_TREE in snapshot_payload: - del snapshot_payload[KEY_HISTORY_TREE] - htree.commit(snapshot_payload, f"Mass update: {', '.join(selected_keys)}") - data[KEY_HISTORY_TREE] = htree.to_dict() - save_json(file_path, data) - st.session_state.data_cache = data - st.session_state.ui_reset_token += 1 - st.toast(f"Updated {len(target_indices)} sequences", icon="✅") - st.rerun() - - -def create_batch_callback(original_filename, current_data, current_dir): - new_name = f"batch_{original_filename}" - new_path = current_dir / new_name - - if new_path.exists(): - st.toast(f"File {new_name} already exists!", icon="âš ī¸") - return - - first_item = current_data.copy() - if KEY_PROMPT_HISTORY in first_item: del first_item[KEY_PROMPT_HISTORY] - if KEY_HISTORY_TREE in first_item: del first_item[KEY_HISTORY_TREE] - - first_item[KEY_SEQUENCE_NUMBER] = 1 - - new_data = { - KEY_BATCH_DATA: [first_item], - KEY_HISTORY_TREE: {}, - KEY_PROMPT_HISTORY: [] - } - - save_json(new_path, new_data) - st.toast(f"Created {new_name}", icon="✨") - st.session_state.file_selector = new_name - - -def render_batch_processor(data, file_path, json_files, current_dir, selected_file_name): - is_batch_file = KEY_BATCH_DATA in data or isinstance(data, list) - - if not is_batch_file: - st.warning("This is a Single file. To use Batch mode, create a copy.") - st.button("✨ Create Batch Copy", on_click=create_batch_callback, args=(selected_file_name, data, current_dir)) - return - - if 'restored_indicator' in st.session_state and st.session_state.restored_indicator: - st.info(f"📍 Editing Restored Version: **{st.session_state.restored_indicator}**") - - batch_list = data.get(KEY_BATCH_DATA, []) - - # --- ADD NEW SEQUENCE AREA --- - st.subheader("Add New Sequence") - ac1, ac2 = st.columns(2) - - with ac1: - file_options = [f.name for f in json_files] - d_idx = file_options.index(selected_file_name) if selected_file_name in file_options else 0 - src_name = st.selectbox("Source File:", file_options, index=d_idx, key="batch_src_file") - src_data, _ = load_json(current_dir / src_name) - - with ac2: - src_batch = src_data.get(KEY_BATCH_DATA, []) - if src_batch: - seq_opts = list(range(len(src_batch))) - sel_seq_idx = st.selectbox( - "Source Sequence:", - seq_opts, - format_func=lambda i: format_seq_label(src_batch[i].get(KEY_SEQUENCE_NUMBER, i + 1)), - key="batch_src_seq" - ) - else: - st.caption("Single file (no sequences)") - sel_seq_idx = None - - bc1, bc2 = st.columns(2) - - def add_sequence(new_item): - max_seq = 0 - for s in batch_list: - sn = int(s.get(KEY_SEQUENCE_NUMBER, 0)) - if not is_subsegment(sn): - max_seq = max(max_seq, sn) - new_item[KEY_SEQUENCE_NUMBER] = max_seq + 1 - - for k in [KEY_PROMPT_HISTORY, KEY_HISTORY_TREE, "note", "loras"]: - if k in new_item: del new_item[k] - - batch_list.append(new_item) - data[KEY_BATCH_DATA] = batch_list - save_json(file_path, data) - st.session_state.ui_reset_token += 1 - st.rerun() - - if bc1.button("➕ Add Empty", use_container_width=True): - add_sequence(DEFAULTS.copy()) - - if bc2.button("➕ From Source", use_container_width=True, help=f"Import from {src_name}"): - item = DEFAULTS.copy() - if src_batch and sel_seq_idx is not None: - item.update(src_batch[sel_seq_idx]) - else: - item.update(src_data) - add_sequence(item) - - # --- RENDER LIST --- - st.markdown("---") - info_col, reorder_col = st.columns([3, 1]) - info_col.info(f"Batch contains {len(batch_list)} sequences.") - if reorder_col.button("đŸ”ĸ Sort by Number", use_container_width=True, help="Reorder sequences by sequence number"): - batch_list.sort(key=lambda s: int(s.get(KEY_SEQUENCE_NUMBER, 0))) - data[KEY_BATCH_DATA] = batch_list - save_json(file_path, data) - st.session_state.ui_reset_token += 1 - st.toast("Sorted by sequence number!", icon="đŸ”ĸ") - st.rerun() - - # --- MASS UPDATE SECTION --- - ui_reset_token = st.session_state.get("ui_reset_token", 0) - _render_mass_update(batch_list, data, file_path, f"{selected_file_name}_v{ui_reset_token}") - - # Updated LoRA keys to match new logic - lora_keys = ["lora 1 high", "lora 1 low", "lora 2 high", "lora 2 low", "lora 3 high", "lora 3 low"] - standard_keys = { - "general_prompt", "general_negative", "current_prompt", "negative", "prompt", "seed", "cfg", - "camera", "flf", KEY_SEQUENCE_NUMBER - } - standard_keys.update(lora_keys) - standard_keys.update([ - "frame_to_skip", "end_frame", "transition", "vace_length", - "input_a_frames", "input_b_frames", "reference switch", "vace schedule", - "reference path", "video file path", "reference image path", "flf image path" - ]) - - VACE_MODES = [ - "End Extend", "Pre Extend", "Middle Extend", "Edge Extend", - "Join Extend", "Bidirectional Extend", "Frame Interpolation", - "Replace/Inpaint", "Video Inpaint", "Keyframe", - ] - VACE_FORMULAS = [ - "base + A", # 0 End Extend - "base + B", # 1 Pre Extend - "base + A + B", # 2 Middle Extend - "base + A + B", # 3 Edge Extend - "base + A + B", # 4 Join Extend - "base + A + B", # 5 Bidirectional - "(B-1) * step", # 6 Frame Interpolation - "snap(source)", # 7 Replace/Inpaint - "snap(source)", # 8 Video Inpaint - "base + A + B", # 9 Keyframe - ] - - for i, seq in enumerate(batch_list): - seq_num = seq.get(KEY_SEQUENCE_NUMBER, i+1) - prefix = f"{selected_file_name}_seq{i}_v{st.session_state.ui_reset_token}" - - if is_subsegment(seq_num): - expander_label = f"🔗 â†ŗ Sub #{parent_of(seq_num)}.{sub_index_of(seq_num)} ({int(seq_num)})" - else: - expander_label = f"đŸŽŦ Sequence #{seq_num}" - - with st.expander(expander_label, expanded=False): - # --- ACTION ROW --- - act_c1, act_c2, act_c3, act_c4 = st.columns([1.2, 1.8, 1.2, 0.5]) - - # 1. Copy Source - with act_c1: - if st.button(f"đŸ“Ĩ Copy {src_name}", key=f"{prefix}_copy", use_container_width=True): - item = DEFAULTS.copy() - if src_batch and sel_seq_idx is not None: - item.update(src_batch[sel_seq_idx]) - else: - item.update(src_data) - item[KEY_SEQUENCE_NUMBER] = seq_num - for k in [KEY_PROMPT_HISTORY, KEY_HISTORY_TREE]: - if k in item: del item[k] - batch_list[i] = item - data[KEY_BATCH_DATA] = batch_list - save_json(file_path, data) - st.session_state.ui_reset_token += 1 - st.toast("Copied!", icon="đŸ“Ĩ") - st.rerun() - - # 2. Cloning Tools - with act_c2: - cl_1, cl_2, cl_3 = st.columns(3) - if cl_1.button("đŸ‘¯ Next", key=f"{prefix}_c_next", help="Clone and insert below", use_container_width=True): - new_seq = copy.deepcopy(seq) - max_sn = 0 - for s in batch_list: - sn = int(s.get(KEY_SEQUENCE_NUMBER, 0)) - if not is_subsegment(sn): - max_sn = max(max_sn, sn) - new_seq[KEY_SEQUENCE_NUMBER] = max_sn + 1 - if not is_subsegment(seq_num): - insert_pos = find_insert_position(batch_list, i, int(seq_num)) - else: - insert_pos = i + 1 - batch_list.insert(insert_pos, new_seq) - data[KEY_BATCH_DATA] = batch_list - save_json(file_path, data) - st.session_state.ui_reset_token += 1 - st.toast("Cloned to Next!", icon="đŸ‘¯") - st.rerun() - - if cl_2.button("âŦ End", key=f"{prefix}_c_end", help="Clone and add to bottom", use_container_width=True): - new_seq = copy.deepcopy(seq) - max_sn = 0 - for s in batch_list: - sn = int(s.get(KEY_SEQUENCE_NUMBER, 0)) - if not is_subsegment(sn): - max_sn = max(max_sn, sn) - new_seq[KEY_SEQUENCE_NUMBER] = max_sn + 1 - batch_list.append(new_seq) - data[KEY_BATCH_DATA] = batch_list - save_json(file_path, data) - st.session_state.ui_reset_token += 1 - st.toast("Cloned to End!", icon="âŦ") - st.rerun() - - if cl_3.button("🔗 Sub", key=f"{prefix}_c_sub", help="Clone as sub-segment", use_container_width=True): - new_seq = copy.deepcopy(seq) - p_seq_num = parent_of(seq_num) - # Find the parent's index in batch_list - p_idx = i - if is_subsegment(seq_num): - for pi, ps in enumerate(batch_list): - if int(ps.get(KEY_SEQUENCE_NUMBER, 0)) == p_seq_num: - p_idx = pi - break - new_seq[KEY_SEQUENCE_NUMBER] = next_sub_segment_number(batch_list, p_seq_num) - insert_pos = find_insert_position(batch_list, p_idx, p_seq_num) - batch_list.insert(insert_pos, new_seq) - data[KEY_BATCH_DATA] = batch_list - save_json(file_path, data) - st.session_state.ui_reset_token += 1 - st.toast(f"Created {format_seq_label(new_seq[KEY_SEQUENCE_NUMBER])}!", icon="🔗") - st.rerun() - - # 3. Promote - with act_c3: - if st.button("â†–ī¸ Promote", key=f"{prefix}_prom", help="Save as Single File", use_container_width=True): - single_data = seq.copy() - single_data[KEY_PROMPT_HISTORY] = data.get(KEY_PROMPT_HISTORY, []) - single_data[KEY_HISTORY_TREE] = data.get(KEY_HISTORY_TREE, {}) - if KEY_SEQUENCE_NUMBER in single_data: del single_data[KEY_SEQUENCE_NUMBER] - save_json(file_path, single_data) - st.session_state.data_cache = single_data - st.session_state.ui_reset_token += 1 - st.toast("Converted to Single!", icon="✅") - st.rerun() - - # 4. Remove - with act_c4: - if st.button("đŸ—‘ī¸", key=f"{prefix}_del", use_container_width=True): - batch_list.pop(i) - data[KEY_BATCH_DATA] = batch_list - save_json(file_path, data) - st.session_state.ui_reset_token += 1 - st.rerun() - - st.markdown("---") - c1, c2 = st.columns([2, 1]) - with c1: - seq["general_prompt"] = st.text_area("General Prompt", value=seq.get("general_prompt", ""), height=60, key=f"{prefix}_gp") - seq["general_negative"] = st.text_area("General Negative", value=seq.get("general_negative", ""), height=60, key=f"{prefix}_gn") - seq["current_prompt"] = st.text_area("Specific Prompt", value=seq.get("current_prompt", ""), height=300, key=f"{prefix}_sp") - seq["negative"] = st.text_area("Specific Negative", value=seq.get("negative", ""), height=60, key=f"{prefix}_sn") - - with c2: - sn_label = f"Sequence Number (â†ŗ Sub #{parent_of(seq_num)}.{sub_index_of(seq_num)})" if is_subsegment(seq_num) else "Sequence Number" - seq[KEY_SEQUENCE_NUMBER] = st.number_input(sn_label, value=int(seq_num), key=f"{prefix}_sn_val") - - s_row1, s_row2 = st.columns([3, 1]) - seed_key = f"{prefix}_seed" - with s_row2: - st.write("") - st.write("") - if st.button("🎲", key=f"{prefix}_rand"): - st.session_state[seed_key] = random.randint(0, 999999999999) - st.rerun() - with s_row1: - current_seed = st.session_state.get(seed_key, int(seq.get("seed", 0))) - val = st.number_input("Seed", value=current_seed, key=seed_key) - seq["seed"] = val - - seq["cfg"] = st.number_input("CFG", value=float(seq.get("cfg", DEFAULTS["cfg"])), step=0.5, format="%.1f", key=f"{prefix}_cfg") - seq["camera"] = st.text_input("Camera", value=seq.get("camera", ""), key=f"{prefix}_cam") - seq["flf"] = st.text_input("FLF", value=str(seq.get("flf", DEFAULTS["flf"])), key=f"{prefix}_flf") - - seq["end_frame"] = st.number_input("End Frame", value=int(seq.get("end_frame", 0)), key=f"{prefix}_ef") - seq["video file path"] = st.text_input("Video File Path", value=seq.get("video file path", ""), key=f"{prefix}_vid") - for img_label, img_key, img_suffix in [ - ("Reference Image Path", "reference image path", "rip"), - ("Reference Path", "reference path", "rp"), - ("FLF Image Path", "flf image path", "flfi"), - ]: - img_col, prev_col = st.columns([5, 1]) - seq[img_key] = img_col.text_input(img_label, value=seq.get(img_key, ""), key=f"{prefix}_{img_suffix}") - img_path = Path(seq[img_key]) if seq[img_key] else None - if img_path and img_path.exists() and img_path.suffix.lower() in IMAGE_EXTENSIONS: - with prev_col.popover("👁"): - st.image(str(img_path), use_container_width=True) - with st.expander("VACE Settings"): - fts_col, fts_btn = st.columns([3, 1]) - saved_fts_key = f"{prefix}_fts_saved" - if saved_fts_key not in st.session_state: - st.session_state[saved_fts_key] = int(seq.get("frame_to_skip", 81)) - old_fts = st.session_state[saved_fts_key] - seq["frame_to_skip"] = fts_col.number_input("Frame to Skip", value=old_fts, key=f"{prefix}_fts") - delta = int(seq["frame_to_skip"]) - old_fts - delta_label = f"Shift ↓ ({delta:+d})" if delta != 0 else "Shift ↓ (0)" - fts_btn.write("") - fts_btn.write("") - if fts_btn.button(delta_label, key=f"{prefix}_fts_shift", help="Apply delta to all following sequences", disabled=(delta == 0)): - if delta != 0: - shifted = 0 - for j in range(i + 1, len(batch_list)): - batch_list[j]["frame_to_skip"] = int(batch_list[j].get("frame_to_skip", 81)) + delta - shifted += 1 - data[KEY_BATCH_DATA] = batch_list - save_json(file_path, data) - st.session_state.ui_reset_token += 1 - st.toast(f"Shifted {shifted} sequences by {delta:+d}", icon="âŦ") - st.rerun() - else: - st.toast("No change to shift", icon="â„šī¸") - seq["transition"] = st.text_input("Transition", value=str(seq.get("transition", "1-2")), key=f"{prefix}_trans") - - vs_col, vs_label = st.columns([3, 1]) - sched_val = int(seq.get("vace schedule", 1)) - seq["vace schedule"] = vs_col.number_input("VACE Schedule", value=sched_val, min_value=0, max_value=len(VACE_MODES) - 1, key=f"{prefix}_vsc") - mode_idx = int(seq["vace schedule"]) - vs_label.write("") - vs_label.write("") - vs_label.caption(VACE_MODES[mode_idx]) - - with st.popover("📋 Mode Reference"): - st.markdown( - "| # | Mode | Formula |\n" - "|:--|:-----|:--------|\n" - + "\n".join( - f"| **{j}** | {VACE_MODES[j]} | `{VACE_FORMULAS[j]}` |" - for j in range(len(VACE_MODES)) - ) - + "\n\n*All totals snapped to 4n+1 (1,5,9,â€Ļ,49,â€Ļ,81,â€Ļ)*" - ) - - seq["input_a_frames"] = st.number_input("Input A Frames", value=int(seq.get("input_a_frames", 16)), key=f"{prefix}_ia") - seq["input_b_frames"] = st.number_input("Input B Frames", value=int(seq.get("input_b_frames", 16)), key=f"{prefix}_ib") - input_a = int(seq.get("input_a_frames", 16)) - input_b = int(seq.get("input_b_frames", 16)) - stored_total = int(seq.get("vace_length", 49)) - # Reverse using same mode formula that was used to store - if mode_idx == 0: - base_length = max(stored_total - input_a, 1) - elif mode_idx == 1: - base_length = max(stored_total - input_b, 1) - else: - base_length = max(stored_total - input_a - input_b, 1) - vl_col, vl_out = st.columns([3, 1]) - new_base = vl_col.number_input("VACE Length", value=base_length, min_value=1, key=f"{prefix}_vl") - if mode_idx == 0: # End Extend: base + A - raw_total = new_base + input_a - elif mode_idx == 1: # Pre Extend: base + B - raw_total = new_base + input_b - else: # Most modes: base + A + B - raw_total = new_base + input_a + input_b - # Snap to 4n+1 (1,5,9,13,...,81,...) to match VACE sampler - seq["vace_length"] = ((raw_total + 2) // 4) * 4 + 1 - vl_out.metric("Output", seq["vace_length"]) - seq["reference switch"] = st.number_input("Reference Switch", value=int(seq.get("reference switch", 1)), key=f"{prefix}_rsw") - - # --- UPDATED: LoRA Settings with Tag Wrapping --- - with st.expander("💊 LoRA Settings"): - lc1, lc2, lc3 = st.columns(3) - - # Helper to render the tag wrapper UI - def render_lora_col(col_obj, lora_idx): - with col_obj: - st.caption(f"**LoRA {lora_idx}**") - - # --- HIGH --- - k_high = f"lora {lora_idx} high" - raw_h = str(seq.get(k_high, "")) - # Strip tags for display - disp_h = raw_h.replace("", "") - - st.write("High:") - rh1, rh2, rh3 = st.columns([0.25, 1, 0.1]) - rh1.markdown("
<lora:
", unsafe_allow_html=True) - val_h = rh2.text_input(f"L{lora_idx}H", value=disp_h, key=f"{prefix}_l{lora_idx}h", label_visibility="collapsed") - rh3.markdown("
>
", unsafe_allow_html=True) - - if val_h: - seq[k_high] = f"" - else: - seq[k_high] = "" - - # --- LOW --- - k_low = f"lora {lora_idx} low" - raw_l = str(seq.get(k_low, "")) - # Strip tags for display - disp_l = raw_l.replace("", "") - - st.write("Low:") - rl1, rl2, rl3 = st.columns([0.25, 1, 0.1]) - rl1.markdown("
<lora:
", unsafe_allow_html=True) - val_l = rl2.text_input(f"L{lora_idx}L", value=disp_l, key=f"{prefix}_l{lora_idx}l", label_visibility="collapsed") - rl3.markdown("
>
", unsafe_allow_html=True) - - if val_l: - seq[k_low] = f"" - else: - seq[k_low] = "" - - render_lora_col(lc1, 1) - render_lora_col(lc2, 2) - render_lora_col(lc3, 3) - - # --- CUSTOM PARAMETERS --- - st.markdown("---") - st.caption("🔧 Custom Parameters") - - custom_keys = [k for k in seq.keys() if k not in standard_keys] - keys_to_remove = [] - - if custom_keys: - for k in custom_keys: - ck1, ck2, ck3 = st.columns([1, 2, 0.5]) - ck1.text_input("Key", value=k, disabled=True, key=f"{prefix}_ck_lbl_{k}", label_visibility="collapsed") - val = ck2.text_input("Value", value=str(seq[k]), key=f"{prefix}_cv_{k}", label_visibility="collapsed") - seq[k] = val - - if ck3.button("đŸ—‘ī¸", key=f"{prefix}_cdel_{k}"): - keys_to_remove.append(k) - - with st.expander("➕ Add Parameter"): - nk_col, nv_col = st.columns(2) - new_k = nk_col.text_input("Key", key=f"{prefix}_new_k") - new_v = nv_col.text_input("Value", key=f"{prefix}_new_v") - - if st.button("Add", key=f"{prefix}_add_cust"): - if new_k and new_k not in seq: - seq[new_k] = new_v - save_json(file_path, data) - st.session_state.ui_reset_token += 1 - st.rerun() - - if keys_to_remove: - for k in keys_to_remove: - del seq[k] - save_json(file_path, data) - st.session_state.ui_reset_token += 1 - st.rerun() - - st.markdown("---") - - # --- SAVE ACTIONS WITH HISTORY COMMIT --- - col_save, col_note = st.columns([1, 2]) - - with col_note: - commit_msg = st.text_input("Change Note (Optional)", placeholder="e.g. Added sequence 3") - - with col_save: - if st.button("💾 Save & Snap", use_container_width=True): - data[KEY_BATCH_DATA] = batch_list - - tree_data = data.get(KEY_HISTORY_TREE, {}) - htree = HistoryTree(tree_data) - - snapshot_payload = copy.deepcopy(data) - if KEY_HISTORY_TREE in snapshot_payload: del snapshot_payload[KEY_HISTORY_TREE] - - htree.commit(snapshot_payload, note=commit_msg if commit_msg else "Batch Update") - - data[KEY_HISTORY_TREE] = htree.to_dict() - save_json(file_path, data) - - if 'restored_indicator' in st.session_state: - del st.session_state.restored_indicator - - st.toast("Batch Saved & Snapshot Created!", icon="🚀") - st.rerun() \ No newline at end of file diff --git a/tab_comfy.py b/tab_comfy.py deleted file mode 100644 index 04630cc..0000000 --- a/tab_comfy.py +++ /dev/null @@ -1,249 +0,0 @@ -import streamlit as st -import requests -from PIL import Image -from io import BytesIO -import urllib.parse -import html -import time # <--- NEW IMPORT -from utils import save_config - -def render_single_instance(instance_config, index, all_instances, timeout_minutes): - url = instance_config.get("url", "http://127.0.0.1:8188") - name = instance_config.get("name", f"Server {index+1}") - - COMFY_URL = url.rstrip("/") - - # --- TIMEOUT LOGIC --- - # Generate unique keys for session state - toggle_key = f"live_toggle_{index}" - start_time_key = f"live_start_{index}" - - # Check if we need to auto-close - if st.session_state.get(toggle_key, False) and timeout_minutes > 0: - start_time = st.session_state.get(start_time_key, 0) - elapsed = time.time() - start_time - if elapsed > (timeout_minutes * 60): - st.session_state[toggle_key] = False - # We don't need st.rerun() here because the fragment loop will pick up the state change on the next pass - # but an explicit rerun makes it snappy. - st.rerun() - - c_head, c_set = st.columns([3, 1]) - c_head.markdown(f"### 🔌 {name}") - - with c_set.popover("âš™ī¸ Settings"): - st.caption("Press Update to apply changes!") - new_name = st.text_input("Name", value=name, key=f"name_{index}") - new_url = st.text_input("URL", value=url, key=f"url_{index}") - - if new_url != url: - st.warning("âš ī¸ Unsaved URL! Click Update below.") - - if st.button("💾 Update & Save", key=f"save_{index}", type="primary"): - all_instances[index]["name"] = new_name - all_instances[index]["url"] = new_url - st.session_state.config["comfy_instances"] = all_instances - - save_config( - st.session_state.current_dir, - st.session_state.config['favorites'], - st.session_state.config - ) - st.toast("Server config saved!", icon="💾") - st.rerun() - - st.divider() - if st.button("đŸ—‘ī¸ Remove Server", key=f"del_{index}"): - all_instances.pop(index) - st.session_state.config["comfy_instances"] = all_instances - save_config( - st.session_state.current_dir, - st.session_state.config['favorites'], - st.session_state.config - ) - st.rerun() - - # --- 1. STATUS DASHBOARD --- - with st.expander("📊 Server Status", expanded=True): - col1, col2, col3, col4 = st.columns([1, 1, 1, 1]) - try: - res = requests.get(f"{COMFY_URL}/queue", timeout=1.5) - queue_data = res.json() - running_cnt = len(queue_data.get("queue_running", [])) - pending_cnt = len(queue_data.get("queue_pending", [])) - - col1.metric("Status", "đŸŸĸ Online" if running_cnt > 0 else "💤 Idle") - col2.metric("Pending", pending_cnt) - col3.metric("Running", running_cnt) - - if col4.button("🔄 Check Img", key=f"refresh_{index}", use_container_width=True): - st.session_state[f"force_img_refresh_{index}"] = True - except Exception: - col1.metric("Status", "🔴 Offline") - col2.metric("Pending", "-") - col3.metric("Running", "-") - st.error(f"Could not connect to API at {COMFY_URL}") - - # --- 2. LIVE VIEW (VIA REMOTE BROWSER) --- - st.write("") - c_label, c_ctrl = st.columns([1, 2]) - c_label.subheader("đŸ“ē Live View") - - # Capture the toggle interaction to set start time - def on_toggle_change(): - if st.session_state[toggle_key]: - st.session_state[start_time_key] = time.time() - - enable_preview = c_ctrl.checkbox( - "Enable Live Preview", - value=False, - key=toggle_key, - on_change=on_toggle_change - ) - - if enable_preview: - # Display Countdown if timeout is active - if timeout_minutes > 0: - elapsed = time.time() - st.session_state.get(start_time_key, time.time()) - remaining = (timeout_minutes * 60) - elapsed - st.caption(f"âąī¸ Auto-off in: **{int(remaining)}s**") - - # Height Slider - iframe_h = st.slider( - "Height (px)", - min_value=600, max_value=2500, value=1000, step=50, - key=f"h_slider_{index}" - ) - - # Get Configured Viewer URL - viewer_base = st.session_state.config.get("viewer_url", "") - final_src = viewer_base.strip() - - # Validate URL scheme before embedding - parsed = urllib.parse.urlparse(final_src) - if final_src and parsed.scheme in ("http", "https"): - safe_src = html.escape(final_src, quote=True) - st.info(f"Viewing via Remote Browser: `{final_src}`") - st.markdown( - f""" - - """, - unsafe_allow_html=True - ) - else: - st.warning("No valid viewer URL configured. Set one in Monitor Settings below.") - else: - st.info("Live Preview is disabled.") - - st.markdown("---") - - # --- 3. LATEST OUTPUT --- - if st.session_state.get(f"force_img_refresh_{index}", False): - st.caption("đŸ–ŧī¸ Most Recent Output") - try: - hist_res = requests.get(f"{COMFY_URL}/history", timeout=2) - history = hist_res.json() - if history: - last_prompt_id = list(history.keys())[-1] - outputs = history[last_prompt_id].get("outputs", {}) - found_img = None - for node_id, node_output in outputs.items(): - if "images" in node_output: - for img_info in node_output["images"]: - if img_info["type"] == "output": - found_img = img_info - break - if found_img: break - - if found_img: - img_name = found_img['filename'] - folder = found_img['subfolder'] - img_type = found_img['type'] - img_url = f"{COMFY_URL}/view?filename={img_name}&subfolder={folder}&type={img_type}" - img_res = requests.get(img_url) - image = Image.open(BytesIO(img_res.content)) - st.image(image, caption=f"Last Output: {img_name}") - else: - st.warning("Last run had no image output.") - else: - st.info("No history found.") - st.session_state[f"force_img_refresh_{index}"] = False - except Exception as e: - st.error(f"Error fetching image: {e}") - -# Check for fragment support (Streamlit 1.37+) -if hasattr(st, "fragment"): - # This decorator ensures this function re-runs every 10 seconds automatically - # allowing it to catch the timeout even if you are away from the keyboard. - @st.fragment(run_every=300) - def _monitor_fragment(): - _render_content() -else: - # Fallback for older Streamlit versions (Won't auto-refresh while idle) - def _monitor_fragment(): - _render_content() - -def _render_content(): - # --- GLOBAL SETTINGS FOR MONITOR --- - with st.expander("🔧 Monitor Settings", expanded=False): - c_set1, c_set2 = st.columns(2) - - current_viewer = st.session_state.config.get("viewer_url", "") - new_viewer = c_set1.text_input("Remote Browser URL", value=current_viewer, help="e.g., http://localhost:5800") - - # New Timeout Slider - current_timeout = st.session_state.config.get("monitor_timeout", 0) - new_timeout = c_set2.slider("Live Preview Timeout (Minutes)", 0, 60, value=current_timeout, help="0 = Always On. Sets how long the preview stays open before auto-closing.") - - if st.button("💾 Save Monitor Settings"): - st.session_state.config["viewer_url"] = new_viewer - st.session_state.config["monitor_timeout"] = new_timeout - save_config( - st.session_state.current_dir, - st.session_state.config['favorites'], - st.session_state.config - ) - st.success("Settings saved!") - st.rerun() - - # --- INSTANCE MANAGEMENT --- - if "comfy_instances" not in st.session_state.config: - st.session_state.config["comfy_instances"] = [ - {"name": "Main Server", "url": "http://192.168.1.100:8188"} - ] - - instances = st.session_state.config["comfy_instances"] - tab_names = [i["name"] for i in instances] + ["➕ Add Server"] - tabs = st.tabs(tab_names) - - timeout_val = st.session_state.config.get("monitor_timeout", 0) - - for i, tab in enumerate(tabs[:-1]): - with tab: - render_single_instance(instances[i], i, instances, timeout_val) - - with tabs[-1]: - st.header("Add New ComfyUI Instance") - with st.form("add_server_form"): - new_name = st.text_input("Server Name", placeholder="e.g. Render Node 2") - new_url = st.text_input("URL", placeholder="http://192.168.1.50:8188") - if st.form_submit_button("Add Instance"): - if new_name and new_url: - instances.append({"name": new_name, "url": new_url}) - st.session_state.config["comfy_instances"] = instances - - save_config( - st.session_state.current_dir, - st.session_state.config['favorites'], - st.session_state.config - ) - st.success("Server Added!") - st.rerun() - else: - st.error("Please fill in both Name and URL.") - -def render_comfy_monitor(): - # We call the wrapper which decides if it's a fragment or not - _monitor_fragment() \ No newline at end of file diff --git a/tab_raw.py b/tab_raw.py deleted file mode 100644 index 5458aaf..0000000 --- a/tab_raw.py +++ /dev/null @@ -1,78 +0,0 @@ -import streamlit as st -import json -import copy -from utils import save_json, get_file_mtime, KEY_HISTORY_TREE, KEY_PROMPT_HISTORY - -def render_raw_editor(data, file_path): - st.subheader(f"đŸ’ģ Raw Editor: {file_path.name}") - - # Toggle to hide massive history objects - # This is crucial because history trees can get huge and make the text area laggy. - col_ctrl, col_info = st.columns([1, 2]) - with col_ctrl: - hide_history = st.checkbox( - "Hide History (Safe Mode)", - value=True, - help="Hides 'history_tree' and 'prompt_history' to keep the editor fast and prevent accidental deletion of version control." - ) - - # Prepare display data - if hide_history: - display_data = copy.deepcopy(data) - # Safely remove heavy keys for the view only - if KEY_HISTORY_TREE in display_data: del display_data[KEY_HISTORY_TREE] - if KEY_PROMPT_HISTORY in display_data: del display_data[KEY_PROMPT_HISTORY] - else: - display_data = data - - # Convert to string - # ensure_ascii=False ensures emojis and special chars render correctly - try: - json_str = json.dumps(display_data, indent=4, ensure_ascii=False) - except Exception as e: - st.error(f"Error serializing JSON: {e}") - json_str = "{}" - - # The Text Editor - # We use ui_reset_token in the key to force the text area to reload content on save - new_json_str = st.text_area( - "JSON Content", - value=json_str, - height=650, - key=f"raw_edit_{file_path.name}_{st.session_state.ui_reset_token}" - ) - - st.markdown("---") - - if st.button("💾 Save Raw Changes", type="primary", use_container_width=True): - try: - # 1. Parse the text back to JSON - input_data = json.loads(new_json_str) - - # 2. If we were in Safe Mode, we must merge the hidden history back in - if hide_history: - if KEY_HISTORY_TREE in data: - input_data[KEY_HISTORY_TREE] = data[KEY_HISTORY_TREE] - if KEY_PROMPT_HISTORY in data: - input_data[KEY_PROMPT_HISTORY] = data[KEY_PROMPT_HISTORY] - - # 3. Save to Disk - save_json(file_path, input_data) - - # 4. Update Session State - # We clear and update the existing dictionary object so other tabs see the changes - data.clear() - data.update(input_data) - - # 5. Update Metadata to prevent conflict warnings - st.session_state.last_mtime = get_file_mtime(file_path) - st.session_state.ui_reset_token += 1 - - st.toast("Raw JSON Saved Successfully!", icon="✅") - st.rerun() - - except json.JSONDecodeError as e: - st.error(f"❌ Invalid JSON Syntax: {e}") - st.error("Please fix the formatting errors above before saving.") - except Exception as e: - st.error(f"❌ Unexpected Error: {e}") \ No newline at end of file diff --git a/tab_timeline.py b/tab_timeline.py deleted file mode 100644 index 94e8377..0000000 --- a/tab_timeline.py +++ /dev/null @@ -1,390 +0,0 @@ -import streamlit as st -import copy -import time -from history_tree import HistoryTree -from utils import save_json, KEY_BATCH_DATA, KEY_HISTORY_TREE - -try: - from streamlit_agraph import agraph, Node, Edge, Config - AGRAPH_AVAILABLE = True -except ImportError: - AGRAPH_AVAILABLE = False - - -def render_timeline_tab(data, file_path): - tree_data = data.get(KEY_HISTORY_TREE, {}) - if not tree_data: - st.info("No history timeline exists. Make some changes in the Editor first!") - return - - htree = HistoryTree(tree_data) - - # --- Initialize selection state --- - if "timeline_selected_nodes" not in st.session_state: - st.session_state.timeline_selected_nodes = set() - - if 'restored_indicator' in st.session_state and st.session_state.restored_indicator: - st.info(f"📍 Editing Restored Version: **{st.session_state.restored_indicator}**") - - # --- VIEW SWITCHER + SELECTION MODE --- - c_title, c_view, c_toggle = st.columns([2, 1, 0.6]) - c_title.subheader("đŸ•°ī¸ Version History") - - view_mode = c_view.radio( - "View Mode", - ["đŸŒŗ Horizontal", "🌲 Vertical", "📜 Linear Log"], - horizontal=True, - label_visibility="collapsed" - ) - - selection_mode = c_toggle.toggle("Select to Delete", key="timeline_selection_mode") - if not selection_mode: - st.session_state.timeline_selected_nodes = set() - - # --- Build sorted node list (shared by all views) --- - all_nodes = list(htree.nodes.values()) - all_nodes.sort(key=lambda x: x["timestamp"], reverse=True) - - # --- MULTISELECT PICKER (shown when selection mode is on) --- - if selection_mode: - def _fmt_node_option(nid): - n = htree.nodes[nid] - ts = time.strftime('%b %d %H:%M', time.localtime(n['timestamp'])) - note = n.get('note', 'Step') - head = " (HEAD)" if nid == htree.head_id else "" - return f"{note} â€ĸ {ts} ({nid[:6]}){head}" - - all_ids = [n["id"] for n in all_nodes] - current_selection = [nid for nid in all_ids if nid in st.session_state.timeline_selected_nodes] - picked = st.multiselect( - "Select nodes to delete:", - options=all_ids, - default=current_selection, - format_func=_fmt_node_option, - ) - st.session_state.timeline_selected_nodes = set(picked) - - c_all, c_none, _ = st.columns([1, 1, 4]) - if c_all.button("Select All", use_container_width=True): - st.session_state.timeline_selected_nodes = set(all_ids) - st.rerun() - if c_none.button("Deselect All", use_container_width=True): - st.session_state.timeline_selected_nodes = set() - st.rerun() - - # --- RENDER GRAPH VIEWS --- - if view_mode in ["đŸŒŗ Horizontal", "🌲 Vertical"]: - direction = "LR" if view_mode == "đŸŒŗ Horizontal" else "TB" - - if AGRAPH_AVAILABLE: - # Interactive graph with streamlit-agraph - selected_set = st.session_state.timeline_selected_nodes if selection_mode else set() - clicked_node = _render_interactive_graph(htree, direction, selected_set) - if clicked_node and clicked_node in htree.nodes: - if selection_mode: - # Toggle node in selection set - if clicked_node in st.session_state.timeline_selected_nodes: - st.session_state.timeline_selected_nodes.discard(clicked_node) - else: - st.session_state.timeline_selected_nodes.add(clicked_node) - st.rerun() - else: - node = htree.nodes[clicked_node] - if clicked_node != htree.head_id: - _restore_node(data, node, htree, file_path) - else: - # Fallback to static graphviz - try: - graph_dot = htree.generate_graph(direction=direction) - if direction == "LR": - st.graphviz_chart(graph_dot, use_container_width=True) - else: - _, col_center, _ = st.columns([1, 2, 1]) - with col_center: - st.graphviz_chart(graph_dot, use_container_width=True) - except Exception as e: - st.error(f"Graph Error: {e}") - st.caption("💡 Install `streamlit-agraph` for interactive click-to-restore") - - # --- RENDER LINEAR LOG VIEW --- - elif view_mode == "📜 Linear Log": - st.caption("A simple chronological list of all snapshots.") - - for n in all_nodes: - is_head = (n["id"] == htree.head_id) - with st.container(): - if selection_mode: - c0, c1, c2, c3 = st.columns([0.3, 0.5, 4, 1]) - with c0: - is_selected = n["id"] in st.session_state.timeline_selected_nodes - if st.checkbox("", value=is_selected, key=f"log_sel_{n['id']}", label_visibility="collapsed"): - st.session_state.timeline_selected_nodes.add(n["id"]) - else: - st.session_state.timeline_selected_nodes.discard(n["id"]) - else: - c1, c2, c3 = st.columns([0.5, 4, 1]) - with c1: - st.markdown("### 📍" if is_head else "### âšĢ") - with c2: - note_txt = n.get('note', 'Step') - ts = time.strftime('%b %d %H:%M', time.localtime(n['timestamp'])) - if is_head: - st.markdown(f"**{note_txt}** (Current)") - else: - st.write(f"**{note_txt}**") - st.caption(f"ID: {n['id'][:6]} â€ĸ {ts}") - with c3: - if not is_head and not selection_mode: - if st.button("âĒ", key=f"log_rst_{n['id']}", help="Restore this version"): - _restore_node(data, n, htree, file_path) - st.divider() - - # --- BATCH DELETE UI --- - if selection_mode and st.session_state.timeline_selected_nodes: - # Prune any selected IDs that no longer exist in the tree - valid_selected = st.session_state.timeline_selected_nodes & set(htree.nodes.keys()) - st.session_state.timeline_selected_nodes = valid_selected - count = len(valid_selected) - if count > 0: - st.warning(f"**{count}** node{'s' if count != 1 else ''} selected for deletion.") - if st.button(f"đŸ—‘ī¸ Delete {count} Node{'s' if count != 1 else ''}", type="primary"): - # Backup - if "history_tree_backup" not in data: - data["history_tree_backup"] = [] - data["history_tree_backup"].append(copy.deepcopy(htree.to_dict())) - # Delete all selected nodes - for nid in valid_selected: - if nid in htree.nodes: - del htree.nodes[nid] - # Clean up branch tips - for b, tip in list(htree.branches.items()): - if tip in valid_selected: - del htree.branches[b] - # Reassign HEAD if deleted - if htree.head_id in valid_selected: - if htree.nodes: - fallback = sorted(htree.nodes.values(), key=lambda x: x["timestamp"])[-1] - htree.head_id = fallback["id"] - else: - htree.head_id = None - # Save and reset - data[KEY_HISTORY_TREE] = htree.to_dict() - save_json(file_path, data) - st.session_state.timeline_selected_nodes = set() - st.toast(f"Deleted {count} node{'s' if count != 1 else ''}!", icon="đŸ—‘ī¸") - st.rerun() - - st.markdown("---") - - # --- NODE SELECTOR --- - col_sel, col_act = st.columns([3, 1]) - - def fmt_node(n): - ts = time.strftime('%b %d %H:%M', time.localtime(n['timestamp'])) - return f"{n.get('note', 'Step')} â€ĸ {ts} ({n['id'][:6]})" - - with col_sel: - current_idx = 0 - for i, n in enumerate(all_nodes): - if n["id"] == htree.head_id: - current_idx = i - break - - selected_node = st.selectbox( - "Select Version to Manage:", - all_nodes, - format_func=fmt_node, - index=current_idx - ) - - if not selected_node: - return - - node_data = selected_node["data"] - - # --- RESTORE --- - with col_act: - st.write(""); st.write("") - if st.button("âĒ Restore Version", type="primary", use_container_width=True): - _restore_node(data, selected_node, htree, file_path) - - # --- RENAME --- - rn_col1, rn_col2 = st.columns([3, 1]) - new_label = rn_col1.text_input("Rename Label", value=selected_node.get("note", "")) - if rn_col2.button("Update Label"): - selected_node["note"] = new_label - data[KEY_HISTORY_TREE] = htree.to_dict() - save_json(file_path, data) - st.rerun() - - # --- DANGER ZONE --- - st.markdown("---") - with st.expander("âš ī¸ Danger Zone (Delete)"): - st.warning("Deleting a node cannot be undone.") - if st.button("đŸ—‘ī¸ Delete This Node", type="primary"): - if selected_node['id'] in htree.nodes: - if "history_tree_backup" not in data: - data["history_tree_backup"] = [] - data["history_tree_backup"].append(copy.deepcopy(htree.to_dict())) - del htree.nodes[selected_node['id']] - for b, tip in list(htree.branches.items()): - if tip == selected_node['id']: - del htree.branches[b] - if htree.head_id == selected_node['id']: - if htree.nodes: - fallback = sorted(htree.nodes.values(), key=lambda x: x["timestamp"])[-1] - htree.head_id = fallback["id"] - else: - htree.head_id = None - data[KEY_HISTORY_TREE] = htree.to_dict() - save_json(file_path, data) - st.toast("Node Deleted", icon="đŸ—‘ī¸") - st.rerun() - - # --- DATA PREVIEW --- - st.markdown("---") - with st.expander("🔍 Data Preview", expanded=False): - batch_list = node_data.get(KEY_BATCH_DATA, []) - - if batch_list and isinstance(batch_list, list) and len(batch_list) > 0: - st.info(f"📚 This snapshot contains {len(batch_list)} sequences.") - for i, seq_data in enumerate(batch_list): - seq_num = seq_data.get("sequence_number", i + 1) - with st.expander(f"đŸŽŦ Sequence #{seq_num}", expanded=(i == 0)): - prefix = f"p_{selected_node['id']}_s{i}" - _render_preview_fields(seq_data, prefix) - else: - prefix = f"p_{selected_node['id']}_single" - _render_preview_fields(node_data, prefix) - - -def _render_interactive_graph(htree, direction, selected_nodes=None): - """Render an interactive graph using streamlit-agraph. Returns clicked node id.""" - if selected_nodes is None: - selected_nodes = set() - - # Build reverse lookup: branch tip -> branch name(s) - tip_to_branches = {} - for b_name, tip_id in htree.branches.items(): - if tip_id: - tip_to_branches.setdefault(tip_id, []).append(b_name) - - sorted_nodes_list = sorted(htree.nodes.values(), key=lambda x: x["timestamp"]) - - nodes = [] - edges = [] - - for n in sorted_nodes_list: - nid = n["id"] - full_note = n.get('note', 'Step') - display_note = (full_note[:20] + '..') if len(full_note) > 20 else full_note - ts = time.strftime('%b %d %H:%M', time.localtime(n['timestamp'])) - - # Branch label - branch_label = "" - if nid in tip_to_branches: - branch_label = f"\n[{', '.join(tip_to_branches[nid])}]" - - label = f"{display_note}\n{ts}{branch_label}" - - # Colors - selected nodes override to red - if nid in selected_nodes: - color = "#ff5555" # Selected for deletion - red - elif nid == htree.head_id: - color = "#ffdd44" # Current head - bright yellow - elif nid in htree.branches.values(): - color = "#66dd66" # Branch tip - bright green - else: - color = "#aaccff" # Normal - light blue - - nodes.append(Node( - id=nid, - label=label, - size=20, - color=color, - font={"size": 10, "color": "#ffffff"} - )) - - if n["parent"] and n["parent"] in htree.nodes: - edges.append(Edge(source=n["parent"], target=nid, color="#888888")) - - # Config based on direction - is_horizontal = direction == "LR" - config = Config( - width="100%", - height=400 if is_horizontal else 600, - directed=True, - hierarchical=True, - physics=False, - nodeHighlightBehavior=True, - highlightColor="#ffcc00", - collapsible=False, - layout={ - "hierarchical": { - "enabled": True, - "direction": "LR" if is_horizontal else "UD", - "sortMethod": "directed", - "levelSeparation": 150 if is_horizontal else 80, - "nodeSpacing": 100 if is_horizontal else 60, - } - } - ) - - return agraph(nodes=nodes, edges=edges, config=config) - - -def _restore_node(data, node, htree, file_path): - """Restore a history node as the current version.""" - node_data = node["data"] - if KEY_BATCH_DATA not in node_data and KEY_BATCH_DATA in data: - del data[KEY_BATCH_DATA] - data.update(node_data) - htree.head_id = node['id'] - data[KEY_HISTORY_TREE] = htree.to_dict() - save_json(file_path, data) - st.session_state.ui_reset_token += 1 - label = f"{node.get('note')} ({node['id'][:4]})" - st.session_state.restored_indicator = label - st.toast("Restored!", icon="🔄") - st.rerun() - - -def _render_preview_fields(item_data, prefix): - """Render a read-only preview of prompts, settings, and LoRAs.""" - # Prompts - p_col1, p_col2 = st.columns(2) - with p_col1: - st.text_area("General Positive", value=item_data.get("general_prompt", ""), height=80, disabled=True, key=f"{prefix}_gp") - val_sp = item_data.get("current_prompt", "") or item_data.get("prompt", "") - st.text_area("Specific Positive", value=val_sp, height=80, disabled=True, key=f"{prefix}_sp") - with p_col2: - st.text_area("General Negative", value=item_data.get("general_negative", ""), height=80, disabled=True, key=f"{prefix}_gn") - st.text_area("Specific Negative", value=item_data.get("negative", ""), height=80, disabled=True, key=f"{prefix}_sn") - - # Settings - s_col1, s_col2, s_col3 = st.columns(3) - s_col1.text_input("Camera", value=str(item_data.get("camera", "static")), disabled=True, key=f"{prefix}_cam") - s_col2.text_input("FLF", value=str(item_data.get("flf", "0.0")), disabled=True, key=f"{prefix}_flf") - s_col3.text_input("Seed", value=str(item_data.get("seed", "-1")), disabled=True, key=f"{prefix}_seed") - - # LoRAs - with st.expander("💊 LoRA Configuration", expanded=False): - l1, l2, l3 = st.columns(3) - with l1: - st.text_input("L1 Name", value=item_data.get("lora 1 high", ""), disabled=True, key=f"{prefix}_l1h") - st.text_input("L1 Str", value=str(item_data.get("lora 1 low", "")), disabled=True, key=f"{prefix}_l1l") - with l2: - st.text_input("L2 Name", value=item_data.get("lora 2 high", ""), disabled=True, key=f"{prefix}_l2h") - st.text_input("L2 Str", value=str(item_data.get("lora 2 low", "")), disabled=True, key=f"{prefix}_l2l") - with l3: - st.text_input("L3 Name", value=item_data.get("lora 3 high", ""), disabled=True, key=f"{prefix}_l3h") - st.text_input("L3 Str", value=str(item_data.get("lora 3 low", "")), disabled=True, key=f"{prefix}_l3l") - - # VACE - vace_keys = ["frame_to_skip", "vace schedule", "video file path"] - if any(k in item_data for k in vace_keys): - with st.expander("đŸŽžī¸ VACE / I2V Settings", expanded=False): - v1, v2, v3 = st.columns(3) - v1.text_input("Skip Frames", value=str(item_data.get("frame_to_skip", "")), disabled=True, key=f"{prefix}_fts") - v2.text_input("Schedule", value=str(item_data.get("vace schedule", "")), disabled=True, key=f"{prefix}_vsc") - v3.text_input("Video Path", value=str(item_data.get("video file path", "")), disabled=True, key=f"{prefix}_vid")