Update app.py
This commit is contained in:
440
app.py
440
app.py
@@ -1,214 +1,264 @@
|
||||
import streamlit as st
|
||||
import random
|
||||
from pathlib import Path
|
||||
|
||||
# --- Import Custom Modules ---
|
||||
from utils import (
|
||||
load_config, save_config, load_snippets, save_snippets,
|
||||
load_json, save_json, generate_templates, DEFAULTS
|
||||
)
|
||||
from tab_single import render_single_editor
|
||||
from tab_batch import render_batch_processor
|
||||
from tab_timeline import render_timeline_tab
|
||||
from tab_timeline_wip import render_timeline_wip
|
||||
from tab_comfy import render_comfy_monitor
|
||||
import gradio as gr
|
||||
import sqlite3
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
# ==========================================
|
||||
# 1. PAGE CONFIGURATION
|
||||
# 1. DATABASE & PERSISTENCE LAYER
|
||||
# ==========================================
|
||||
st.set_page_config(layout="wide", page_title="AI Settings Manager")
|
||||
|
||||
# ==========================================
|
||||
# 2. SESSION STATE INITIALIZATION
|
||||
# ==========================================
|
||||
if 'config' not in st.session_state:
|
||||
st.session_state.config = load_config()
|
||||
st.session_state.current_dir = Path(st.session_state.config.get("last_dir", Path.cwd()))
|
||||
DB_FILE = "app_data.db"
|
||||
LEGACY_JSON_FILE = "settings.json" # File to import from if DB is empty
|
||||
|
||||
if 'snippets' not in st.session_state:
|
||||
st.session_state.snippets = load_snippets()
|
||||
def get_db():
|
||||
"""Connect to SQLite and return connection."""
|
||||
conn = sqlite3.connect(DB_FILE)
|
||||
conn.row_factory = sqlite3.Row # Access columns by name
|
||||
return conn
|
||||
|
||||
if 'loaded_file' not in st.session_state:
|
||||
st.session_state.loaded_file = None
|
||||
def init_db():
|
||||
"""Initialize tables and migrate legacy JSON if needed."""
|
||||
conn = get_db()
|
||||
c = conn.cursor()
|
||||
|
||||
if 'last_mtime' not in st.session_state:
|
||||
st.session_state.last_mtime = 0
|
||||
# Table 1: Single Tab Settings (Keyed by file path)
|
||||
c.execute('''
|
||||
CREATE TABLE IF NOT EXISTS path_settings (
|
||||
path TEXT PRIMARY KEY,
|
||||
settings_json TEXT,
|
||||
updated_at TIMESTAMP
|
||||
)
|
||||
''')
|
||||
|
||||
if 'edit_history_idx' not in st.session_state:
|
||||
st.session_state.edit_history_idx = None
|
||||
|
||||
if 'single_editor_cache' not in st.session_state:
|
||||
st.session_state.single_editor_cache = DEFAULTS.copy()
|
||||
|
||||
if 'ui_reset_token' not in st.session_state:
|
||||
st.session_state.ui_reset_token = 0
|
||||
|
||||
# Track the active tab state for programmatic switching
|
||||
if 'active_tab_name' not in st.session_state:
|
||||
st.session_state.active_tab_name = "📝 Single Editor"
|
||||
|
||||
# ==========================================
|
||||
# 3. SIDEBAR (NAVIGATOR & TOOLS)
|
||||
# ==========================================
|
||||
with st.sidebar:
|
||||
st.header("📂 Navigator")
|
||||
# Table 2: Batch History (Logs of batch runs)
|
||||
c.execute('''
|
||||
CREATE TABLE IF NOT EXISTS batch_history (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp TIMESTAMP,
|
||||
folder_path TEXT,
|
||||
log_data TEXT
|
||||
)
|
||||
''')
|
||||
|
||||
# --- Path Navigator ---
|
||||
new_path = st.text_input("Current Path", value=str(st.session_state.current_dir))
|
||||
if new_path != str(st.session_state.current_dir):
|
||||
p = Path(new_path)
|
||||
if p.exists() and p.is_dir():
|
||||
st.session_state.current_dir = p
|
||||
st.session_state.config['last_dir'] = str(p)
|
||||
save_config(st.session_state.current_dir, st.session_state.config['favorites'])
|
||||
st.rerun()
|
||||
conn.commit()
|
||||
|
||||
# --- Favorites System ---
|
||||
if st.button("📌 Pin Current Folder"):
|
||||
if str(st.session_state.current_dir) not in st.session_state.config['favorites']:
|
||||
st.session_state.config['favorites'].append(str(st.session_state.current_dir))
|
||||
save_config(st.session_state.current_dir, st.session_state.config['favorites'])
|
||||
st.rerun()
|
||||
# --- MIGRATION LOGIC ---
|
||||
# Check if DB is empty
|
||||
c.execute("SELECT COUNT(*) FROM path_settings")
|
||||
count = c.fetchone()[0]
|
||||
|
||||
fav_selection = st.radio(
|
||||
"Jump to:",
|
||||
["Select..."] + st.session_state.config['favorites'],
|
||||
index=0,
|
||||
label_visibility="collapsed"
|
||||
)
|
||||
if fav_selection != "Select..." and fav_selection != str(st.session_state.current_dir):
|
||||
st.session_state.current_dir = Path(fav_selection)
|
||||
st.rerun()
|
||||
|
||||
st.markdown("---")
|
||||
|
||||
# --- Snippet Library ---
|
||||
st.subheader("🧩 Snippet Library")
|
||||
with st.expander("Add New Snippet"):
|
||||
snip_name = st.text_input("Name", placeholder="e.g. Cinematic")
|
||||
snip_content = st.text_area("Content", placeholder="4k, high quality...")
|
||||
if st.button("Save Snippet"):
|
||||
if snip_name and snip_content:
|
||||
st.session_state.snippets[snip_name] = snip_content
|
||||
save_snippets(st.session_state.snippets)
|
||||
st.success(f"Saved '{snip_name}'")
|
||||
st.rerun()
|
||||
|
||||
if st.session_state.snippets:
|
||||
st.caption("Click to Append to Prompt:")
|
||||
for name, content in st.session_state.snippets.items():
|
||||
col_s1, col_s2 = st.columns([4, 1])
|
||||
if col_s1.button(f"➕ {name}", use_container_width=True):
|
||||
st.session_state.append_prompt = content
|
||||
st.rerun()
|
||||
if col_s2.button("🗑️", key=f"del_snip_{name}"):
|
||||
del st.session_state.snippets[name]
|
||||
save_snippets(st.session_state.snippets)
|
||||
st.rerun()
|
||||
|
||||
st.markdown("---")
|
||||
|
||||
# --- File List & Creation ---
|
||||
json_files = sorted(list(st.session_state.current_dir.glob("*.json")))
|
||||
json_files = [f for f in json_files if f.name != ".editor_config.json" and f.name != ".editor_snippets.json"]
|
||||
|
||||
if not json_files:
|
||||
if st.button("Generate Templates"):
|
||||
generate_templates(st.session_state.current_dir)
|
||||
st.rerun()
|
||||
|
||||
with st.expander("Create New JSON"):
|
||||
new_filename = st.text_input("Filename", placeholder="my_prompt_vace")
|
||||
is_batch = st.checkbox("Is Batch File?")
|
||||
if st.button("Create"):
|
||||
if not new_filename.endswith(".json"): new_filename += ".json"
|
||||
path = st.session_state.current_dir / new_filename
|
||||
if is_batch:
|
||||
data = {"batch_data": []}
|
||||
else:
|
||||
data = DEFAULTS.copy()
|
||||
if "vace" in new_filename: data.update({"frame_to_skip": 81, "vace schedule": 1, "video file path": ""})
|
||||
elif "i2v" in new_filename: data.update({"reference image path": "", "flf image path": ""})
|
||||
save_json(path, data)
|
||||
st.rerun()
|
||||
|
||||
# --- File Selector ---
|
||||
if 'file_selector' not in st.session_state:
|
||||
st.session_state.file_selector = json_files[0].name if json_files else None
|
||||
if st.session_state.file_selector not in [f.name for f in json_files] and json_files:
|
||||
st.session_state.file_selector = json_files[0].name
|
||||
|
||||
selected_file_name = st.radio("Select File", [f.name for f in json_files], key="file_selector")
|
||||
|
||||
# ==========================================
|
||||
# 4. MAIN APP LOGIC
|
||||
# ==========================================
|
||||
if selected_file_name:
|
||||
file_path = st.session_state.current_dir / selected_file_name
|
||||
|
||||
# --- FILE LOADING & AUTO-SWITCH LOGIC ---
|
||||
if st.session_state.loaded_file != str(file_path):
|
||||
data, mtime = load_json(file_path)
|
||||
st.session_state.data_cache = data
|
||||
st.session_state.last_mtime = mtime
|
||||
st.session_state.loaded_file = str(file_path)
|
||||
|
||||
# Clear transient states
|
||||
if 'append_prompt' in st.session_state: del st.session_state.append_prompt
|
||||
if 'rand_seed' in st.session_state: del st.session_state.rand_seed
|
||||
if 'restored_indicator' in st.session_state: del st.session_state.restored_indicator
|
||||
st.session_state.edit_history_idx = None
|
||||
|
||||
# --- AUTO-SWITCH TAB LOGIC ---
|
||||
# If the file has 'batch_data' or is a list, force Batch tab.
|
||||
# Otherwise, force Single tab.
|
||||
is_batch = "batch_data" in data or isinstance(data, list)
|
||||
if is_batch:
|
||||
st.session_state.active_tab_name = "🚀 Batch Processor"
|
||||
else:
|
||||
st.session_state.active_tab_name = "📝 Single Editor"
|
||||
if count == 0 and os.path.exists(LEGACY_JSON_FILE):
|
||||
print(f"[DB] Database empty. Found {LEGACY_JSON_FILE}, migrating...")
|
||||
try:
|
||||
with open(LEGACY_JSON_FILE, 'r') as f:
|
||||
legacy_data = json.load(f)
|
||||
|
||||
# Assuming JSON structure: {"/path/to/img.png": {"threshold": 0.5, ...}}
|
||||
# If your JSON is flat, you might need to adjust this loop.
|
||||
if isinstance(legacy_data, dict):
|
||||
for path, data in legacy_data.items():
|
||||
c.execute(
|
||||
"INSERT OR IGNORE INTO path_settings (path, settings_json, updated_at) VALUES (?, ?, ?)",
|
||||
(path, json.dumps(data), datetime.now())
|
||||
)
|
||||
conn.commit()
|
||||
print("[DB] Migration complete.")
|
||||
except Exception as e:
|
||||
print(f"[DB] Migration failed: {e}")
|
||||
|
||||
conn.close()
|
||||
|
||||
# --- DB Helper Functions ---
|
||||
|
||||
def load_settings(path):
|
||||
"""Load settings for a specific path from DB."""
|
||||
if not path:
|
||||
return None
|
||||
|
||||
conn = get_db()
|
||||
c = conn.cursor()
|
||||
c.execute("SELECT settings_json FROM path_settings WHERE path = ?", (path,))
|
||||
row = c.fetchone()
|
||||
conn.close()
|
||||
|
||||
if row:
|
||||
return json.loads(row['settings_json'])
|
||||
return None
|
||||
|
||||
def save_settings(path, settings_dict):
|
||||
"""Save/Update settings for a specific path."""
|
||||
if not path:
|
||||
return
|
||||
|
||||
conn = get_db()
|
||||
c = conn.cursor()
|
||||
json_str = json.dumps(settings_dict)
|
||||
|
||||
# Upsert: Insert, or Update if path exists
|
||||
c.execute('''
|
||||
INSERT INTO path_settings (path, settings_json, updated_at)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT(path) DO UPDATE SET
|
||||
settings_json=excluded.settings_json,
|
||||
updated_at=excluded.updated_at
|
||||
''', (path, json_str, datetime.now()))
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
print(f"[DB] Saved settings for {path}")
|
||||
|
||||
def save_batch_log(folder_path, logs):
|
||||
"""Save batch run details to DB."""
|
||||
conn = get_db()
|
||||
c = conn.cursor()
|
||||
c.execute('''
|
||||
INSERT INTO batch_history (timestamp, folder_path, log_data)
|
||||
VALUES (?, ?, ?)
|
||||
''', (datetime.now(), folder_path, json.dumps(logs)))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
# Initialize DB on startup
|
||||
init_db()
|
||||
|
||||
|
||||
# ==========================================
|
||||
# 2. APP LOGIC
|
||||
# ==========================================
|
||||
|
||||
# Default settings to fall back on
|
||||
DEFAULTS = {
|
||||
"threshold": 0.5,
|
||||
"invert": False,
|
||||
"scale": 1.0
|
||||
}
|
||||
|
||||
def on_path_load(path):
|
||||
"""
|
||||
Triggered when user enters a path or clicks Load.
|
||||
Fetches specific settings for this path from DB.
|
||||
"""
|
||||
data = load_settings(path)
|
||||
if data:
|
||||
msg = f"Loaded saved settings for: {os.path.basename(path)}"
|
||||
# Return values to update UI components
|
||||
return msg, data.get("threshold", DEFAULTS["threshold"]), data.get("invert", DEFAULTS["invert"]), data.get("scale", DEFAULTS["scale"])
|
||||
else:
|
||||
data = st.session_state.data_cache
|
||||
msg = "No saved settings found (using defaults)"
|
||||
return msg, DEFAULTS["threshold"], DEFAULTS["invert"], DEFAULTS["scale"]
|
||||
|
||||
st.title(f"Editing: {selected_file_name}")
|
||||
def process_single_image(path, threshold, invert, scale):
|
||||
"""
|
||||
Simulates processing.
|
||||
Critically: SAVES the settings used to the DB.
|
||||
"""
|
||||
if not path:
|
||||
return "Error: No path provided"
|
||||
|
||||
# --- CONTROLLED NAVIGATION (REPLACES ST.TABS) ---
|
||||
# Using radio buttons allows us to change 'active_tab_name' programmatically above.
|
||||
tabs_list = [
|
||||
"📝 Single Editor",
|
||||
"🚀 Batch Processor",
|
||||
"🕒 Timeline",
|
||||
"🧪 Interactive Timeline",
|
||||
"🔌 Comfy Monitor"
|
||||
]
|
||||
# 1. Save these settings to DB so they are remembered next time
|
||||
current_settings = {
|
||||
"threshold": threshold,
|
||||
"invert": invert,
|
||||
"scale": scale
|
||||
}
|
||||
save_settings(path, current_settings)
|
||||
|
||||
# 2. Run actual processing (Place your real code here)
|
||||
time.sleep(0.5) # Simulate work
|
||||
return f"Success! Processed {os.path.basename(path)} with Threshold={threshold}. Settings Saved."
|
||||
|
||||
def process_batch(folder_path, threshold):
|
||||
"""
|
||||
Simulates batch processing.
|
||||
Logs the result to the 'batch_history' table.
|
||||
"""
|
||||
if not os.path.isdir(folder_path):
|
||||
return "Error: Invalid folder path"
|
||||
|
||||
files = [f for f in os.listdir(folder_path) if f.endswith(('.png', '.jpg'))]
|
||||
log_messages = []
|
||||
|
||||
for f in files:
|
||||
# Simulate processing each file
|
||||
# You could also load individual file settings here if needed
|
||||
log_messages.append(f"Processed {f} (Threshold: {threshold})")
|
||||
|
||||
# Ensure active tab is valid (safety check)
|
||||
if st.session_state.active_tab_name not in tabs_list:
|
||||
st.session_state.active_tab_name = tabs_list[0]
|
||||
|
||||
current_tab = st.radio(
|
||||
"Navigation",
|
||||
tabs_list,
|
||||
key="active_tab_name", # Binds to session state
|
||||
horizontal=True,
|
||||
label_visibility="collapsed"
|
||||
)
|
||||
result_text = "\n".join(log_messages)
|
||||
|
||||
st.markdown("---")
|
||||
# Save to Batch DB
|
||||
save_batch_log(folder_path, log_messages)
|
||||
|
||||
return f"Batch Complete. {len(files)} files processed.\n\nLOG:\n{result_text}"
|
||||
|
||||
# --- RENDER SELECTED TAB ---
|
||||
if current_tab == "📝 Single Editor":
|
||||
render_single_editor(data, file_path)
|
||||
|
||||
elif current_tab == "🚀 Batch Processor":
|
||||
render_batch_processor(data, file_path, json_files, st.session_state.current_dir, selected_file_name)
|
||||
|
||||
elif current_tab == "🕒 Timeline":
|
||||
render_timeline_tab(data, file_path)
|
||||
|
||||
elif current_tab == "🧪 Interactive Timeline":
|
||||
render_timeline_wip(data, file_path)
|
||||
|
||||
elif current_tab == "🔌 Comfy Monitor":
|
||||
render_comfy_monitor()
|
||||
# ==========================================
|
||||
# 3. GRADIO INTERFACE
|
||||
# ==========================================
|
||||
|
||||
with gr.Blocks(title="App with DB Persistence") as app:
|
||||
gr.Markdown("## Image Processor (SQLite Powered)")
|
||||
|
||||
with gr.Tabs():
|
||||
|
||||
# --- TAB 1: SINGLE IMAGE ---
|
||||
with gr.Tab("Single Image"):
|
||||
gr.Markdown("Settings are saved automatically per file path.")
|
||||
|
||||
with gr.Row():
|
||||
# The 'Key' for our database
|
||||
path_input = gr.Textbox(label="Image File Path", placeholder="C:/Images/photo1.png", scale=3)
|
||||
load_btn = gr.Button("Load Settings", scale=1)
|
||||
|
||||
with gr.Row():
|
||||
# Settings Inputs
|
||||
thresh_slider = gr.Slider(0.0, 1.0, value=DEFAULTS["threshold"], label="Threshold")
|
||||
scale_num = gr.Number(value=DEFAULTS["scale"], label="Scale Factor")
|
||||
invert_chk = gr.Checkbox(value=DEFAULTS["invert"], label="Invert Colors")
|
||||
|
||||
status_output = gr.Textbox(label="Status / Output", lines=2)
|
||||
run_btn = gr.Button("Process & Save", variant="primary")
|
||||
|
||||
# Interactions
|
||||
# 1. Loading settings when button clicked
|
||||
load_btn.click(
|
||||
fn=on_path_load,
|
||||
inputs=[path_input],
|
||||
outputs=[status_output, thresh_slider, invert_chk, scale_num]
|
||||
)
|
||||
|
||||
# 2. (Optional) Auto-load when path input loses focus (blur)
|
||||
path_input.blur(
|
||||
fn=on_path_load,
|
||||
inputs=[path_input],
|
||||
outputs=[status_output, thresh_slider, invert_chk, scale_num]
|
||||
)
|
||||
|
||||
# 3. Processing
|
||||
run_btn.click(
|
||||
fn=process_single_image,
|
||||
inputs=[path_input, thresh_slider, invert_chk, scale_num],
|
||||
outputs=[status_output]
|
||||
)
|
||||
|
||||
# --- TAB 2: BATCH PROCESS ---
|
||||
with gr.Tab("Batch Processing"):
|
||||
gr.Markdown("Batch runs are logged to history.")
|
||||
|
||||
batch_input = gr.Textbox(label="Input Folder", placeholder="C:/Images/Batch_Folder")
|
||||
|
||||
# Example: Global override for batch, or you could load per file
|
||||
batch_thresh = gr.Slider(0.0, 1.0, value=0.5, label="Global Threshold Override")
|
||||
|
||||
batch_run_btn = gr.Button("Run Batch", variant="primary")
|
||||
batch_output = gr.TextArea(label="Batch Log", lines=10)
|
||||
|
||||
batch_run_btn.click(
|
||||
fn=process_batch,
|
||||
inputs=[batch_input, batch_thresh],
|
||||
outputs=[batch_output]
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.launch()
|
||||
|
||||
Reference in New Issue
Block a user