Fix critical bugs, security issues, and code quality across all modules

- Replace bare except clauses with specific exceptions (JSONDecodeError, IOError, ValueError, TypeError)
- Add path traversal protection restricting navigation to ALLOWED_BASE_DIR
- Sanitize iframe URLs with scheme validation and html.escape to prevent XSS
- Extract duplicate to_float/to_int to module-level helpers in json_loader.py
- Replace silent modulo wrapping with clamped bounds checking via get_batch_item()
- Remove hardcoded IP 192.168.1.51:5800, default to empty string
- Add try/except around fragile batch history string parsing
- Add JSON schema validation (dict type check) in read_json_data()
- Add Python logging framework, replace print() calls
- Consolidate session state initialization into loop with defaults dict
- Guard streamlit_agraph import with try/except ImportError
- Add backup snapshot before history node deletion
- Add cycle detection in HistoryTree.commit()

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-02 11:47:50 +01:00
parent 268de89f6d
commit 326ae25ab2
8 changed files with 143 additions and 106 deletions

54
app.py
View File

@@ -4,8 +4,8 @@ from pathlib import Path
# --- Import Custom Modules ---
from utils import (
load_config, save_config, load_snippets, save_snippets,
load_json, save_json, generate_templates, DEFAULTS
load_config, save_config, load_snippets, save_snippets,
load_json, save_json, generate_templates, DEFAULTS, ALLOWED_BASE_DIR
)
from tab_single import render_single_editor
from tab_batch import render_batch_processor
@@ -22,31 +22,23 @@ st.set_page_config(layout="wide", page_title="AI Settings Manager")
# ==========================================
# 2. SESSION STATE INITIALIZATION
# ==========================================
_SESSION_DEFAULTS = {
"snippets": load_snippets,
"loaded_file": lambda: None,
"last_mtime": lambda: 0,
"edit_history_idx": lambda: None,
"single_editor_cache": lambda: DEFAULTS.copy(),
"ui_reset_token": lambda: 0,
"active_tab_name": lambda: "📝 Single Editor",
}
if 'config' not in st.session_state:
st.session_state.config = load_config()
st.session_state.current_dir = Path(st.session_state.config.get("last_dir", Path.cwd()))
if 'snippets' not in st.session_state:
st.session_state.snippets = load_snippets()
if 'loaded_file' not in st.session_state:
st.session_state.loaded_file = None
if 'last_mtime' not in st.session_state:
st.session_state.last_mtime = 0
if 'edit_history_idx' not in st.session_state:
st.session_state.edit_history_idx = None
if 'single_editor_cache' not in st.session_state:
st.session_state.single_editor_cache = DEFAULTS.copy()
if 'ui_reset_token' not in st.session_state:
st.session_state.ui_reset_token = 0
# Track the active tab state for programmatic switching
if 'active_tab_name' not in st.session_state:
st.session_state.active_tab_name = "📝 Single Editor"
for key, factory in _SESSION_DEFAULTS.items():
if key not in st.session_state:
st.session_state[key] = factory()
# ==========================================
# 3. SIDEBAR (NAVIGATOR & TOOLS)
@@ -57,12 +49,18 @@ with st.sidebar:
# --- Path Navigator ---
new_path = st.text_input("Current Path", value=str(st.session_state.current_dir))
if new_path != str(st.session_state.current_dir):
p = Path(new_path)
p = Path(new_path).resolve()
if p.exists() and p.is_dir():
st.session_state.current_dir = p
st.session_state.config['last_dir'] = str(p)
save_config(st.session_state.current_dir, st.session_state.config['favorites'])
st.rerun()
# Restrict navigation to the allowed base directory
try:
p.relative_to(ALLOWED_BASE_DIR)
except ValueError:
st.error(f"Access denied: path must be under {ALLOWED_BASE_DIR}")
else:
st.session_state.current_dir = p
st.session_state.config['last_dir'] = str(p)
save_config(st.session_state.current_dir, st.session_state.config['favorites'])
st.rerun()
# --- Favorites System ---
if st.button("📌 Pin Current Folder"):

View File

@@ -24,7 +24,18 @@ class HistoryTree:
def commit(self, data, note="Snapshot"):
new_id = str(uuid.uuid4())[:8]
# Cycle detection: walk parent chain from head to verify no cycle
if self.head_id:
visited = set()
current = self.head_id
while current:
if current in visited:
raise ValueError(f"Cycle detected in history tree at node {current}")
visited.add(current)
node = self.nodes.get(current)
current = node["parent"] if node else None
active_branch = None
for b_name, tip_id in self.branches.items():
if tip_id == self.head_id:

View File

@@ -1,17 +1,45 @@
import json
import os
import logging
logger = logging.getLogger(__name__)
def to_float(val):
try:
return float(val)
except (ValueError, TypeError):
return 0.0
def to_int(val):
try:
return int(float(val))
except (ValueError, TypeError):
return 0
def get_batch_item(data, sequence_number):
"""Resolve batch item by sequence_number, clamping to valid range."""
if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0:
idx = max(0, min(sequence_number - 1, len(data["batch_data"]) - 1))
if sequence_number - 1 != idx:
logger.warning(f"Sequence {sequence_number} out of range (1-{len(data['batch_data'])}), clamped to {idx + 1}")
return data["batch_data"][idx]
return data
# --- Shared Helper ---
def read_json_data(json_path):
if not os.path.exists(json_path):
print(f"[JSON Loader] Warning: File not found at {json_path}")
logger.warning(f"File not found at {json_path}")
return {}
try:
with open(json_path, 'r') as f:
return json.load(f)
except Exception as e:
print(f"[JSON Loader] Error: {e}")
data = json.load(f)
except (json.JSONDecodeError, IOError) as e:
logger.warning(f"Error reading {json_path}: {e}")
return {}
if not isinstance(data, dict):
logger.warning(f"Expected dict from {json_path}, got {type(data).__name__}")
return {}
return data
# ==========================================
# 1. STANDARD NODES (Single File)
@@ -47,12 +75,7 @@ class JSONLoaderStandard:
def load_standard(self, json_path):
data = read_json_data(json_path)
def to_float(val):
try: return float(val)
except: return 0.0
def to_int(val):
try: return int(float(val))
except: return 0
return (
str(data.get("general_prompt", "")), str(data.get("general_negative", "")),
@@ -74,12 +97,7 @@ class JSONLoaderVACE:
def load_vace(self, json_path):
data = read_json_data(json_path)
def to_float(val):
try: return float(val)
except: return 0.0
def to_int(val):
try: return int(float(val))
except: return 0
return (
str(data.get("general_prompt", "")), str(data.get("general_negative", "")),
@@ -107,10 +125,7 @@ class JSONLoaderBatchLoRA:
def load_batch_loras(self, json_path, sequence_number):
data = read_json_data(json_path)
target_data = data
if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0:
idx = (sequence_number - 1) % len(data["batch_data"])
target_data = data["batch_data"][idx]
target_data = get_batch_item(data, sequence_number)
return (
str(target_data.get("lora 1 high", "")), str(target_data.get("lora 1 low", "")),
str(target_data.get("lora 2 high", "")), str(target_data.get("lora 2 low", "")),
@@ -128,16 +143,8 @@ class JSONLoaderBatchI2V:
def load_batch_i2v(self, json_path, sequence_number):
data = read_json_data(json_path)
target_data = data
if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0:
idx = (sequence_number - 1) % len(data["batch_data"])
target_data = data["batch_data"][idx]
def to_float(val):
try: return float(val)
except: return 0.0
def to_int(val):
try: return int(float(val))
except: return 0
target_data = get_batch_item(data, sequence_number)
return (
str(target_data.get("general_prompt", "")), str(target_data.get("general_negative", "")),
str(target_data.get("current_prompt", "")), str(target_data.get("negative", "")),
@@ -157,16 +164,8 @@ class JSONLoaderBatchVACE:
def load_batch_vace(self, json_path, sequence_number):
data = read_json_data(json_path)
target_data = data
if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0:
idx = (sequence_number - 1) % len(data["batch_data"])
target_data = data["batch_data"][idx]
def to_float(val):
try: return float(val)
except: return 0.0
def to_int(val):
try: return int(float(val))
except: return 0
target_data = get_batch_item(data, sequence_number)
return (
str(target_data.get("general_prompt", "")), str(target_data.get("general_negative", "")),
str(target_data.get("current_prompt", "")), str(target_data.get("negative", "")),
@@ -199,10 +198,7 @@ class JSONLoaderCustom1:
def load_custom(self, json_path, sequence_number, key_1=""):
data = read_json_data(json_path)
target_data = data
if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0:
idx = (sequence_number - 1) % len(data["batch_data"])
target_data = data["batch_data"][idx]
target_data = get_batch_item(data, sequence_number)
return (str(target_data.get(key_1, "")),)
class JSONLoaderCustom3:
@@ -226,10 +222,7 @@ class JSONLoaderCustom3:
def load_custom(self, json_path, sequence_number, key_1="", key_2="", key_3=""):
data = read_json_data(json_path)
target_data = data
if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0:
idx = (sequence_number - 1) % len(data["batch_data"])
target_data = data["batch_data"][idx]
target_data = get_batch_item(data, sequence_number)
return (
str(target_data.get(key_1, "")),
str(target_data.get(key_2, "")),
@@ -260,10 +253,7 @@ class JSONLoaderCustom6:
def load_custom(self, json_path, sequence_number, key_1="", key_2="", key_3="", key_4="", key_5="", key_6=""):
data = read_json_data(json_path)
target_data = data
if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0:
idx = (sequence_number - 1) % len(data["batch_data"])
target_data = data["batch_data"][idx]
target_data = get_batch_item(data, sequence_number)
return (
str(target_data.get(key_1, "")), str(target_data.get(key_2, "")),
str(target_data.get(key_3, "")), str(target_data.get(key_4, "")),

View File

@@ -85,13 +85,19 @@ def render_batch_processor(data, file_path, json_files, current_dir, selected_fi
if bc3.button(" From History", use_container_width=True, disabled=not src_hist):
if sel_hist:
idx = int(sel_hist.split(":")[0].replace("#", "")) - 1
item = DEFAULTS.copy()
h_item = src_hist[idx]
item.update(h_item)
if "loras" in h_item and isinstance(h_item["loras"], dict):
item.update(h_item["loras"])
add_sequence(item)
try:
idx = int(sel_hist.split(":")[0].replace("#", "")) - 1
if idx < 0 or idx >= len(src_hist):
st.error(f"History index {idx + 1} out of range.")
else:
item = DEFAULTS.copy()
h_item = src_hist[idx]
item.update(h_item)
if "loras" in h_item and isinstance(h_item["loras"], dict):
item.update(h_item["loras"])
add_sequence(item)
except (ValueError, IndexError) as e:
st.error(f"Could not parse history selection: {e}")
# --- RENDER LIST ---
st.markdown("---")

View File

@@ -3,6 +3,7 @@ import requests
from PIL import Image
from io import BytesIO
import urllib.parse
import html
import time # <--- NEW IMPORT
from utils import save_config
@@ -115,18 +116,24 @@ def render_single_instance(instance_config, index, all_instances, timeout_minute
)
# Get Configured Viewer URL
viewer_base = st.session_state.config.get("viewer_url", "http://192.168.1.51:5800")
final_src = viewer_base
viewer_base = st.session_state.config.get("viewer_url", "")
final_src = viewer_base.strip()
st.info(f"Viewing via Remote Browser: `{final_src}`")
st.markdown(
f"""
<iframe src="{final_src}" width="100%" height="{iframe_h}px"
style="border: 2px solid #666; border-radius: 8px; box-shadow: 0 4px 6px rgba(0,0,0,0.3);">
</iframe>
""",
unsafe_allow_html=True
)
# Validate URL scheme before embedding
parsed = urllib.parse.urlparse(final_src)
if final_src and parsed.scheme in ("http", "https"):
safe_src = html.escape(final_src, quote=True)
st.info(f"Viewing via Remote Browser: `{final_src}`")
st.markdown(
f"""
<iframe src="{safe_src}" width="100%" height="{iframe_h}px"
style="border: 2px solid #666; border-radius: 8px; box-shadow: 0 4px 6px rgba(0,0,0,0.3);">
</iframe>
""",
unsafe_allow_html=True
)
else:
st.warning("No valid viewer URL configured. Set one in Monitor Settings below.")
else:
st.info("Live Preview is disabled.")
@@ -183,8 +190,8 @@ def _render_content():
with st.expander("🔧 Monitor Settings", expanded=False):
c_set1, c_set2 = st.columns(2)
current_viewer = st.session_state.config.get("viewer_url", "http://192.168.1.51:5800")
new_viewer = c_set1.text_input("Remote Browser URL", value=current_viewer, help="e.g., http://192.168.1.51:5800")
current_viewer = st.session_state.config.get("viewer_url", "")
new_viewer = c_set1.text_input("Remote Browser URL", value=current_viewer, help="e.g., http://localhost:5800")
# New Timeout Slider
current_timeout = st.session_state.config.get("monitor_timeout", 0)

View File

@@ -1,4 +1,5 @@
import streamlit as st
import copy
import json
import graphviz
import time
@@ -137,6 +138,10 @@ def render_timeline_tab(data, file_path):
st.warning("Deleting a node cannot be undone.")
if st.button("🗑️ Delete This Node", type="primary"):
if selected_node['id'] in htree.nodes:
# Backup current tree state before destructive operation
if "history_tree_backup" not in data:
data["history_tree_backup"] = []
data["history_tree_backup"].append(copy.deepcopy(htree.to_dict()))
del htree.nodes[selected_node['id']]
for b, tip in list(htree.branches.items()):
if tip == selected_node['id']:

View File

@@ -2,9 +2,17 @@ import streamlit as st
import json
from history_tree import HistoryTree
from utils import save_json
from streamlit_agraph import agraph, Node, Edge, Config
try:
from streamlit_agraph import agraph, Node, Edge, Config
_HAS_AGRAPH = True
except ImportError:
_HAS_AGRAPH = False
def render_timeline_wip(data, file_path):
if not _HAS_AGRAPH:
st.error("The `streamlit-agraph` package is required for this tab. Install it with: `pip install streamlit-agraph`")
return
tree_data = data.get("history_tree", {})
if not tree_data:
st.info("No history timeline exists.")

View File

@@ -1,8 +1,17 @@
import json
import logging
import time
from pathlib import Path
import streamlit as st
# Configure logging for the application
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)
# Default structure for new files
DEFAULTS = {
# --- Standard Keys for your Restored Single Tab ---
@@ -43,14 +52,17 @@ DEFAULTS = {
CONFIG_FILE = Path(".editor_config.json")
SNIPPETS_FILE = Path(".editor_snippets.json")
# Restrict directory navigation to this base path (resolve symlinks)
ALLOWED_BASE_DIR = Path.cwd().resolve()
def load_config():
"""Loads the main editor configuration (Favorites, Last Dir, Servers)."""
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, 'r') as f:
return json.load(f)
except:
pass
except (json.JSONDecodeError, IOError) as e:
logger.warning(f"Failed to load config: {e}")
return {"favorites": [], "last_dir": str(Path.cwd()), "comfy_instances": []}
def save_config(current_dir, favorites, extra_data=None):
@@ -76,8 +88,8 @@ def load_snippets():
try:
with open(SNIPPETS_FILE, 'r') as f:
return json.load(f)
except:
pass
except (json.JSONDecodeError, IOError) as e:
logger.warning(f"Failed to load snippets: {e}")
return {}
def save_snippets(snippets):