Update engine.py

This commit is contained in:
2026-01-17 15:15:20 +01:00
parent 5485a047c5
commit e7c71b927b

177
engine.py
View File

@@ -1,40 +1,81 @@
import os import os
import shutil import shutil
import sqlite3 import sqlite3
import json
from PIL import Image from PIL import Image
from io import BytesIO from io import BytesIO
class SorterEngine: class SorterEngine:
# Path to the SQLite database file stored in the app volume
DB_PATH = "/app/sorter_database.db" DB_PATH = "/app/sorter_database.db"
@staticmethod @staticmethod
def init_db(): def init_db():
"""Initializes SQLite tables for Profiles, Folder IDs, and Categories.""" """Initializes the SQLite database and creates all required tables."""
conn = sqlite3.connect(SorterEngine.DB_PATH) conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor() cursor = conn.cursor()
# Table 1: Profiles - Stores all discovery, review, and output paths # Table for Profiles: Stores paths for Discovery, Review, and Output
cursor.execute('''CREATE TABLE IF NOT EXISTS profiles cursor.execute('''CREATE TABLE IF NOT EXISTS profiles
(name TEXT PRIMARY KEY, disc_t TEXT, rev_t TEXT, rev_c TEXT, path_out TEXT, mode TEXT)''') (name TEXT PRIMARY KEY, disc_t TEXT, rev_t TEXT, rev_c TEXT, path_out TEXT, mode TEXT)''')
# Table 2: Folder IDs - Maps source paths to unique numeric IDs for renaming # Table for Folder IDs: Maps source folder paths to persistent numeric IDs
cursor.execute('''CREATE TABLE IF NOT EXISTS folder_ids (path TEXT PRIMARY KEY, folder_id INTEGER)''') cursor.execute('''CREATE TABLE IF NOT EXISTS folder_ids (path TEXT PRIMARY KEY, folder_id INTEGER)''')
# Table 3: Categories - Stores the sorting buttons for Tab 4 # Table for Categories: Stores sorting subfolder names for Tab 4
cursor.execute('''CREATE TABLE IF NOT EXISTS categories (name TEXT PRIMARY KEY)''') cursor.execute('''CREATE TABLE IF NOT EXISTS categories (name TEXT PRIMARY KEY)''')
# Seed default categories if the table is new # Seed default categories if the table is empty
cursor.execute("SELECT COUNT(*) FROM categories") cursor.execute("SELECT COUNT(*) FROM categories")
if cursor.fetchone()[0] == 0: if cursor.fetchone()[0] == 0:
for cat in ["_TRASH", "Default", "Action", "Solo"]: defaults = ["_TRASH", "Default", "Action", "Solo"]
for cat in defaults:
cursor.execute("INSERT INTO categories VALUES (?)", (cat,)) cursor.execute("INSERT INTO categories VALUES (?)", (cat,))
conn.commit() conn.commit()
conn.close() conn.close()
@staticmethod
def get_images(path):
"""Standard image scanner for directories."""
exts = ('.jpg', '.jpeg', '.png', '.webp', '.bmp', '.tiff')
if not path or not os.path.exists(path):
return []
return sorted([f for f in os.listdir(path) if f.lower().endswith(exts)])
@staticmethod
def get_id_mapping(path):
"""Maps idXXX prefixes to lists of filenames to detect collisions."""
mapping = {}
for f in SorterEngine.get_images(path):
if f.startswith("id") and "_" in f:
prefix = f.split('_')[0]
if prefix not in mapping:
mapping[prefix] = []
mapping[prefix].append(f)
return mapping
@staticmethod
def get_max_id_number(target_path):
"""Scans directories to find the highest existing ID prefix."""
max_id = 0
search_paths = [target_path, os.path.join(target_path, "selected_target")]
for p in search_paths:
if not os.path.exists(p):
continue
for f in os.listdir(p):
if f.startswith("id") and "_" in f:
try:
num = int(f[2:].split('_')[0])
if num > max_id:
max_id = num
except:
continue
return max_id
@staticmethod @staticmethod
def get_folder_id(source_path): def get_folder_id(source_path):
"""Retrieves or generates a persistent unique ID for a folder.""" """Retrieves or generates a unique, persistent ID for a specific folder."""
conn = sqlite3.connect(SorterEngine.DB_PATH) conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT folder_id FROM folder_ids WHERE path = ?", (source_path,)) cursor.execute("SELECT folder_id FROM folder_ids WHERE path = ?", (source_path,))
@@ -52,9 +93,85 @@ class SorterEngine:
conn.close() conn.close()
return fid return fid
@staticmethod
def harmonize_names(t_p, c_p):
"""Renames the control file to match the target filename exactly."""
t_name = os.path.basename(t_p)
c_dir = os.path.dirname(c_p)
new_c_p = os.path.join(c_dir, t_name)
# Handle filename collisions during harmonization
if os.path.exists(new_c_p) and c_p != new_c_p:
base, ext = os.path.splitext(t_name)
new_c_p = os.path.join(c_dir, f"{base}_alt{ext}")
os.rename(c_p, new_c_p)
return new_c_p
@staticmethod
def re_id_file(old_path, new_id_prefix):
"""Changes the idXXX_ portion of a filename to resolve collisions."""
dir_name = os.path.dirname(old_path)
old_name = os.path.basename(old_path)
name_no_id = old_name.split('_', 1)[1] if '_' in old_name else old_name
new_name = f"{new_id_prefix}{name_no_id}"
new_path = os.path.join(dir_name, new_name)
os.rename(old_path, new_path)
return new_path
@staticmethod
def move_to_unused_synced(t_p, c_p, t_root, c_root):
"""Moves a pair to 'unused' folders with synchronized filenames."""
t_name = os.path.basename(t_p)
t_un = os.path.join(t_root, "unused", t_name)
c_un = os.path.join(c_root, "unused", t_name)
os.makedirs(os.path.dirname(t_un), exist_ok=True)
os.makedirs(os.path.dirname(c_un), exist_ok=True)
shutil.move(t_p, t_un)
shutil.move(c_p, c_un)
return t_un, c_un
@staticmethod
def restore_from_unused(t_p, c_p, t_root, c_root):
"""Restores a pair from 'unused' back to main 'selected' folders."""
t_name = os.path.basename(t_p)
t_dst = os.path.join(t_root, "selected_target", t_name)
c_dst = os.path.join(c_root, "selected_control", t_name)
os.makedirs(os.path.dirname(t_dst), exist_ok=True)
os.makedirs(os.path.dirname(c_dst), exist_ok=True)
shutil.move(t_p, t_dst)
shutil.move(c_p, c_dst)
return t_dst, c_dst
@staticmethod
def execute_match(t_path, c_path, target_root, prefix, mode="standard"):
"""Standard matching: moves target and copies control with synced names."""
target_base = os.path.basename(t_path)
new_name = f"{prefix}{target_base}"
folders = {
"standard": ("selected_target", "selected_control"),
"solo": ("selected_target_solo_woman", "control_selected_solo_woman")
}
t_sub, c_sub = folders[mode]
t_dst = os.path.join(target_root, t_sub, new_name)
c_dst = os.path.join(target_root, c_sub, new_name)
os.makedirs(os.path.dirname(t_dst), exist_ok=True)
os.makedirs(os.path.dirname(c_dst), exist_ok=True)
shutil.move(t_path, t_dst)
shutil.copy2(c_path, c_dst)
return t_dst, c_dst
@staticmethod @staticmethod
def get_categories(): def get_categories():
"""Fetches the category list for dynamic button rendering.""" """Retrieves categories from the database for Tab 4."""
conn = sqlite3.connect(SorterEngine.DB_PATH) conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT name FROM categories ORDER BY name ASC") cursor.execute("SELECT name FROM categories ORDER BY name ASC")
@@ -64,7 +181,7 @@ class SorterEngine:
@staticmethod @staticmethod
def add_category(name): def add_category(name):
"""Adds a new persistent category to the database.""" """Adds a new category button to the database."""
conn = sqlite3.connect(SorterEngine.DB_PATH) conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO categories VALUES (?)", (name,)) cursor.execute("INSERT OR IGNORE INTO categories VALUES (?)", (name,))
@@ -73,7 +190,7 @@ class SorterEngine:
@staticmethod @staticmethod
def save_profile(name, disc_t, rev_t, rev_c, path_out, mode): def save_profile(name, disc_t, rev_t, rev_c, path_out, mode):
"""Persists the entire path configuration to the database.""" """Saves a multi-path preset to the database."""
conn = sqlite3.connect(SorterEngine.DB_PATH) conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("INSERT OR REPLACE INTO profiles VALUES (?, ?, ?, ?, ?, ?)", cursor.execute("INSERT OR REPLACE INTO profiles VALUES (?, ?, ?, ?, ?, ?)",
@@ -83,7 +200,7 @@ class SorterEngine:
@staticmethod @staticmethod
def load_profiles(): def load_profiles():
"""Loads all saved path presets.""" """Returns all stored profiles."""
conn = sqlite3.connect(SorterEngine.DB_PATH) conn = sqlite3.connect(SorterEngine.DB_PATH)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT * FROM profiles") cursor.execute("SELECT * FROM profiles")
@@ -100,40 +217,26 @@ class SorterEngine:
conn.commit() conn.commit()
conn.close() conn.close()
# --- File Operations ---
@staticmethod
def get_images(path):
"""Standard image scanning for directories."""
exts = ('.jpg', '.jpeg', '.png', '.webp', '.bmp', '.tiff')
if not path or not os.path.exists(path): return []
return sorted([f for f in os.listdir(path) if f.lower().endswith(exts)])
@staticmethod
def harmonize_names(t_p, c_p):
"""Forces the control file to match the target name to fix ID collisions."""
t_name = os.path.basename(t_p)
new_c_p = os.path.join(os.path.dirname(c_p), t_name)
if os.path.exists(new_c_p) and c_p != new_c_p:
root, ext = os.path.splitext(t_name)
new_c_p = os.path.join(os.path.dirname(c_p), f"{root}_alt{ext}")
os.rename(c_p, new_c_p)
return new_c_p
@staticmethod @staticmethod
def compress_for_web(path, quality): def compress_for_web(path, quality):
"""Converts raw images to compressed JPEGs for fast browser loading.""" """Compresses images into JPEG format for fast UI rendering."""
try: try:
with Image.open(path) as img: with Image.open(path) as img:
buf = BytesIO() buf = BytesIO()
img.convert("RGB").save(buf, format="JPEG", quality=quality) img.convert("RGB").save(buf, format="JPEG", quality=quality)
return buf return buf
except: return None except:
return None
@staticmethod @staticmethod
def revert_action(action): def revert_action(action):
"""Undoes the last move operation by reversing source and destination.""" """Undoes the previous file operation."""
if action['type'] in ['link_standard', 'link_solo', 'unused', 'cat_move']: if action['type'] in ['link_standard', 'link_solo', 'unused', 'cat_move', 'move']:
if os.path.exists(action['t_dst']): shutil.move(action['t_dst'], action['t_src']) if os.path.exists(action['t_dst']):
shutil.move(action['t_dst'], action['t_src'])
if 'c_dst' in action and os.path.exists(action['c_dst']): if 'c_dst' in action and os.path.exists(action['c_dst']):
if 'link' in action['type']: os.remove(action['c_dst']) # Remove copies created during matching, restore moves
else: shutil.move(action['c_dst'], action['c_src']) if 'link' in action['type']:
os.remove(action['c_dst'])
else:
shutil.move(action['c_dst'], action['c_src'])