Update engine.py

This commit is contained in:
2026-01-17 15:27:15 +01:00
parent ac7566d730
commit e1d9b3cd7a

239
engine.py
View File

@@ -1,40 +1,54 @@
import os import os
import shutil import shutil
import sqlite3 import sqlite3
import json
from PIL import Image from PIL import Image
from io import BytesIO from io import BytesIO
class SorterEngine: class SorterEngine:
# Path to the SQLite database file stored in the app volume
DB_PATH = "/app/sorter_database.db" DB_PATH = "/app/sorter_database.db"
# --- DATABASE INITIALIZATION ---
@staticmethod @staticmethod
def init_db(): def init_db():
"""Initializes SQLite tables for Profiles, Folder IDs, and Categories."""
conn = sqlite3.connect(SorterEngine.DB_PATH) conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor() cursor = conn.cursor()
# Profiles now store independent paths for each functional area
# Profile table supports independent paths for each tab
cursor.execute('''CREATE TABLE IF NOT EXISTS profiles cursor.execute('''CREATE TABLE IF NOT EXISTS profiles
(name TEXT PRIMARY KEY, (name TEXT PRIMARY KEY,
tab1_target TEXT, tab1_target TEXT,
tab2_target TEXT, tab2_control TEXT, tab2_target TEXT, tab2_control TEXT,
tab4_source TEXT, tab4_out TEXT)''') tab4_source TEXT, tab4_out TEXT,
mode TEXT)''')
# Maps source paths to persistent numeric IDs
cursor.execute('''CREATE TABLE IF NOT EXISTS folder_ids (path TEXT PRIMARY KEY, folder_id INTEGER)''') cursor.execute('''CREATE TABLE IF NOT EXISTS folder_ids (path TEXT PRIMARY KEY, folder_id INTEGER)''')
# Stores sorting subfolder names for Tab 4
cursor.execute('''CREATE TABLE IF NOT EXISTS categories (name TEXT PRIMARY KEY)''') cursor.execute('''CREATE TABLE IF NOT EXISTS categories (name TEXT PRIMARY KEY)''')
# Seed default categories from the original script if empty
cursor.execute("SELECT COUNT(*) FROM categories")
if cursor.fetchone()[0] == 0:
for cat in ["_TRASH", "Default", "Action", "Solo"]:
cursor.execute("INSERT INTO categories VALUES (?)", (cat,))
conn.commit() conn.commit()
conn.close() conn.close()
# --- PROFILE & PATH MANAGEMENT ---
@staticmethod @staticmethod
def save_tab_paths(profile_name, t1_t=None, t2_t=None, t2_c=None, t4_s=None, t4_o=None): def save_tab_paths(profile_name, t1_t=None, t2_t=None, t2_c=None, t4_s=None, t4_o=None, mode=None):
"""Updates only the paths provided, preserving others in the profile.""" """Updates specific tab paths while preserving others in the DB."""
conn = sqlite3.connect(SorterEngine.DB_PATH) conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor() cursor = conn.cursor()
# Fetch existing to avoid overwriting with None
cursor.execute("SELECT * FROM profiles WHERE name = ?", (profile_name,)) cursor.execute("SELECT * FROM profiles WHERE name = ?", (profile_name,))
row = cursor.fetchone() row = cursor.fetchone()
if not row: if not row:
row = (profile_name, "", "", "", "", "") row = (profile_name, "/storage", "/storage", "/storage", "/storage", "/storage", "id")
new_values = ( new_values = (
profile_name, profile_name,
@@ -42,53 +56,38 @@ class SorterEngine:
t2_t if t2_t is not None else row[2], t2_t if t2_t is not None else row[2],
t2_c if t2_c is not None else row[3], t2_c if t2_c is not None else row[3],
t4_s if t4_s is not None else row[4], t4_s if t4_s is not None else row[4],
t4_o if t4_o is not None else row[5] t4_o if t4_o is not None else row[5],
mode if mode is not None else row[6]
) )
cursor.execute("INSERT OR REPLACE INTO profiles VALUES (?, ?, ?, ?, ?, ?)", new_values)
cursor.execute("INSERT OR REPLACE INTO profiles VALUES (?, ?, ?, ?, ?, ?, ?)", new_values)
conn.commit() conn.commit()
conn.close() conn.close()
@staticmethod @staticmethod
def get_images(path): def load_profiles():
"""Standard image scanner for directories.""" """Loads all workspace presets from the database."""
exts = ('.jpg', '.jpeg', '.png', '.webp', '.bmp', '.tiff') conn = sqlite3.connect(SorterEngine.DB_PATH)
if not path or not os.path.exists(path): cursor = conn.cursor()
return [] cursor.execute("SELECT * FROM profiles")
return sorted([f for f in os.listdir(path) if f.lower().endswith(exts)]) rows = cursor.fetchall()
conn.close()
return {r[0]: {"tab1_target": r[1], "tab2_target": r[2], "tab2_control": r[3],
"tab4_source": r[4], "tab4_out": r[5], "mode": r[6]} for r in rows}
@staticmethod @staticmethod
def get_id_mapping(path): def delete_profile(name):
"""Maps idXXX prefixes to lists of filenames to detect collisions.""" """Removes a workspace profile."""
mapping = {} conn = sqlite3.connect(SorterEngine.DB_PATH)
for f in SorterEngine.get_images(path): cursor = conn.cursor()
if f.startswith("id") and "_" in f: cursor.execute("DELETE FROM profiles WHERE name = ?", (name,))
prefix = f.split('_')[0] conn.commit()
if prefix not in mapping: conn.close()
mapping[prefix] = []
mapping[prefix].append(f)
return mapping
@staticmethod
def get_max_id_number(target_path):
"""Scans directories to find the highest existing ID prefix."""
max_id = 0
search_paths = [target_path, os.path.join(target_path, "selected_target")]
for p in search_paths:
if not os.path.exists(p):
continue
for f in os.listdir(p):
if f.startswith("id") and "_" in f:
try:
num = int(f[2:].split('_')[0])
if num > max_id:
max_id = num
except:
continue
return max_id
# --- FOLDER ID & CATEGORY LOGIC ---
@staticmethod @staticmethod
def get_folder_id(source_path): def get_folder_id(source_path):
"""Retrieves or generates a unique, persistent ID for a specific folder.""" """Retrieves or generates a persistent ID for a specific folder."""
conn = sqlite3.connect(SorterEngine.DB_PATH) conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT folder_id FROM folder_ids WHERE path = ?", (source_path,)) cursor.execute("SELECT folder_id FROM folder_ids WHERE path = ?", (source_path,))
@@ -102,28 +101,63 @@ class SorterEngine:
fid = (row[0] + 1) if row and row[0] else 1 fid = (row[0] + 1) if row and row[0] else 1
cursor.execute("INSERT INTO folder_ids VALUES (?, ?)", (source_path, fid)) cursor.execute("INSERT INTO folder_ids VALUES (?, ?)", (source_path, fid))
conn.commit() conn.commit()
conn.close() conn.close()
return fid return fid
@staticmethod
def get_categories():
"""Fetches sorting categories for Tab 4 buttons."""
conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT name FROM categories ORDER BY name ASC")
cats = [r[0] for r in cursor.fetchall()]
conn.close()
return cats
@staticmethod
def add_category(name):
"""Adds a new persistent category to the SQL database."""
conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO categories VALUES (?)", (name,))
conn.commit()
conn.close()
# --- FILE & IMAGE OPERATIONS ---
@staticmethod
def get_images(path):
"""Standard image scanner for directories."""
exts = ('.jpg', '.jpeg', '.png', '.webp', '.bmp', '.tiff')
if not path or not os.path.exists(path): return []
return sorted([f for f in os.listdir(path) if f.lower().endswith(exts)])
@staticmethod
def get_max_id_number(target_path):
"""Finds the highest idXXX_ prefix in a directory."""
max_id = 0
if not target_path or not os.path.exists(target_path): return 0
for f in os.listdir(target_path):
if f.startswith("id") and "_" in f:
try:
num = int(f[2:].split('_')[0])
if num > max_id: max_id = num
except: continue
return max_id
@staticmethod @staticmethod
def harmonize_names(t_p, c_p): def harmonize_names(t_p, c_p):
"""Renames the control file to match the target filename exactly.""" """Forces the control file to match the target name."""
t_name = os.path.basename(t_p) t_name = os.path.basename(t_p)
c_dir = os.path.dirname(c_p) new_c_p = os.path.join(os.path.dirname(c_p), t_name)
new_c_p = os.path.join(c_dir, t_name)
# Handle filename collisions during harmonization
if os.path.exists(new_c_p) and c_p != new_c_p: if os.path.exists(new_c_p) and c_p != new_c_p:
base, ext = os.path.splitext(t_name) root, ext = os.path.splitext(t_name)
new_c_p = os.path.join(c_dir, f"{base}_alt{ext}") new_c_p = os.path.join(os.path.dirname(c_p), f"{root}_alt{ext}")
os.rename(c_p, new_c_p) os.rename(c_p, new_c_p)
return new_c_p return new_c_p
@staticmethod @staticmethod
def re_id_file(old_path, new_id_prefix): def re_id_file(old_path, new_id_prefix):
"""Changes the idXXX_ portion of a filename to resolve collisions.""" """Changes the idXXX_ prefix to resolve collisions."""
dir_name = os.path.dirname(old_path) dir_name = os.path.dirname(old_path)
old_name = os.path.basename(old_path) old_name = os.path.basename(old_path)
name_no_id = old_name.split('_', 1)[1] if '_' in old_name else old_name name_no_id = old_name.split('_', 1)[1] if '_' in old_name else old_name
@@ -134,122 +168,43 @@ class SorterEngine:
@staticmethod @staticmethod
def move_to_unused_synced(t_p, c_p, t_root, c_root): def move_to_unused_synced(t_p, c_p, t_root, c_root):
"""Moves a pair to 'unused' folders with synchronized filenames.""" """Moves a pair to 'unused' subfolders with matched names."""
t_name = os.path.basename(t_p) t_name = os.path.basename(t_p)
t_un = os.path.join(t_root, "unused", t_name) t_un = os.path.join(t_root, "unused", t_name)
c_un = os.path.join(c_root, "unused", t_name) c_un = os.path.join(c_root, "unused", t_name)
os.makedirs(os.path.dirname(t_un), exist_ok=True) os.makedirs(os.path.dirname(t_un), exist_ok=True)
os.makedirs(os.path.dirname(c_un), exist_ok=True) os.makedirs(os.path.dirname(c_un), exist_ok=True)
shutil.move(t_p, t_un) shutil.move(t_p, t_un)
shutil.move(c_p, c_un) shutil.move(c_p, c_un)
return t_un, c_un return t_un, c_un
@staticmethod @staticmethod
def restore_from_unused(t_p, c_p, t_root, c_root): def restore_from_unused(t_p, c_p, t_root, c_root):
"""Restores a pair from 'unused' back to main 'selected' folders.""" """Moves files back from 'unused' to main folders."""
t_name = os.path.basename(t_p) t_name = os.path.basename(t_p)
t_dst = os.path.join(t_root, "selected_target", t_name) t_dst = os.path.join(t_root, "selected_target", t_name)
c_dst = os.path.join(c_root, "selected_control", t_name) c_dst = os.path.join(c_root, "selected_control", t_name)
os.makedirs(os.path.dirname(t_dst), exist_ok=True) os.makedirs(os.path.dirname(t_dst), exist_ok=True)
os.makedirs(os.path.dirname(c_dst), exist_ok=True) os.makedirs(os.path.dirname(c_dst), exist_ok=True)
shutil.move(t_p, t_dst) shutil.move(t_p, t_dst)
shutil.move(c_p, c_dst) shutil.move(c_p, c_dst)
return t_dst, c_dst return t_dst, c_dst
@staticmethod
def execute_match(t_path, c_path, target_root, prefix, mode="standard"):
"""Standard matching: moves target and copies control with synced names."""
target_base = os.path.basename(t_path)
new_name = f"{prefix}{target_base}"
folders = {
"standard": ("selected_target", "selected_control"),
"solo": ("selected_target_solo_woman", "control_selected_solo_woman")
}
t_sub, c_sub = folders[mode]
t_dst = os.path.join(target_root, t_sub, new_name)
c_dst = os.path.join(target_root, c_sub, new_name)
os.makedirs(os.path.dirname(t_dst), exist_ok=True)
os.makedirs(os.path.dirname(c_dst), exist_ok=True)
shutil.move(t_path, t_dst)
shutil.copy2(c_path, c_dst)
return t_dst, c_dst
@staticmethod
def get_categories():
"""Retrieves categories from the database for Tab 4."""
conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT name FROM categories ORDER BY name ASC")
cats = [r[0] for r in cursor.fetchall()]
conn.close()
return cats
@staticmethod
def add_category(name):
"""Adds a new category button to the database."""
conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO categories VALUES (?)", (name,))
conn.commit()
conn.close()
@staticmethod
def save_profile(name, disc_t, rev_t, rev_c, path_out, mode):
"""Saves a multi-path preset to the database."""
conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor()
cursor.execute("INSERT OR REPLACE INTO profiles VALUES (?, ?, ?, ?, ?, ?)",
(name, disc_t, rev_t, rev_c, path_out, mode))
conn.commit()
conn.close()
@staticmethod
def load_profiles():
"""Returns all stored profiles."""
conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT * FROM profiles")
rows = cursor.fetchall()
conn.close()
return {r[0]: {"disc_t": r[1], "rev_t": r[2], "rev_c": r[3], "path_out": r[4], "mode": r[5]} for r in rows}
@staticmethod
def delete_profile(name):
"""Removes a profile from the database."""
conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor()
cursor.execute("DELETE FROM profiles WHERE name = ?", (name,))
conn.commit()
conn.close()
@staticmethod @staticmethod
def compress_for_web(path, quality): def compress_for_web(path, quality):
"""Compresses images into JPEG format for fast UI rendering.""" """Converts images to compressed JPEGs for the UI."""
try: try:
with Image.open(path) as img: with Image.open(path) as img:
buf = BytesIO() buf = BytesIO()
img.convert("RGB").save(buf, format="JPEG", quality=quality) img.convert("RGB").save(buf, format="JPEG", quality=quality)
return buf return buf
except: except: return None
return None
@staticmethod @staticmethod
def revert_action(action): def revert_action(action):
"""Undoes the previous file operation.""" """Undoes the last move operation."""
if action['type'] in ['link_standard', 'link_solo', 'unused', 'cat_move', 'move']: if action['type'] in ['link_standard', 'link_solo', 'unused', 'cat_move', 'move']:
if os.path.exists(action['t_dst']): if os.path.exists(action['t_dst']): shutil.move(action['t_dst'], action['t_src'])
shutil.move(action['t_dst'], action['t_src'])
if 'c_dst' in action and os.path.exists(action['c_dst']): if 'c_dst' in action and os.path.exists(action['c_dst']):
# Remove copies created during matching, restore moves if 'link' in action['type']: os.remove(action['c_dst'])
if 'link' in action['type']: else: shutil.move(action['c_dst'], action['c_src'])
os.remove(action['c_dst'])
else:
shutil.move(action['c_dst'], action['c_src'])