391 lines
16 KiB
Python
391 lines
16 KiB
Python
import streamlit as st
|
|
import copy
|
|
import time
|
|
from history_tree import HistoryTree
|
|
from utils import save_json, KEY_BATCH_DATA, KEY_HISTORY_TREE
|
|
|
|
try:
|
|
from streamlit_agraph import agraph, Node, Edge, Config
|
|
AGRAPH_AVAILABLE = True
|
|
except ImportError:
|
|
AGRAPH_AVAILABLE = False
|
|
|
|
|
|
def render_timeline_tab(data, file_path):
|
|
tree_data = data.get(KEY_HISTORY_TREE, {})
|
|
if not tree_data:
|
|
st.info("No history timeline exists. Make some changes in the Editor first!")
|
|
return
|
|
|
|
htree = HistoryTree(tree_data)
|
|
|
|
# --- Initialize selection state ---
|
|
if "timeline_selected_nodes" not in st.session_state:
|
|
st.session_state.timeline_selected_nodes = set()
|
|
|
|
if 'restored_indicator' in st.session_state and st.session_state.restored_indicator:
|
|
st.info(f"📍 Editing Restored Version: **{st.session_state.restored_indicator}**")
|
|
|
|
# --- VIEW SWITCHER + SELECTION MODE ---
|
|
c_title, c_view, c_toggle = st.columns([2, 1, 0.6])
|
|
c_title.subheader("🕰️ Version History")
|
|
|
|
view_mode = c_view.radio(
|
|
"View Mode",
|
|
["🌳 Horizontal", "🌲 Vertical", "📜 Linear Log"],
|
|
horizontal=True,
|
|
label_visibility="collapsed"
|
|
)
|
|
|
|
selection_mode = c_toggle.toggle("Select to Delete", key="timeline_selection_mode")
|
|
if not selection_mode:
|
|
st.session_state.timeline_selected_nodes = set()
|
|
|
|
# --- Build sorted node list (shared by all views) ---
|
|
all_nodes = list(htree.nodes.values())
|
|
all_nodes.sort(key=lambda x: x["timestamp"], reverse=True)
|
|
|
|
# --- MULTISELECT PICKER (shown when selection mode is on) ---
|
|
if selection_mode:
|
|
def _fmt_node_option(nid):
|
|
n = htree.nodes[nid]
|
|
ts = time.strftime('%b %d %H:%M', time.localtime(n['timestamp']))
|
|
note = n.get('note', 'Step')
|
|
head = " (HEAD)" if nid == htree.head_id else ""
|
|
return f"{note} • {ts} ({nid[:6]}){head}"
|
|
|
|
all_ids = [n["id"] for n in all_nodes]
|
|
current_selection = [nid for nid in all_ids if nid in st.session_state.timeline_selected_nodes]
|
|
picked = st.multiselect(
|
|
"Select nodes to delete:",
|
|
options=all_ids,
|
|
default=current_selection,
|
|
format_func=_fmt_node_option,
|
|
)
|
|
st.session_state.timeline_selected_nodes = set(picked)
|
|
|
|
c_all, c_none, _ = st.columns([1, 1, 4])
|
|
if c_all.button("Select All", use_container_width=True):
|
|
st.session_state.timeline_selected_nodes = set(all_ids)
|
|
st.rerun()
|
|
if c_none.button("Deselect All", use_container_width=True):
|
|
st.session_state.timeline_selected_nodes = set()
|
|
st.rerun()
|
|
|
|
# --- RENDER GRAPH VIEWS ---
|
|
if view_mode in ["🌳 Horizontal", "🌲 Vertical"]:
|
|
direction = "LR" if view_mode == "🌳 Horizontal" else "TB"
|
|
|
|
if AGRAPH_AVAILABLE:
|
|
# Interactive graph with streamlit-agraph
|
|
selected_set = st.session_state.timeline_selected_nodes if selection_mode else set()
|
|
clicked_node = _render_interactive_graph(htree, direction, selected_set)
|
|
if clicked_node and clicked_node in htree.nodes:
|
|
if selection_mode:
|
|
# Toggle node in selection set
|
|
if clicked_node in st.session_state.timeline_selected_nodes:
|
|
st.session_state.timeline_selected_nodes.discard(clicked_node)
|
|
else:
|
|
st.session_state.timeline_selected_nodes.add(clicked_node)
|
|
st.rerun()
|
|
else:
|
|
node = htree.nodes[clicked_node]
|
|
if clicked_node != htree.head_id:
|
|
_restore_node(data, node, htree, file_path)
|
|
else:
|
|
# Fallback to static graphviz
|
|
try:
|
|
graph_dot = htree.generate_graph(direction=direction)
|
|
if direction == "LR":
|
|
st.graphviz_chart(graph_dot, use_container_width=True)
|
|
else:
|
|
_, col_center, _ = st.columns([1, 2, 1])
|
|
with col_center:
|
|
st.graphviz_chart(graph_dot, use_container_width=True)
|
|
except Exception as e:
|
|
st.error(f"Graph Error: {e}")
|
|
st.caption("💡 Install `streamlit-agraph` for interactive click-to-restore")
|
|
|
|
# --- RENDER LINEAR LOG VIEW ---
|
|
elif view_mode == "📜 Linear Log":
|
|
st.caption("A simple chronological list of all snapshots.")
|
|
|
|
for n in all_nodes:
|
|
is_head = (n["id"] == htree.head_id)
|
|
with st.container():
|
|
if selection_mode:
|
|
c0, c1, c2, c3 = st.columns([0.3, 0.5, 4, 1])
|
|
with c0:
|
|
is_selected = n["id"] in st.session_state.timeline_selected_nodes
|
|
if st.checkbox("", value=is_selected, key=f"log_sel_{n['id']}", label_visibility="collapsed"):
|
|
st.session_state.timeline_selected_nodes.add(n["id"])
|
|
else:
|
|
st.session_state.timeline_selected_nodes.discard(n["id"])
|
|
else:
|
|
c1, c2, c3 = st.columns([0.5, 4, 1])
|
|
with c1:
|
|
st.markdown("### 📍" if is_head else "### ⚫")
|
|
with c2:
|
|
note_txt = n.get('note', 'Step')
|
|
ts = time.strftime('%b %d %H:%M', time.localtime(n['timestamp']))
|
|
if is_head:
|
|
st.markdown(f"**{note_txt}** (Current)")
|
|
else:
|
|
st.write(f"**{note_txt}**")
|
|
st.caption(f"ID: {n['id'][:6]} • {ts}")
|
|
with c3:
|
|
if not is_head and not selection_mode:
|
|
if st.button("⏪", key=f"log_rst_{n['id']}", help="Restore this version"):
|
|
_restore_node(data, n, htree, file_path)
|
|
st.divider()
|
|
|
|
# --- BATCH DELETE UI ---
|
|
if selection_mode and st.session_state.timeline_selected_nodes:
|
|
# Prune any selected IDs that no longer exist in the tree
|
|
valid_selected = st.session_state.timeline_selected_nodes & set(htree.nodes.keys())
|
|
st.session_state.timeline_selected_nodes = valid_selected
|
|
count = len(valid_selected)
|
|
if count > 0:
|
|
st.warning(f"**{count}** node{'s' if count != 1 else ''} selected for deletion.")
|
|
if st.button(f"🗑️ Delete {count} Node{'s' if count != 1 else ''}", type="primary"):
|
|
# Backup
|
|
if "history_tree_backup" not in data:
|
|
data["history_tree_backup"] = []
|
|
data["history_tree_backup"].append(copy.deepcopy(htree.to_dict()))
|
|
# Delete all selected nodes
|
|
for nid in valid_selected:
|
|
if nid in htree.nodes:
|
|
del htree.nodes[nid]
|
|
# Clean up branch tips
|
|
for b, tip in list(htree.branches.items()):
|
|
if tip in valid_selected:
|
|
del htree.branches[b]
|
|
# Reassign HEAD if deleted
|
|
if htree.head_id in valid_selected:
|
|
if htree.nodes:
|
|
fallback = sorted(htree.nodes.values(), key=lambda x: x["timestamp"])[-1]
|
|
htree.head_id = fallback["id"]
|
|
else:
|
|
htree.head_id = None
|
|
# Save and reset
|
|
data[KEY_HISTORY_TREE] = htree.to_dict()
|
|
save_json(file_path, data)
|
|
st.session_state.timeline_selected_nodes = set()
|
|
st.toast(f"Deleted {count} node{'s' if count != 1 else ''}!", icon="🗑️")
|
|
st.rerun()
|
|
|
|
st.markdown("---")
|
|
|
|
# --- NODE SELECTOR ---
|
|
col_sel, col_act = st.columns([3, 1])
|
|
|
|
def fmt_node(n):
|
|
ts = time.strftime('%b %d %H:%M', time.localtime(n['timestamp']))
|
|
return f"{n.get('note', 'Step')} • {ts} ({n['id'][:6]})"
|
|
|
|
with col_sel:
|
|
current_idx = 0
|
|
for i, n in enumerate(all_nodes):
|
|
if n["id"] == htree.head_id:
|
|
current_idx = i
|
|
break
|
|
|
|
selected_node = st.selectbox(
|
|
"Select Version to Manage:",
|
|
all_nodes,
|
|
format_func=fmt_node,
|
|
index=current_idx
|
|
)
|
|
|
|
if not selected_node:
|
|
return
|
|
|
|
node_data = selected_node["data"]
|
|
|
|
# --- RESTORE ---
|
|
with col_act:
|
|
st.write(""); st.write("")
|
|
if st.button("⏪ Restore Version", type="primary", use_container_width=True):
|
|
_restore_node(data, selected_node, htree, file_path)
|
|
|
|
# --- RENAME ---
|
|
rn_col1, rn_col2 = st.columns([3, 1])
|
|
new_label = rn_col1.text_input("Rename Label", value=selected_node.get("note", ""))
|
|
if rn_col2.button("Update Label"):
|
|
selected_node["note"] = new_label
|
|
data[KEY_HISTORY_TREE] = htree.to_dict()
|
|
save_json(file_path, data)
|
|
st.rerun()
|
|
|
|
# --- DANGER ZONE ---
|
|
st.markdown("---")
|
|
with st.expander("⚠️ Danger Zone (Delete)"):
|
|
st.warning("Deleting a node cannot be undone.")
|
|
if st.button("🗑️ Delete This Node", type="primary"):
|
|
if selected_node['id'] in htree.nodes:
|
|
if "history_tree_backup" not in data:
|
|
data["history_tree_backup"] = []
|
|
data["history_tree_backup"].append(copy.deepcopy(htree.to_dict()))
|
|
del htree.nodes[selected_node['id']]
|
|
for b, tip in list(htree.branches.items()):
|
|
if tip == selected_node['id']:
|
|
del htree.branches[b]
|
|
if htree.head_id == selected_node['id']:
|
|
if htree.nodes:
|
|
fallback = sorted(htree.nodes.values(), key=lambda x: x["timestamp"])[-1]
|
|
htree.head_id = fallback["id"]
|
|
else:
|
|
htree.head_id = None
|
|
data[KEY_HISTORY_TREE] = htree.to_dict()
|
|
save_json(file_path, data)
|
|
st.toast("Node Deleted", icon="🗑️")
|
|
st.rerun()
|
|
|
|
# --- DATA PREVIEW ---
|
|
st.markdown("---")
|
|
with st.expander("🔍 Data Preview", expanded=False):
|
|
batch_list = node_data.get(KEY_BATCH_DATA, [])
|
|
|
|
if batch_list and isinstance(batch_list, list) and len(batch_list) > 0:
|
|
st.info(f"📚 This snapshot contains {len(batch_list)} sequences.")
|
|
for i, seq_data in enumerate(batch_list):
|
|
seq_num = seq_data.get("sequence_number", i + 1)
|
|
with st.expander(f"🎬 Sequence #{seq_num}", expanded=(i == 0)):
|
|
prefix = f"p_{selected_node['id']}_s{i}"
|
|
_render_preview_fields(seq_data, prefix)
|
|
else:
|
|
prefix = f"p_{selected_node['id']}_single"
|
|
_render_preview_fields(node_data, prefix)
|
|
|
|
|
|
def _render_interactive_graph(htree, direction, selected_nodes=None):
|
|
"""Render an interactive graph using streamlit-agraph. Returns clicked node id."""
|
|
if selected_nodes is None:
|
|
selected_nodes = set()
|
|
|
|
# Build reverse lookup: branch tip -> branch name(s)
|
|
tip_to_branches = {}
|
|
for b_name, tip_id in htree.branches.items():
|
|
if tip_id:
|
|
tip_to_branches.setdefault(tip_id, []).append(b_name)
|
|
|
|
sorted_nodes_list = sorted(htree.nodes.values(), key=lambda x: x["timestamp"])
|
|
|
|
nodes = []
|
|
edges = []
|
|
|
|
for n in sorted_nodes_list:
|
|
nid = n["id"]
|
|
full_note = n.get('note', 'Step')
|
|
display_note = (full_note[:20] + '..') if len(full_note) > 20 else full_note
|
|
ts = time.strftime('%b %d %H:%M', time.localtime(n['timestamp']))
|
|
|
|
# Branch label
|
|
branch_label = ""
|
|
if nid in tip_to_branches:
|
|
branch_label = f"\n[{', '.join(tip_to_branches[nid])}]"
|
|
|
|
label = f"{display_note}\n{ts}{branch_label}"
|
|
|
|
# Colors - selected nodes override to red
|
|
if nid in selected_nodes:
|
|
color = "#ff5555" # Selected for deletion - red
|
|
elif nid == htree.head_id:
|
|
color = "#ffdd44" # Current head - bright yellow
|
|
elif nid in htree.branches.values():
|
|
color = "#66dd66" # Branch tip - bright green
|
|
else:
|
|
color = "#aaccff" # Normal - light blue
|
|
|
|
nodes.append(Node(
|
|
id=nid,
|
|
label=label,
|
|
size=20,
|
|
color=color,
|
|
font={"size": 10, "color": "#ffffff"}
|
|
))
|
|
|
|
if n["parent"] and n["parent"] in htree.nodes:
|
|
edges.append(Edge(source=n["parent"], target=nid, color="#888888"))
|
|
|
|
# Config based on direction
|
|
is_horizontal = direction == "LR"
|
|
config = Config(
|
|
width="100%",
|
|
height=400 if is_horizontal else 600,
|
|
directed=True,
|
|
hierarchical=True,
|
|
physics=False,
|
|
nodeHighlightBehavior=True,
|
|
highlightColor="#ffcc00",
|
|
collapsible=False,
|
|
layout={
|
|
"hierarchical": {
|
|
"enabled": True,
|
|
"direction": "LR" if is_horizontal else "UD",
|
|
"sortMethod": "directed",
|
|
"levelSeparation": 150 if is_horizontal else 80,
|
|
"nodeSpacing": 100 if is_horizontal else 60,
|
|
}
|
|
}
|
|
)
|
|
|
|
return agraph(nodes=nodes, edges=edges, config=config)
|
|
|
|
|
|
def _restore_node(data, node, htree, file_path):
|
|
"""Restore a history node as the current version."""
|
|
node_data = node["data"]
|
|
if KEY_BATCH_DATA not in node_data and KEY_BATCH_DATA in data:
|
|
del data[KEY_BATCH_DATA]
|
|
data.update(node_data)
|
|
htree.head_id = node['id']
|
|
data[KEY_HISTORY_TREE] = htree.to_dict()
|
|
save_json(file_path, data)
|
|
st.session_state.ui_reset_token += 1
|
|
label = f"{node.get('note')} ({node['id'][:4]})"
|
|
st.session_state.restored_indicator = label
|
|
st.toast("Restored!", icon="🔄")
|
|
st.rerun()
|
|
|
|
|
|
def _render_preview_fields(item_data, prefix):
|
|
"""Render a read-only preview of prompts, settings, and LoRAs."""
|
|
# Prompts
|
|
p_col1, p_col2 = st.columns(2)
|
|
with p_col1:
|
|
st.text_area("General Positive", value=item_data.get("general_prompt", ""), height=80, disabled=True, key=f"{prefix}_gp")
|
|
val_sp = item_data.get("current_prompt", "") or item_data.get("prompt", "")
|
|
st.text_area("Specific Positive", value=val_sp, height=80, disabled=True, key=f"{prefix}_sp")
|
|
with p_col2:
|
|
st.text_area("General Negative", value=item_data.get("general_negative", ""), height=80, disabled=True, key=f"{prefix}_gn")
|
|
st.text_area("Specific Negative", value=item_data.get("negative", ""), height=80, disabled=True, key=f"{prefix}_sn")
|
|
|
|
# Settings
|
|
s_col1, s_col2, s_col3 = st.columns(3)
|
|
s_col1.text_input("Camera", value=str(item_data.get("camera", "static")), disabled=True, key=f"{prefix}_cam")
|
|
s_col2.text_input("FLF", value=str(item_data.get("flf", "0.0")), disabled=True, key=f"{prefix}_flf")
|
|
s_col3.text_input("Seed", value=str(item_data.get("seed", "-1")), disabled=True, key=f"{prefix}_seed")
|
|
|
|
# LoRAs
|
|
with st.expander("💊 LoRA Configuration", expanded=False):
|
|
l1, l2, l3 = st.columns(3)
|
|
with l1:
|
|
st.text_input("L1 Name", value=item_data.get("lora 1 high", ""), disabled=True, key=f"{prefix}_l1h")
|
|
st.text_input("L1 Str", value=str(item_data.get("lora 1 low", "")), disabled=True, key=f"{prefix}_l1l")
|
|
with l2:
|
|
st.text_input("L2 Name", value=item_data.get("lora 2 high", ""), disabled=True, key=f"{prefix}_l2h")
|
|
st.text_input("L2 Str", value=str(item_data.get("lora 2 low", "")), disabled=True, key=f"{prefix}_l2l")
|
|
with l3:
|
|
st.text_input("L3 Name", value=item_data.get("lora 3 high", ""), disabled=True, key=f"{prefix}_l3h")
|
|
st.text_input("L3 Str", value=str(item_data.get("lora 3 low", "")), disabled=True, key=f"{prefix}_l3l")
|
|
|
|
# VACE
|
|
vace_keys = ["frame_to_skip", "vace schedule", "video file path"]
|
|
if any(k in item_data for k in vace_keys):
|
|
with st.expander("🎞️ VACE / I2V Settings", expanded=False):
|
|
v1, v2, v3 = st.columns(3)
|
|
v1.text_input("Skip Frames", value=str(item_data.get("frame_to_skip", "")), disabled=True, key=f"{prefix}_fts")
|
|
v2.text_input("Schedule", value=str(item_data.get("vace schedule", "")), disabled=True, key=f"{prefix}_vsc")
|
|
v3.text_input("Video Path", value=str(item_data.get("video file path", "")), disabled=True, key=f"{prefix}_vid")
|