diff --git a/app.py b/app.py
index ea0ae0c..a190fd9 100644
--- a/app.py
+++ b/app.py
@@ -4,8 +4,8 @@ from pathlib import Path
# --- Import Custom Modules ---
from utils import (
- load_config, save_config, load_snippets, save_snippets,
- load_json, save_json, generate_templates, DEFAULTS
+ load_config, save_config, load_snippets, save_snippets,
+ load_json, save_json, generate_templates, DEFAULTS, ALLOWED_BASE_DIR
)
from tab_single import render_single_editor
from tab_batch import render_batch_processor
@@ -22,31 +22,23 @@ st.set_page_config(layout="wide", page_title="AI Settings Manager")
# ==========================================
# 2. SESSION STATE INITIALIZATION
# ==========================================
+_SESSION_DEFAULTS = {
+ "snippets": load_snippets,
+ "loaded_file": lambda: None,
+ "last_mtime": lambda: 0,
+ "edit_history_idx": lambda: None,
+ "single_editor_cache": lambda: DEFAULTS.copy(),
+ "ui_reset_token": lambda: 0,
+ "active_tab_name": lambda: "📝 Single Editor",
+}
+
if 'config' not in st.session_state:
st.session_state.config = load_config()
st.session_state.current_dir = Path(st.session_state.config.get("last_dir", Path.cwd()))
-if 'snippets' not in st.session_state:
- st.session_state.snippets = load_snippets()
-
-if 'loaded_file' not in st.session_state:
- st.session_state.loaded_file = None
-
-if 'last_mtime' not in st.session_state:
- st.session_state.last_mtime = 0
-
-if 'edit_history_idx' not in st.session_state:
- st.session_state.edit_history_idx = None
-
-if 'single_editor_cache' not in st.session_state:
- st.session_state.single_editor_cache = DEFAULTS.copy()
-
-if 'ui_reset_token' not in st.session_state:
- st.session_state.ui_reset_token = 0
-
-# Track the active tab state for programmatic switching
-if 'active_tab_name' not in st.session_state:
- st.session_state.active_tab_name = "📝 Single Editor"
+for key, factory in _SESSION_DEFAULTS.items():
+ if key not in st.session_state:
+ st.session_state[key] = factory()
# ==========================================
# 3. SIDEBAR (NAVIGATOR & TOOLS)
@@ -57,12 +49,18 @@ with st.sidebar:
# --- Path Navigator ---
new_path = st.text_input("Current Path", value=str(st.session_state.current_dir))
if new_path != str(st.session_state.current_dir):
- p = Path(new_path)
+ p = Path(new_path).resolve()
if p.exists() and p.is_dir():
- st.session_state.current_dir = p
- st.session_state.config['last_dir'] = str(p)
- save_config(st.session_state.current_dir, st.session_state.config['favorites'])
- st.rerun()
+ # Restrict navigation to the allowed base directory
+ try:
+ p.relative_to(ALLOWED_BASE_DIR)
+ except ValueError:
+ st.error(f"Access denied: path must be under {ALLOWED_BASE_DIR}")
+ else:
+ st.session_state.current_dir = p
+ st.session_state.config['last_dir'] = str(p)
+ save_config(st.session_state.current_dir, st.session_state.config['favorites'])
+ st.rerun()
# --- Favorites System ---
if st.button("📌 Pin Current Folder"):
diff --git a/history_tree.py b/history_tree.py
index c49611b..2afb356 100644
--- a/history_tree.py
+++ b/history_tree.py
@@ -24,7 +24,18 @@ class HistoryTree:
def commit(self, data, note="Snapshot"):
new_id = str(uuid.uuid4())[:8]
-
+
+ # Cycle detection: walk parent chain from head to verify no cycle
+ if self.head_id:
+ visited = set()
+ current = self.head_id
+ while current:
+ if current in visited:
+ raise ValueError(f"Cycle detected in history tree at node {current}")
+ visited.add(current)
+ node = self.nodes.get(current)
+ current = node["parent"] if node else None
+
active_branch = None
for b_name, tip_id in self.branches.items():
if tip_id == self.head_id:
diff --git a/json_loader.py b/json_loader.py
index d3eafb3..2a43a6f 100644
--- a/json_loader.py
+++ b/json_loader.py
@@ -1,17 +1,45 @@
import json
import os
+import logging
+
+logger = logging.getLogger(__name__)
+
+def to_float(val):
+ try:
+ return float(val)
+ except (ValueError, TypeError):
+ return 0.0
+
+def to_int(val):
+ try:
+ return int(float(val))
+ except (ValueError, TypeError):
+ return 0
+
+def get_batch_item(data, sequence_number):
+ """Resolve batch item by sequence_number, clamping to valid range."""
+ if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0:
+ idx = max(0, min(sequence_number - 1, len(data["batch_data"]) - 1))
+ if sequence_number - 1 != idx:
+ logger.warning(f"Sequence {sequence_number} out of range (1-{len(data['batch_data'])}), clamped to {idx + 1}")
+ return data["batch_data"][idx]
+ return data
# --- Shared Helper ---
def read_json_data(json_path):
if not os.path.exists(json_path):
- print(f"[JSON Loader] Warning: File not found at {json_path}")
+ logger.warning(f"File not found at {json_path}")
return {}
try:
with open(json_path, 'r') as f:
- return json.load(f)
- except Exception as e:
- print(f"[JSON Loader] Error: {e}")
+ data = json.load(f)
+ except (json.JSONDecodeError, IOError) as e:
+ logger.warning(f"Error reading {json_path}: {e}")
return {}
+ if not isinstance(data, dict):
+ logger.warning(f"Expected dict from {json_path}, got {type(data).__name__}")
+ return {}
+ return data
# ==========================================
# 1. STANDARD NODES (Single File)
@@ -47,12 +75,7 @@ class JSONLoaderStandard:
def load_standard(self, json_path):
data = read_json_data(json_path)
- def to_float(val):
- try: return float(val)
- except: return 0.0
- def to_int(val):
- try: return int(float(val))
- except: return 0
+
return (
str(data.get("general_prompt", "")), str(data.get("general_negative", "")),
@@ -74,12 +97,7 @@ class JSONLoaderVACE:
def load_vace(self, json_path):
data = read_json_data(json_path)
- def to_float(val):
- try: return float(val)
- except: return 0.0
- def to_int(val):
- try: return int(float(val))
- except: return 0
+
return (
str(data.get("general_prompt", "")), str(data.get("general_negative", "")),
@@ -107,10 +125,7 @@ class JSONLoaderBatchLoRA:
def load_batch_loras(self, json_path, sequence_number):
data = read_json_data(json_path)
- target_data = data
- if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0:
- idx = (sequence_number - 1) % len(data["batch_data"])
- target_data = data["batch_data"][idx]
+ target_data = get_batch_item(data, sequence_number)
return (
str(target_data.get("lora 1 high", "")), str(target_data.get("lora 1 low", "")),
str(target_data.get("lora 2 high", "")), str(target_data.get("lora 2 low", "")),
@@ -128,16 +143,8 @@ class JSONLoaderBatchI2V:
def load_batch_i2v(self, json_path, sequence_number):
data = read_json_data(json_path)
- target_data = data
- if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0:
- idx = (sequence_number - 1) % len(data["batch_data"])
- target_data = data["batch_data"][idx]
- def to_float(val):
- try: return float(val)
- except: return 0.0
- def to_int(val):
- try: return int(float(val))
- except: return 0
+ target_data = get_batch_item(data, sequence_number)
+
return (
str(target_data.get("general_prompt", "")), str(target_data.get("general_negative", "")),
str(target_data.get("current_prompt", "")), str(target_data.get("negative", "")),
@@ -157,16 +164,8 @@ class JSONLoaderBatchVACE:
def load_batch_vace(self, json_path, sequence_number):
data = read_json_data(json_path)
- target_data = data
- if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0:
- idx = (sequence_number - 1) % len(data["batch_data"])
- target_data = data["batch_data"][idx]
- def to_float(val):
- try: return float(val)
- except: return 0.0
- def to_int(val):
- try: return int(float(val))
- except: return 0
+ target_data = get_batch_item(data, sequence_number)
+
return (
str(target_data.get("general_prompt", "")), str(target_data.get("general_negative", "")),
str(target_data.get("current_prompt", "")), str(target_data.get("negative", "")),
@@ -199,10 +198,7 @@ class JSONLoaderCustom1:
def load_custom(self, json_path, sequence_number, key_1=""):
data = read_json_data(json_path)
- target_data = data
- if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0:
- idx = (sequence_number - 1) % len(data["batch_data"])
- target_data = data["batch_data"][idx]
+ target_data = get_batch_item(data, sequence_number)
return (str(target_data.get(key_1, "")),)
class JSONLoaderCustom3:
@@ -226,10 +222,7 @@ class JSONLoaderCustom3:
def load_custom(self, json_path, sequence_number, key_1="", key_2="", key_3=""):
data = read_json_data(json_path)
- target_data = data
- if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0:
- idx = (sequence_number - 1) % len(data["batch_data"])
- target_data = data["batch_data"][idx]
+ target_data = get_batch_item(data, sequence_number)
return (
str(target_data.get(key_1, "")),
str(target_data.get(key_2, "")),
@@ -260,10 +253,7 @@ class JSONLoaderCustom6:
def load_custom(self, json_path, sequence_number, key_1="", key_2="", key_3="", key_4="", key_5="", key_6=""):
data = read_json_data(json_path)
- target_data = data
- if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0:
- idx = (sequence_number - 1) % len(data["batch_data"])
- target_data = data["batch_data"][idx]
+ target_data = get_batch_item(data, sequence_number)
return (
str(target_data.get(key_1, "")), str(target_data.get(key_2, "")),
str(target_data.get(key_3, "")), str(target_data.get(key_4, "")),
diff --git a/tab_batch.py b/tab_batch.py
index 50850f9..11be243 100644
--- a/tab_batch.py
+++ b/tab_batch.py
@@ -85,13 +85,19 @@ def render_batch_processor(data, file_path, json_files, current_dir, selected_fi
if bc3.button("➕ From History", use_container_width=True, disabled=not src_hist):
if sel_hist:
- idx = int(sel_hist.split(":")[0].replace("#", "")) - 1
- item = DEFAULTS.copy()
- h_item = src_hist[idx]
- item.update(h_item)
- if "loras" in h_item and isinstance(h_item["loras"], dict):
- item.update(h_item["loras"])
- add_sequence(item)
+ try:
+ idx = int(sel_hist.split(":")[0].replace("#", "")) - 1
+ if idx < 0 or idx >= len(src_hist):
+ st.error(f"History index {idx + 1} out of range.")
+ else:
+ item = DEFAULTS.copy()
+ h_item = src_hist[idx]
+ item.update(h_item)
+ if "loras" in h_item and isinstance(h_item["loras"], dict):
+ item.update(h_item["loras"])
+ add_sequence(item)
+ except (ValueError, IndexError) as e:
+ st.error(f"Could not parse history selection: {e}")
# --- RENDER LIST ---
st.markdown("---")
diff --git a/tab_comfy.py b/tab_comfy.py
index ca1551a..04630cc 100644
--- a/tab_comfy.py
+++ b/tab_comfy.py
@@ -3,6 +3,7 @@ import requests
from PIL import Image
from io import BytesIO
import urllib.parse
+import html
import time # <--- NEW IMPORT
from utils import save_config
@@ -115,18 +116,24 @@ def render_single_instance(instance_config, index, all_instances, timeout_minute
)
# Get Configured Viewer URL
- viewer_base = st.session_state.config.get("viewer_url", "http://192.168.1.51:5800")
- final_src = viewer_base
+ viewer_base = st.session_state.config.get("viewer_url", "")
+ final_src = viewer_base.strip()
- st.info(f"Viewing via Remote Browser: `{final_src}`")
- st.markdown(
- f"""
-
- """,
- unsafe_allow_html=True
- )
+ # Validate URL scheme before embedding
+ parsed = urllib.parse.urlparse(final_src)
+ if final_src and parsed.scheme in ("http", "https"):
+ safe_src = html.escape(final_src, quote=True)
+ st.info(f"Viewing via Remote Browser: `{final_src}`")
+ st.markdown(
+ f"""
+
+ """,
+ unsafe_allow_html=True
+ )
+ else:
+ st.warning("No valid viewer URL configured. Set one in Monitor Settings below.")
else:
st.info("Live Preview is disabled.")
@@ -183,8 +190,8 @@ def _render_content():
with st.expander("🔧 Monitor Settings", expanded=False):
c_set1, c_set2 = st.columns(2)
- current_viewer = st.session_state.config.get("viewer_url", "http://192.168.1.51:5800")
- new_viewer = c_set1.text_input("Remote Browser URL", value=current_viewer, help="e.g., http://192.168.1.51:5800")
+ current_viewer = st.session_state.config.get("viewer_url", "")
+ new_viewer = c_set1.text_input("Remote Browser URL", value=current_viewer, help="e.g., http://localhost:5800")
# New Timeout Slider
current_timeout = st.session_state.config.get("monitor_timeout", 0)
diff --git a/tab_timeline.py b/tab_timeline.py
index 7ce14a7..4e1ce50 100644
--- a/tab_timeline.py
+++ b/tab_timeline.py
@@ -1,4 +1,5 @@
import streamlit as st
+import copy
import json
import graphviz
import time
@@ -137,6 +138,10 @@ def render_timeline_tab(data, file_path):
st.warning("Deleting a node cannot be undone.")
if st.button("🗑️ Delete This Node", type="primary"):
if selected_node['id'] in htree.nodes:
+ # Backup current tree state before destructive operation
+ if "history_tree_backup" not in data:
+ data["history_tree_backup"] = []
+ data["history_tree_backup"].append(copy.deepcopy(htree.to_dict()))
del htree.nodes[selected_node['id']]
for b, tip in list(htree.branches.items()):
if tip == selected_node['id']:
diff --git a/tab_timeline_wip.py b/tab_timeline_wip.py
index 02a77d6..82b06c7 100644
--- a/tab_timeline_wip.py
+++ b/tab_timeline_wip.py
@@ -2,9 +2,17 @@ import streamlit as st
import json
from history_tree import HistoryTree
from utils import save_json
-from streamlit_agraph import agraph, Node, Edge, Config
+
+try:
+ from streamlit_agraph import agraph, Node, Edge, Config
+ _HAS_AGRAPH = True
+except ImportError:
+ _HAS_AGRAPH = False
def render_timeline_wip(data, file_path):
+ if not _HAS_AGRAPH:
+ st.error("The `streamlit-agraph` package is required for this tab. Install it with: `pip install streamlit-agraph`")
+ return
tree_data = data.get("history_tree", {})
if not tree_data:
st.info("No history timeline exists.")
diff --git a/utils.py b/utils.py
index a94e0b5..f9879c8 100644
--- a/utils.py
+++ b/utils.py
@@ -1,8 +1,17 @@
import json
+import logging
import time
from pathlib import Path
import streamlit as st
+# Configure logging for the application
+logging.basicConfig(
+ level=logging.INFO,
+ format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
+ datefmt="%H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+
# Default structure for new files
DEFAULTS = {
# --- Standard Keys for your Restored Single Tab ---
@@ -43,14 +52,17 @@ DEFAULTS = {
CONFIG_FILE = Path(".editor_config.json")
SNIPPETS_FILE = Path(".editor_snippets.json")
+# Restrict directory navigation to this base path (resolve symlinks)
+ALLOWED_BASE_DIR = Path.cwd().resolve()
+
def load_config():
"""Loads the main editor configuration (Favorites, Last Dir, Servers)."""
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, 'r') as f:
return json.load(f)
- except:
- pass
+ except (json.JSONDecodeError, IOError) as e:
+ logger.warning(f"Failed to load config: {e}")
return {"favorites": [], "last_dir": str(Path.cwd()), "comfy_instances": []}
def save_config(current_dir, favorites, extra_data=None):
@@ -76,8 +88,8 @@ def load_snippets():
try:
with open(SNIPPETS_FILE, 'r') as f:
return json.load(f)
- except:
- pass
+ except (json.JSONDecodeError, IOError) as e:
+ logger.warning(f"Failed to load snippets: {e}")
return {}
def save_snippets(snippets):