From 326ae25ab2ad1795b15a3ef17b26bd84400003ae Mon Sep 17 00:00:00 2001 From: Ethanfel Date: Mon, 2 Feb 2026 11:47:50 +0100 Subject: [PATCH] Fix critical bugs, security issues, and code quality across all modules - Replace bare except clauses with specific exceptions (JSONDecodeError, IOError, ValueError, TypeError) - Add path traversal protection restricting navigation to ALLOWED_BASE_DIR - Sanitize iframe URLs with scheme validation and html.escape to prevent XSS - Extract duplicate to_float/to_int to module-level helpers in json_loader.py - Replace silent modulo wrapping with clamped bounds checking via get_batch_item() - Remove hardcoded IP 192.168.1.51:5800, default to empty string - Add try/except around fragile batch history string parsing - Add JSON schema validation (dict type check) in read_json_data() - Add Python logging framework, replace print() calls - Consolidate session state initialization into loop with defaults dict - Guard streamlit_agraph import with try/except ImportError - Add backup snapshot before history node deletion - Add cycle detection in HistoryTree.commit() Co-Authored-By: Claude Opus 4.5 --- app.py | 54 +++++++++++++------------- history_tree.py | 13 ++++++- json_loader.py | 94 ++++++++++++++++++++------------------------- tab_batch.py | 20 ++++++---- tab_comfy.py | 33 +++++++++------- tab_timeline.py | 5 +++ tab_timeline_wip.py | 10 ++++- utils.py | 20 ++++++++-- 8 files changed, 143 insertions(+), 106 deletions(-) diff --git a/app.py b/app.py index ea0ae0c..a190fd9 100644 --- a/app.py +++ b/app.py @@ -4,8 +4,8 @@ from pathlib import Path # --- Import Custom Modules --- from utils import ( - load_config, save_config, load_snippets, save_snippets, - load_json, save_json, generate_templates, DEFAULTS + load_config, save_config, load_snippets, save_snippets, + load_json, save_json, generate_templates, DEFAULTS, ALLOWED_BASE_DIR ) from tab_single import render_single_editor from tab_batch import render_batch_processor @@ -22,31 +22,23 @@ st.set_page_config(layout="wide", page_title="AI Settings Manager") # ========================================== # 2. SESSION STATE INITIALIZATION # ========================================== +_SESSION_DEFAULTS = { + "snippets": load_snippets, + "loaded_file": lambda: None, + "last_mtime": lambda: 0, + "edit_history_idx": lambda: None, + "single_editor_cache": lambda: DEFAULTS.copy(), + "ui_reset_token": lambda: 0, + "active_tab_name": lambda: "📝 Single Editor", +} + if 'config' not in st.session_state: st.session_state.config = load_config() st.session_state.current_dir = Path(st.session_state.config.get("last_dir", Path.cwd())) -if 'snippets' not in st.session_state: - st.session_state.snippets = load_snippets() - -if 'loaded_file' not in st.session_state: - st.session_state.loaded_file = None - -if 'last_mtime' not in st.session_state: - st.session_state.last_mtime = 0 - -if 'edit_history_idx' not in st.session_state: - st.session_state.edit_history_idx = None - -if 'single_editor_cache' not in st.session_state: - st.session_state.single_editor_cache = DEFAULTS.copy() - -if 'ui_reset_token' not in st.session_state: - st.session_state.ui_reset_token = 0 - -# Track the active tab state for programmatic switching -if 'active_tab_name' not in st.session_state: - st.session_state.active_tab_name = "📝 Single Editor" +for key, factory in _SESSION_DEFAULTS.items(): + if key not in st.session_state: + st.session_state[key] = factory() # ========================================== # 3. SIDEBAR (NAVIGATOR & TOOLS) @@ -57,12 +49,18 @@ with st.sidebar: # --- Path Navigator --- new_path = st.text_input("Current Path", value=str(st.session_state.current_dir)) if new_path != str(st.session_state.current_dir): - p = Path(new_path) + p = Path(new_path).resolve() if p.exists() and p.is_dir(): - st.session_state.current_dir = p - st.session_state.config['last_dir'] = str(p) - save_config(st.session_state.current_dir, st.session_state.config['favorites']) - st.rerun() + # Restrict navigation to the allowed base directory + try: + p.relative_to(ALLOWED_BASE_DIR) + except ValueError: + st.error(f"Access denied: path must be under {ALLOWED_BASE_DIR}") + else: + st.session_state.current_dir = p + st.session_state.config['last_dir'] = str(p) + save_config(st.session_state.current_dir, st.session_state.config['favorites']) + st.rerun() # --- Favorites System --- if st.button("📌 Pin Current Folder"): diff --git a/history_tree.py b/history_tree.py index c49611b..2afb356 100644 --- a/history_tree.py +++ b/history_tree.py @@ -24,7 +24,18 @@ class HistoryTree: def commit(self, data, note="Snapshot"): new_id = str(uuid.uuid4())[:8] - + + # Cycle detection: walk parent chain from head to verify no cycle + if self.head_id: + visited = set() + current = self.head_id + while current: + if current in visited: + raise ValueError(f"Cycle detected in history tree at node {current}") + visited.add(current) + node = self.nodes.get(current) + current = node["parent"] if node else None + active_branch = None for b_name, tip_id in self.branches.items(): if tip_id == self.head_id: diff --git a/json_loader.py b/json_loader.py index d3eafb3..2a43a6f 100644 --- a/json_loader.py +++ b/json_loader.py @@ -1,17 +1,45 @@ import json import os +import logging + +logger = logging.getLogger(__name__) + +def to_float(val): + try: + return float(val) + except (ValueError, TypeError): + return 0.0 + +def to_int(val): + try: + return int(float(val)) + except (ValueError, TypeError): + return 0 + +def get_batch_item(data, sequence_number): + """Resolve batch item by sequence_number, clamping to valid range.""" + if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0: + idx = max(0, min(sequence_number - 1, len(data["batch_data"]) - 1)) + if sequence_number - 1 != idx: + logger.warning(f"Sequence {sequence_number} out of range (1-{len(data['batch_data'])}), clamped to {idx + 1}") + return data["batch_data"][idx] + return data # --- Shared Helper --- def read_json_data(json_path): if not os.path.exists(json_path): - print(f"[JSON Loader] Warning: File not found at {json_path}") + logger.warning(f"File not found at {json_path}") return {} try: with open(json_path, 'r') as f: - return json.load(f) - except Exception as e: - print(f"[JSON Loader] Error: {e}") + data = json.load(f) + except (json.JSONDecodeError, IOError) as e: + logger.warning(f"Error reading {json_path}: {e}") return {} + if not isinstance(data, dict): + logger.warning(f"Expected dict from {json_path}, got {type(data).__name__}") + return {} + return data # ========================================== # 1. STANDARD NODES (Single File) @@ -47,12 +75,7 @@ class JSONLoaderStandard: def load_standard(self, json_path): data = read_json_data(json_path) - def to_float(val): - try: return float(val) - except: return 0.0 - def to_int(val): - try: return int(float(val)) - except: return 0 + return ( str(data.get("general_prompt", "")), str(data.get("general_negative", "")), @@ -74,12 +97,7 @@ class JSONLoaderVACE: def load_vace(self, json_path): data = read_json_data(json_path) - def to_float(val): - try: return float(val) - except: return 0.0 - def to_int(val): - try: return int(float(val)) - except: return 0 + return ( str(data.get("general_prompt", "")), str(data.get("general_negative", "")), @@ -107,10 +125,7 @@ class JSONLoaderBatchLoRA: def load_batch_loras(self, json_path, sequence_number): data = read_json_data(json_path) - target_data = data - if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0: - idx = (sequence_number - 1) % len(data["batch_data"]) - target_data = data["batch_data"][idx] + target_data = get_batch_item(data, sequence_number) return ( str(target_data.get("lora 1 high", "")), str(target_data.get("lora 1 low", "")), str(target_data.get("lora 2 high", "")), str(target_data.get("lora 2 low", "")), @@ -128,16 +143,8 @@ class JSONLoaderBatchI2V: def load_batch_i2v(self, json_path, sequence_number): data = read_json_data(json_path) - target_data = data - if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0: - idx = (sequence_number - 1) % len(data["batch_data"]) - target_data = data["batch_data"][idx] - def to_float(val): - try: return float(val) - except: return 0.0 - def to_int(val): - try: return int(float(val)) - except: return 0 + target_data = get_batch_item(data, sequence_number) + return ( str(target_data.get("general_prompt", "")), str(target_data.get("general_negative", "")), str(target_data.get("current_prompt", "")), str(target_data.get("negative", "")), @@ -157,16 +164,8 @@ class JSONLoaderBatchVACE: def load_batch_vace(self, json_path, sequence_number): data = read_json_data(json_path) - target_data = data - if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0: - idx = (sequence_number - 1) % len(data["batch_data"]) - target_data = data["batch_data"][idx] - def to_float(val): - try: return float(val) - except: return 0.0 - def to_int(val): - try: return int(float(val)) - except: return 0 + target_data = get_batch_item(data, sequence_number) + return ( str(target_data.get("general_prompt", "")), str(target_data.get("general_negative", "")), str(target_data.get("current_prompt", "")), str(target_data.get("negative", "")), @@ -199,10 +198,7 @@ class JSONLoaderCustom1: def load_custom(self, json_path, sequence_number, key_1=""): data = read_json_data(json_path) - target_data = data - if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0: - idx = (sequence_number - 1) % len(data["batch_data"]) - target_data = data["batch_data"][idx] + target_data = get_batch_item(data, sequence_number) return (str(target_data.get(key_1, "")),) class JSONLoaderCustom3: @@ -226,10 +222,7 @@ class JSONLoaderCustom3: def load_custom(self, json_path, sequence_number, key_1="", key_2="", key_3=""): data = read_json_data(json_path) - target_data = data - if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0: - idx = (sequence_number - 1) % len(data["batch_data"]) - target_data = data["batch_data"][idx] + target_data = get_batch_item(data, sequence_number) return ( str(target_data.get(key_1, "")), str(target_data.get(key_2, "")), @@ -260,10 +253,7 @@ class JSONLoaderCustom6: def load_custom(self, json_path, sequence_number, key_1="", key_2="", key_3="", key_4="", key_5="", key_6=""): data = read_json_data(json_path) - target_data = data - if "batch_data" in data and isinstance(data["batch_data"], list) and len(data["batch_data"]) > 0: - idx = (sequence_number - 1) % len(data["batch_data"]) - target_data = data["batch_data"][idx] + target_data = get_batch_item(data, sequence_number) return ( str(target_data.get(key_1, "")), str(target_data.get(key_2, "")), str(target_data.get(key_3, "")), str(target_data.get(key_4, "")), diff --git a/tab_batch.py b/tab_batch.py index 50850f9..11be243 100644 --- a/tab_batch.py +++ b/tab_batch.py @@ -85,13 +85,19 @@ def render_batch_processor(data, file_path, json_files, current_dir, selected_fi if bc3.button("➕ From History", use_container_width=True, disabled=not src_hist): if sel_hist: - idx = int(sel_hist.split(":")[0].replace("#", "")) - 1 - item = DEFAULTS.copy() - h_item = src_hist[idx] - item.update(h_item) - if "loras" in h_item and isinstance(h_item["loras"], dict): - item.update(h_item["loras"]) - add_sequence(item) + try: + idx = int(sel_hist.split(":")[0].replace("#", "")) - 1 + if idx < 0 or idx >= len(src_hist): + st.error(f"History index {idx + 1} out of range.") + else: + item = DEFAULTS.copy() + h_item = src_hist[idx] + item.update(h_item) + if "loras" in h_item and isinstance(h_item["loras"], dict): + item.update(h_item["loras"]) + add_sequence(item) + except (ValueError, IndexError) as e: + st.error(f"Could not parse history selection: {e}") # --- RENDER LIST --- st.markdown("---") diff --git a/tab_comfy.py b/tab_comfy.py index ca1551a..04630cc 100644 --- a/tab_comfy.py +++ b/tab_comfy.py @@ -3,6 +3,7 @@ import requests from PIL import Image from io import BytesIO import urllib.parse +import html import time # <--- NEW IMPORT from utils import save_config @@ -115,18 +116,24 @@ def render_single_instance(instance_config, index, all_instances, timeout_minute ) # Get Configured Viewer URL - viewer_base = st.session_state.config.get("viewer_url", "http://192.168.1.51:5800") - final_src = viewer_base + viewer_base = st.session_state.config.get("viewer_url", "") + final_src = viewer_base.strip() - st.info(f"Viewing via Remote Browser: `{final_src}`") - st.markdown( - f""" - - """, - unsafe_allow_html=True - ) + # Validate URL scheme before embedding + parsed = urllib.parse.urlparse(final_src) + if final_src and parsed.scheme in ("http", "https"): + safe_src = html.escape(final_src, quote=True) + st.info(f"Viewing via Remote Browser: `{final_src}`") + st.markdown( + f""" + + """, + unsafe_allow_html=True + ) + else: + st.warning("No valid viewer URL configured. Set one in Monitor Settings below.") else: st.info("Live Preview is disabled.") @@ -183,8 +190,8 @@ def _render_content(): with st.expander("🔧 Monitor Settings", expanded=False): c_set1, c_set2 = st.columns(2) - current_viewer = st.session_state.config.get("viewer_url", "http://192.168.1.51:5800") - new_viewer = c_set1.text_input("Remote Browser URL", value=current_viewer, help="e.g., http://192.168.1.51:5800") + current_viewer = st.session_state.config.get("viewer_url", "") + new_viewer = c_set1.text_input("Remote Browser URL", value=current_viewer, help="e.g., http://localhost:5800") # New Timeout Slider current_timeout = st.session_state.config.get("monitor_timeout", 0) diff --git a/tab_timeline.py b/tab_timeline.py index 7ce14a7..4e1ce50 100644 --- a/tab_timeline.py +++ b/tab_timeline.py @@ -1,4 +1,5 @@ import streamlit as st +import copy import json import graphviz import time @@ -137,6 +138,10 @@ def render_timeline_tab(data, file_path): st.warning("Deleting a node cannot be undone.") if st.button("🗑️ Delete This Node", type="primary"): if selected_node['id'] in htree.nodes: + # Backup current tree state before destructive operation + if "history_tree_backup" not in data: + data["history_tree_backup"] = [] + data["history_tree_backup"].append(copy.deepcopy(htree.to_dict())) del htree.nodes[selected_node['id']] for b, tip in list(htree.branches.items()): if tip == selected_node['id']: diff --git a/tab_timeline_wip.py b/tab_timeline_wip.py index 02a77d6..82b06c7 100644 --- a/tab_timeline_wip.py +++ b/tab_timeline_wip.py @@ -2,9 +2,17 @@ import streamlit as st import json from history_tree import HistoryTree from utils import save_json -from streamlit_agraph import agraph, Node, Edge, Config + +try: + from streamlit_agraph import agraph, Node, Edge, Config + _HAS_AGRAPH = True +except ImportError: + _HAS_AGRAPH = False def render_timeline_wip(data, file_path): + if not _HAS_AGRAPH: + st.error("The `streamlit-agraph` package is required for this tab. Install it with: `pip install streamlit-agraph`") + return tree_data = data.get("history_tree", {}) if not tree_data: st.info("No history timeline exists.") diff --git a/utils.py b/utils.py index a94e0b5..f9879c8 100644 --- a/utils.py +++ b/utils.py @@ -1,8 +1,17 @@ import json +import logging import time from pathlib import Path import streamlit as st +# Configure logging for the application +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", + datefmt="%H:%M:%S", +) +logger = logging.getLogger(__name__) + # Default structure for new files DEFAULTS = { # --- Standard Keys for your Restored Single Tab --- @@ -43,14 +52,17 @@ DEFAULTS = { CONFIG_FILE = Path(".editor_config.json") SNIPPETS_FILE = Path(".editor_snippets.json") +# Restrict directory navigation to this base path (resolve symlinks) +ALLOWED_BASE_DIR = Path.cwd().resolve() + def load_config(): """Loads the main editor configuration (Favorites, Last Dir, Servers).""" if CONFIG_FILE.exists(): try: with open(CONFIG_FILE, 'r') as f: return json.load(f) - except: - pass + except (json.JSONDecodeError, IOError) as e: + logger.warning(f"Failed to load config: {e}") return {"favorites": [], "last_dir": str(Path.cwd()), "comfy_instances": []} def save_config(current_dir, favorites, extra_data=None): @@ -76,8 +88,8 @@ def load_snippets(): try: with open(SNIPPETS_FILE, 'r') as f: return json.load(f) - except: - pass + except (json.JSONDecodeError, IOError) as e: + logger.warning(f"Failed to load snippets: {e}") return {} def save_snippets(snippets):