import os
import shutil
import sqlite3
import threading
import hashlib
from contextlib import contextmanager
from functools import lru_cache
from PIL import Image
from io import BytesIO
from typing import Dict, List, Optional, Tuple
import concurrent.futures


class SorterEngine:
    """SQLite-backed engine for scanning, staging, tagging and committing image files.

    All state lives in the database at DB_PATH plus the filesystem; every
    public method is a @staticmethod/@classmethod so the class is used as a
    namespace, never instantiated.
    """

    DB_PATH = "/app/sorter_database.db"
    _local = threading.local()  # reserved for per-thread state (currently unused)

    # ==========================================
    # DATABASE CONNECTION MANAGEMENT (OPTIMIZED)
    # ==========================================
    @classmethod
    @contextmanager
    def get_connection(cls):
        """Thread-safe connection context manager.

        Opens a fresh connection per call (so check_same_thread=False is safe)
        with sqlite3.Row as row factory; the connection is always closed on exit.
        """
        conn = sqlite3.connect(cls.DB_PATH, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    @classmethod
    @contextmanager
    def transaction(cls):
        """Execute multiple operations in a single transaction.

        Commits on clean exit, rolls back and re-raises on any exception.
        """
        with cls.get_connection() as conn:
            try:
                yield conn
                conn.commit()
            except Exception:
                conn.rollback()
                raise

    # ==========================================
    # DATABASE INITIALIZATION
    # ==========================================
    @staticmethod
    def init_db() -> None:
        """Initializes tables, including the HISTORY log.

        Idempotent: uses CREATE TABLE IF NOT EXISTS, and only seeds the
        default categories when the categories table is empty.
        """
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''CREATE TABLE IF NOT EXISTS profiles (name TEXT PRIMARY KEY, tab1_target TEXT, tab2_target TEXT, tab2_control TEXT, tab4_source TEXT, tab4_out TEXT, mode TEXT, tab5_source TEXT, tab5_out TEXT)''')
            cursor.execute('''CREATE TABLE IF NOT EXISTS folder_ids (path TEXT PRIMARY KEY, folder_id INTEGER)''')
            cursor.execute('''CREATE TABLE IF NOT EXISTS categories (name TEXT PRIMARY KEY)''')
            cursor.execute('''CREATE TABLE IF NOT EXISTS staging_area (original_path TEXT PRIMARY KEY, target_category TEXT, new_name TEXT, is_marked INTEGER DEFAULT 0)''')
            cursor.execute('''CREATE TABLE IF NOT EXISTS processed_log (source_path TEXT PRIMARY KEY, category TEXT, action_type TEXT)''')
            # Seed categories if empty
            cursor.execute("SELECT COUNT(*) FROM categories")
            if cursor.fetchone()[0] == 0:
                cursor.executemany(
                    "INSERT OR IGNORE INTO categories VALUES (?)",
                    [("_TRASH",), ("Default",), ("Action",), ("Solo",)]
                )
            conn.commit()

    # ==========================================
    # PROFILE & PATH MANAGEMENT (OPTIMIZED)
    # ==========================================
    @staticmethod
    def save_tab_paths(profile_name, t1_t=None, t2_t=None, t2_c=None,
                       t4_s=None, t4_o=None, mode=None, t5_s=None, t5_o=None):
        """Updates specific tab paths in the database while preserving others.

        Any argument left as None keeps the previously stored value; a brand
        new profile starts with "/storage" defaults and mode "id".
        """
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM profiles WHERE name = ?", (profile_name,))
            row = cursor.fetchone()
            if not row:
                # Defaults for a profile that does not exist yet.
                row = (profile_name, "/storage", "/storage", "/storage",
                       "/storage", "/storage", "id", "/storage", "/storage")
            new_values = (
                profile_name,
                t1_t if t1_t is not None else row[1],
                t2_t if t2_t is not None else row[2],
                t2_c if t2_c is not None else row[3],
                t4_s if t4_s is not None else row[4],
                t4_o if t4_o is not None else row[5],
                mode if mode is not None else row[6],
                t5_s if t5_s is not None else row[7],
                t5_o if t5_o is not None else row[8]
            )
            cursor.execute("INSERT OR REPLACE INTO profiles VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", new_values)
            conn.commit()

    @staticmethod
    def load_profiles() -> Dict[str, Dict]:
        """Loads all workspace presets keyed by profile name."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM profiles")
            rows = cursor.fetchall()
            return {r[0]: {
                "tab1_target": r[1], "tab2_target": r[2], "tab2_control": r[3],
                "tab4_source": r[4], "tab4_out": r[5], "mode": r[6],
                "tab5_source": r[7], "tab5_out": r[8]
            } for r in rows}

    # ==========================================
    # CATEGORY MANAGEMENT (OPTIMIZED)
    # ==========================================
    @staticmethod
    def get_categories() -> List[str]:
        """Returns all category names, case-insensitively sorted."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM categories ORDER BY name COLLATE NOCASE ASC")
            return [r[0] for r in cursor.fetchall()]

    @staticmethod
    def add_category(name: str) -> None:
        """Adds a category; silently ignores duplicates (INSERT OR IGNORE)."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("INSERT OR IGNORE INTO categories VALUES (?)", (name,))
            conn.commit()

    @staticmethod
    def rename_category(old_name: str, new_name: str, output_base_path: str = None):
        """Renames category in DB and optionally renames the physical folder.

        Staged items pointing at the old category are retargeted in the same
        transaction. The folder is only renamed when the source exists and the
        destination does not (no overwrite).
        """
        with SorterEngine.transaction() as conn:
            cursor = conn.cursor()
            cursor.execute("UPDATE categories SET name = ? WHERE name = ?", (new_name, old_name))
            cursor.execute("UPDATE staging_area SET target_category = ? WHERE target_category = ?", (new_name, old_name))
            if output_base_path:
                old_path = os.path.join(output_base_path, old_name)
                new_path = os.path.join(output_base_path, new_name)
                if os.path.exists(old_path) and not os.path.exists(new_path):
                    os.rename(old_path, new_path)

    @staticmethod
    def delete_category(name: str) -> None:
        """Deletes a category and clears any staged tags associated with it."""
        with SorterEngine.transaction() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM categories WHERE name = ?", (name,))
            cursor.execute("DELETE FROM staging_area WHERE target_category = ?", (name,))

    @staticmethod
    def sync_categories_from_disk(output_path: str) -> int:
        """Scans output directory and adds subfolders as DB categories.

        Hidden folders (leading '.') are skipped. Returns the number of
        categories actually added (rowcount > 0 filters pre-existing names).
        """
        if not os.path.exists(output_path) if output_path else True:
            return 0
        # Use scandir for better performance
        existing_folders = []
        with os.scandir(output_path) as entries:
            for entry in entries:
                if entry.is_dir() and not entry.name.startswith("."):
                    existing_folders.append(entry.name)
        with SorterEngine.transaction() as conn:
            cursor = conn.cursor()
            added = 0
            for folder in existing_folders:
                cursor.execute("INSERT OR IGNORE INTO categories VALUES (?)", (folder,))
                if cursor.rowcount > 0:
                    added += 1
            return added

    # ==========================================
    # IMAGE OPERATIONS (OPTIMIZED)
    # ==========================================
    # Pre-compiled set for O(1) extension lookup
    _IMAGE_EXTENSIONS = frozenset({'.jpg', '.jpeg', '.png', '.webp', '.bmp', '.tiff'})

    @staticmethod
    def get_images(path: str, recursive: bool = False) -> List[str]:
        """Optimized image scanner using scandir; returns sorted absolute paths."""
        if not path or not os.path.exists(path):
            return []
        image_list = []
        if recursive:
            image_list = SorterEngine._scan_recursive(path)
        else:
            with os.scandir(path) as entries:
                for entry in entries:
                    if entry.is_file():
                        ext = os.path.splitext(entry.name)[1].lower()
                        if ext in SorterEngine._IMAGE_EXTENSIONS:
                            image_list.append(entry.path)
        return sorted(image_list)

    @staticmethod
    def _scan_recursive(path: str) -> List[str]:
        """Helper for recursive scanning with scandir.

        Skips any directory whose name contains "_DELETED" (local trash) and
        silently ignores unreadable directories.
        """
        results = []
        try:
            with os.scandir(path) as entries:
                for entry in entries:
                    if entry.is_dir():
                        if "_DELETED" not in entry.name:
                            results.extend(SorterEngine._scan_recursive(entry.path))
                    elif entry.is_file():
                        ext = os.path.splitext(entry.name)[1].lower()
                        if ext in SorterEngine._IMAGE_EXTENSIONS:
                            results.append(entry.path)
        except PermissionError:
            pass
        return results

    @staticmethod
    def get_id_mapping(path: str) -> Dict[str, List[str]]:
        """Maps idXXX prefixes (text before the first '_') to filenames for collision handling."""
        mapping = {}
        images = SorterEngine.get_images(path, recursive=False)
        for f in images:
            fname = os.path.basename(f)
            if fname.startswith("id") and "_" in fname:
                prefix = fname.split('_')[0]
                if prefix not in mapping:
                    mapping[prefix] = []
                mapping[prefix].append(fname)
        return mapping

    @staticmethod
    def get_max_id_number(target_path: str) -> int:
        """Returns the highest numeric idNNN_ prefix among files in target_path (0 if none)."""
        max_id = 0
        if not target_path or not os.path.exists(target_path):
            return 0
        with os.scandir(target_path) as entries:
            for entry in entries:
                if entry.is_file() and entry.name.startswith("id") and "_" in entry.name:
                    try:
                        num = int(entry.name[2:].split('_')[0])
                        if num > max_id:
                            max_id = num
                    except (ValueError, IndexError):
                        continue  # non-numeric id prefix, e.g. "idx_foo"
        return max_id

    @staticmethod
    def get_folder_id(source_path: str) -> int:
        """Retrieves or generates a persistent ID for a specific folder.

        New IDs are MAX(existing) + 1, starting at 1; allocation is done
        inside a transaction so concurrent callers cannot double-allocate
        within a single process.
        """
        with SorterEngine.transaction() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT folder_id FROM folder_ids WHERE path = ?", (source_path,))
            result = cursor.fetchone()
            if result:
                return result[0]
            else:
                cursor.execute("SELECT MAX(folder_id) FROM folder_ids")
                row = cursor.fetchone()
                # MAX() over an empty table yields a single NULL row.
                fid = (row[0] + 1) if row and row[0] else 1
                cursor.execute("INSERT INTO folder_ids VALUES (?, ?)", (source_path, fid))
                return fid

    # ==========================================
    # IMAGE COMPRESSION (OPTIMIZED WITH CACHING)
    # ==========================================
    # Simple in-memory cache for thumbnails (keyed on path+quality+size+mtime)
    _thumbnail_cache: Dict[str, bytes] = {}
    _cache_max_items = 500
    _cache_lock = threading.Lock()

    @classmethod
    def _get_cache_key(cls, path: str, quality: int, target_size: Optional[int]) -> str:
        """Generate cache key including file modification time (mtime busts stale entries)."""
        try:
            mtime = os.path.getmtime(path)
        except OSError:
            mtime = 0
        return hashlib.md5(f"{path}:{quality}:{target_size}:{mtime}".encode()).hexdigest()

    @classmethod
    def compress_for_web(cls, path: str, quality: int, target_size: Optional[int] = None) -> Optional[bytes]:
        """Loads image, resizes smart, and saves as WebP with caching.

        Returns WebP bytes, or None on any decode/IO failure (best-effort by
        design — callers treat None as "skip this image").
        """
        cache_key = cls._get_cache_key(path, quality, target_size)
        # Check cache first
        with cls._cache_lock:
            if cache_key in cls._thumbnail_cache:
                return cls._thumbnail_cache[cache_key]
        try:
            with Image.open(path) as img:
                # Keep RGBA for WebP support, only convert unusual modes
                if img.mode not in ('RGB', 'RGBA', 'L', 'LA', 'P'):
                    img = img.convert("RGBA" if img.mode.endswith('A') else "RGB")
                elif img.mode == 'P':
                    # Palette images: preserve transparency when present
                    img = img.convert("RGBA" if 'transparency' in img.info else "RGB")
                # Smart resize: only downscale, never upscale
                if target_size and (img.width > target_size or img.height > target_size):
                    img.thumbnail((target_size, target_size), Image.Resampling.LANCZOS)
                # Save as WebP
                buf = BytesIO()
                img.save(buf, format="WEBP", quality=quality, method=4)  # method=4 is faster
                result = buf.getvalue()
            # Cache the result
            with cls._cache_lock:
                if len(cls._thumbnail_cache) >= cls._cache_max_items:
                    # Simple eviction: remove oldest 20% (dict preserves insertion order)
                    keys_to_remove = list(cls._thumbnail_cache.keys())[:cls._cache_max_items // 5]
                    for k in keys_to_remove:
                        del cls._thumbnail_cache[k]
                cls._thumbnail_cache[cache_key] = result
            return result
        except Exception:
            return None

    @staticmethod
    def load_batch_parallel(image_paths: List[str], quality: int) -> Dict[str, bytes]:
        """Multithreaded loader: compresses multiple images in parallel.

        Failed images are omitted from the result (best-effort).
        """
        results = {}

        def process_one(path):
            return path, SorterEngine.compress_for_web(path, quality)

        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            future_to_path = {executor.submit(process_one, p): p for p in image_paths}
            for future in concurrent.futures.as_completed(future_to_path):
                try:
                    path, data = future.result()
                    if data:
                        results[path] = data
                except Exception:
                    pass
        return results

    # ==========================================
    # STAGING OPERATIONS (OPTIMIZED)
    # ==========================================
    @staticmethod
    def delete_to_trash(file_path: str) -> Optional[str]:
        """Moves a file to a local _DELETED subfolder for undo support.

        Returns the trash destination, or None if the source does not exist.
        FIX: previously a name collision inside _DELETED silently overwrote
        the older trashed file (losing the undo copy); collisions now get a
        numeric suffix via _compute_final_destination.
        """
        if not os.path.exists(file_path):
            return None
        trash_dir = os.path.join(os.path.dirname(file_path), "_DELETED")
        os.makedirs(trash_dir, exist_ok=True)
        dest = SorterEngine._compute_final_destination(trash_dir, os.path.basename(file_path))
        shutil.move(file_path, dest)
        return dest

    @staticmethod
    def stage_image(original_path: str, category: str, new_name: str) -> None:
        """Records a pending rename/move in the database (marked=1)."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("INSERT OR REPLACE INTO staging_area VALUES (?, ?, ?, 1)",
                           (original_path, category, new_name))
            conn.commit()

    @staticmethod
    def clear_staged_item(original_path: str) -> None:
        """Removes an item from the pending staging area."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM staging_area WHERE original_path = ?", (original_path,))
            conn.commit()

    @staticmethod
    def get_staged_data() -> Dict[str, Dict]:
        """Retrieves current tagged/staged images keyed by original path."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM staging_area")
            rows = cursor.fetchall()
            return {r[0]: {"cat": r[1], "name": r[2], "marked": r[3]} for r in rows}

    # ==========================================
    # BATCH COMMIT OPERATIONS (OPTIMIZED)
    # ==========================================
    @staticmethod
    def _compute_final_destination(output_root: str, name: str) -> str:
        """Compute final destination path, appending _1, _2, ... on collision."""
        final_dst = os.path.join(output_root, name)
        if not os.path.exists(final_dst):
            return final_dst
        root, ext = os.path.splitext(name)
        c = 1
        while os.path.exists(final_dst):
            final_dst = os.path.join(output_root, f"{root}_{c}{ext}")
            c += 1
        return final_dst

    @staticmethod
    def fix_permissions(path: str) -> None:
        """Forces file to be fully accessible (rwxrwxrwx); best-effort."""
        try:
            os.chmod(path, 0o777)
        except Exception:
            pass

    @staticmethod
    def commit_batch(file_list: List[str], output_root: str, cleanup_mode: str, operation: str = "Copy"):
        """Commits files with batched DB operations.

        Tagged+marked files are copied/moved (per `operation`) to output_root
        under their staged name; untagged files are kept, moved to an
        "unused" sibling folder, or deleted depending on `cleanup_mode`
        ("Keep" / "Move to Unused" / "Delete"). DB updates happen once at the
        end in a single transaction.
        """
        data = SorterEngine.get_staged_data()
        if not os.path.exists(output_root):
            os.makedirs(output_root, exist_ok=True)
        # Prepare batch operations
        to_delete_from_staging = []
        to_insert_to_log = []
        for file_path in file_list:
            if not os.path.exists(file_path):
                continue
            # Tagged files
            if file_path in data and data[file_path]['marked']:
                info = data[file_path]
                final_dst = SorterEngine._compute_final_destination(output_root, info['name'])
                # Perform file operation
                if operation == "Copy":
                    shutil.copy2(file_path, final_dst)
                else:
                    shutil.move(file_path, final_dst)
                SorterEngine.fix_permissions(final_dst)
                to_delete_from_staging.append((file_path,))
                to_insert_to_log.append((file_path, info['cat'], operation))
            # Untagged files - cleanup
            elif cleanup_mode != "Keep":
                if cleanup_mode == "Move to Unused":
                    unused_dir = os.path.join(os.path.dirname(file_path), "unused")
                    os.makedirs(unused_dir, exist_ok=True)
                    dest_unused = os.path.join(unused_dir, os.path.basename(file_path))
                    shutil.move(file_path, dest_unused)
                    SorterEngine.fix_permissions(dest_unused)
                elif cleanup_mode == "Delete":
                    os.remove(file_path)
        # Batch database updates
        with SorterEngine.transaction() as conn:
            cursor = conn.cursor()
            if to_delete_from_staging:
                cursor.executemany("DELETE FROM staging_area WHERE original_path = ?", to_delete_from_staging)
            if to_insert_to_log:
                cursor.executemany("INSERT OR REPLACE INTO processed_log VALUES (?, ?, ?)", to_insert_to_log)

    @staticmethod
    def commit_global(output_root: str, cleanup_mode: str, operation: str = "Copy", source_root: str = None):
        """Commits ALL staged files with batched operations.

        Like commit_batch but processes every staged item; when source_root is
        given, unstaged images found recursively under it are cleaned up per
        cleanup_mode. The staging area is cleared entirely at the end.
        """
        data = SorterEngine.get_staged_data()
        if not os.path.exists(output_root):
            os.makedirs(output_root, exist_ok=True)
        to_insert_to_log = []
        # Process all staged items
        for old_p, info in data.items():
            if not os.path.exists(old_p):
                continue
            final_dst = SorterEngine._compute_final_destination(output_root, info['name'])
            if operation == "Copy":
                shutil.copy2(old_p, final_dst)
            else:
                shutil.move(old_p, final_dst)
            SorterEngine.fix_permissions(final_dst)
            to_insert_to_log.append((old_p, info['cat'], operation))
        # Global cleanup of unstaged images
        if cleanup_mode != "Keep" and source_root:
            all_imgs = SorterEngine.get_images(source_root, recursive=True)
            for img_p in all_imgs:
                if img_p not in data:
                    if cleanup_mode == "Move to Unused":
                        unused_dir = os.path.join(source_root, "unused")
                        os.makedirs(unused_dir, exist_ok=True)
                        dest_unused = os.path.join(unused_dir, os.path.basename(img_p))
                        shutil.move(img_p, dest_unused)
                        SorterEngine.fix_permissions(dest_unused)
                    elif cleanup_mode == "Delete":
                        os.remove(img_p)
        # Batch database updates
        with SorterEngine.transaction() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM staging_area")
            if to_insert_to_log:
                cursor.executemany("INSERT OR REPLACE INTO processed_log VALUES (?, ?, ?)", to_insert_to_log)

    # ==========================================
    # UTILITY OPERATIONS
    # ==========================================
    @staticmethod
    def harmonize_names(t_p: str, c_p: str) -> str:
        """Forces the 'control' file to match the 'target' file's name.

        Keeps the control file's extension; on collision falls back to a
        "<name>_alt<ext>" name. Returns the (possibly new) control path.
        FIX: when the control file already carries the target's name, return
        early instead of performing a same-path os.rename no-op.
        """
        if not os.path.exists(t_p) or not os.path.exists(c_p):
            return c_p
        t_name = os.path.basename(t_p)
        t_root, t_ext = os.path.splitext(t_name)
        c_ext = os.path.splitext(c_p)[1]
        new_c_name = f"{t_root}{c_ext}"
        new_c_p = os.path.join(os.path.dirname(c_p), new_c_name)
        if c_p == new_c_p:
            return c_p  # already harmonized; nothing to rename
        if os.path.exists(new_c_p):
            new_c_p = os.path.join(os.path.dirname(c_p), f"{t_root}_alt{c_ext}")
        os.rename(c_p, new_c_p)
        return new_c_p

    @staticmethod
    def re_id_file(old_path: str, new_id_prefix: str) -> str:
        """Changes the idXXX_ prefix to resolve collisions; returns the new path.

        new_id_prefix is expected to include its trailing underscore
        (e.g. "id42_") — TODO confirm against callers.
        """
        dir_name = os.path.dirname(old_path)
        old_name = os.path.basename(old_path)
        name_no_id = old_name.split('_', 1)[1] if '_' in old_name else old_name
        new_name = f"{new_id_prefix}{name_no_id}"
        new_path = os.path.join(dir_name, new_name)
        os.rename(old_path, new_path)
        return new_path

    @staticmethod
    def move_to_unused_synced(t_p: str, c_p: str, t_root: str, c_root: str) -> Tuple[str, str]:
        """Moves a target/control pair to their respective 'unused' subfolders.

        Both destinations use the TARGET's basename to keep the pair in sync.
        """
        t_name = os.path.basename(t_p)
        t_un = os.path.join(t_root, "unused", t_name)
        c_un = os.path.join(c_root, "unused", t_name)
        os.makedirs(os.path.dirname(t_un), exist_ok=True)
        os.makedirs(os.path.dirname(c_un), exist_ok=True)
        shutil.move(t_p, t_un)
        shutil.move(c_p, c_un)
        return t_un, c_un

    @staticmethod
    def restore_from_unused(t_p: str, c_p: str, t_root: str, c_root: str) -> Tuple[str, str]:
        """Moves files back from 'unused' into selected_target/selected_control folders."""
        t_name = os.path.basename(t_p)
        t_dst = os.path.join(t_root, "selected_target", t_name)
        c_dst = os.path.join(c_root, "selected_control", t_name)
        os.makedirs(os.path.dirname(t_dst), exist_ok=True)
        os.makedirs(os.path.dirname(c_dst), exist_ok=True)
        shutil.move(t_p, t_dst)
        shutil.move(c_p, c_dst)
        return t_dst, c_dst

    @staticmethod
    def revert_action(action: Dict) -> None:
        """Undoes move operations recorded in an action dict.

        Expects keys 'type', 't_src'/'t_dst' and optionally 'c_src'/'c_dst';
        missing destination files are silently skipped.
        """
        if action['type'] == 'move' and os.path.exists(action['t_dst']):
            shutil.move(action['t_dst'], action['t_src'])
        elif action['type'] in ['unused', 'cat_move']:
            if os.path.exists(action['t_dst']):
                shutil.move(action['t_dst'], action['t_src'])
            if 'c_dst' in action and os.path.exists(action['c_dst']):
                shutil.move(action['c_dst'], action['c_src'])

    @staticmethod
    def get_processed_log() -> Dict[str, Dict]:
        """Retrieves history of processed files keyed by source path."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM processed_log")
            rows = cursor.fetchall()
            return {r[0]: {"cat": r[1], "action": r[2]} for r in rows}

    @staticmethod
    def get_tagged_page_indices(all_images: List[str], page_size: int) -> set:
        """Returns the set of page numbers (index // page_size) containing staged images.

        Optimized: builds a path->index dict once for O(1) lookups instead of
        a linear search per staged path.
        """
        staged = SorterEngine.get_staged_data()
        if not staged:
            return set()
        # Build reverse index for O(1) lookups
        path_to_idx = {path: idx for idx, path in enumerate(all_images)}
        tagged_pages = set()
        for path in staged.keys():
            if path in path_to_idx:
                tagged_pages.add(path_to_idx[path] // page_size)
        return tagged_pages