import os
import shutil
import sqlite3
import threading
import hashlib
from contextlib import contextmanager
from functools import lru_cache
from io import BytesIO
from typing import Dict, List, Optional, Tuple
import concurrent.futures


class SorterEngine:
    """SQLite-backed engine for staging, tagging and committing image files.

    All state lives in a single SQLite database (``DB_PATH``) plus the
    filesystem itself.  Every method is a static/class method; the class is
    used purely as a namespace.  Connections are opened per operation and
    closed immediately, so the engine is safe to call from multiple threads.
    """

    DB_PATH = "/app/sorter_database.db"
    _local = threading.local()  # reserved for per-thread state (currently unused)

    # ==========================================
    # DATABASE CONNECTION MANAGEMENT (OPTIMIZED)
    # ==========================================

    @classmethod
    @contextmanager
    def get_connection(cls):
        """Yield a short-lived, thread-safe connection with Row access.

        The connection is always closed on exit; callers are responsible
        for committing.
        """
        conn = sqlite3.connect(cls.DB_PATH, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    @classmethod
    @contextmanager
    def transaction(cls):
        """Yield a connection whose work is committed atomically.

        Commits on clean exit; rolls back and re-raises on any exception.
        """
        with cls.get_connection() as conn:
            try:
                yield conn
                conn.commit()
            except Exception:
                conn.rollback()
                raise

    # ==========================================
    # DATABASE INITIALIZATION
    # ==========================================

    @staticmethod
    def _ensure_persistent_tags_table(cursor) -> None:
        """Create the persistent_tags table (and nothing else) if missing.

        Shared by init_db and the persistent-tag readers so a first run
        before init_db cannot fail with 'no such table'.
        """
        cursor.execute('''CREATE TABLE IF NOT EXISTS persistent_tags
                        (output_folder TEXT,
                         filename TEXT,
                         category TEXT,
                         tag_index INTEGER,
                         original_source_folder TEXT,
                         committed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                         PRIMARY KEY (output_folder, filename))''')

    @staticmethod
    def init_db():
        """Initializes tables, including the HISTORY log and persistent tags."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()

            cursor.execute('''CREATE TABLE IF NOT EXISTS profiles
                        (name TEXT PRIMARY KEY, tab1_target TEXT, tab2_target TEXT, tab2_control TEXT,
                         tab4_source TEXT, tab4_out TEXT, mode TEXT, tab5_source TEXT, tab5_out TEXT)''')
            cursor.execute('''CREATE TABLE IF NOT EXISTS folder_ids (path TEXT PRIMARY KEY, folder_id INTEGER)''')
            cursor.execute('''CREATE TABLE IF NOT EXISTS categories (name TEXT PRIMARY KEY)''')
            cursor.execute('''CREATE TABLE IF NOT EXISTS staging_area
                        (original_path TEXT PRIMARY KEY, target_category TEXT,
                         new_name TEXT, is_marked INTEGER DEFAULT 0)''')
            cursor.execute('''CREATE TABLE IF NOT EXISTS processed_log
                        (source_path TEXT PRIMARY KEY, category TEXT, action_type TEXT)''')

            # Persistent tags survive after commit: they map
            # output_folder + filename back to the original source info.
            SorterEngine._ensure_persistent_tags_table(cursor)

            # Index for fast lookup by output folder
            cursor.execute('''CREATE INDEX IF NOT EXISTS idx_persistent_tags_folder
                        ON persistent_tags(output_folder)''')

            # Seed categories only on a fresh database.
            cursor.execute("SELECT COUNT(*) FROM categories")
            if cursor.fetchone()[0] == 0:
                cursor.executemany(
                    "INSERT OR IGNORE INTO categories VALUES (?)",
                    [("_TRASH",), ("Default",), ("Action",), ("Solo",)]
                )

            conn.commit()

    # ==========================================
    # PROFILE & PATH MANAGEMENT (OPTIMIZED)
    # ==========================================

    @staticmethod
    def save_tab_paths(profile_name, t1_t=None, t2_t=None, t2_c=None, t4_s=None, t4_o=None, mode=None, t5_s=None, t5_o=None):
        """Updates specific tab paths in the database while preserving others.

        Any argument left as None keeps the previously stored value; a
        missing profile is created with "/storage" defaults and mode "id".
        """
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM profiles WHERE name = ?", (profile_name,))
            row = cursor.fetchone()

            if not row:
                # Defaults for a brand-new profile.
                row = (profile_name, "/storage", "/storage", "/storage",
                       "/storage", "/storage", "id", "/storage", "/storage")

            new_values = (
                profile_name,
                t1_t if t1_t is not None else row[1],
                t2_t if t2_t is not None else row[2],
                t2_c if t2_c is not None else row[3],
                t4_s if t4_s is not None else row[4],
                t4_o if t4_o is not None else row[5],
                mode if mode is not None else row[6],
                t5_s if t5_s is not None else row[7],
                t5_o if t5_o is not None else row[8]
            )
            cursor.execute("INSERT OR REPLACE INTO profiles VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", new_values)
            conn.commit()

    @staticmethod
    def load_profiles():
        """Loads all workspace presets as {name: {field: value}}."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM profiles")
            rows = cursor.fetchall()
            return {r[0]: {
                "tab1_target": r[1], "tab2_target": r[2], "tab2_control": r[3],
                "tab4_source": r[4], "tab4_out": r[5], "mode": r[6],
                "tab5_source": r[7], "tab5_out": r[8]
            } for r in rows}

    # ==========================================
    # CATEGORY MANAGEMENT (OPTIMIZED)
    # ==========================================

    @staticmethod
    def get_categories() -> List[str]:
        """Return all category names, case-insensitively sorted."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM categories ORDER BY name COLLATE NOCASE ASC")
            return [r[0] for r in cursor.fetchall()]

    @staticmethod
    def add_category(name: str):
        """Add a category; silently ignored if it already exists."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("INSERT OR IGNORE INTO categories VALUES (?)", (name,))
            conn.commit()

    @staticmethod
    def rename_category(old_name: str, new_name: str, output_base_path: str = None):
        """Renames category in DB and optionally renames the physical folder.

        Staged items pointing at the old category are retargeted in the
        same transaction.  The on-disk rename only happens when the old
        folder exists and the new one does not.
        """
        with SorterEngine.transaction() as conn:
            cursor = conn.cursor()
            cursor.execute("UPDATE categories SET name = ? WHERE name = ?", (new_name, old_name))
            cursor.execute("UPDATE staging_area SET target_category = ? WHERE target_category = ?", (new_name, old_name))

            if output_base_path:
                old_path = os.path.join(output_base_path, old_name)
                new_path = os.path.join(output_base_path, new_name)
                if os.path.exists(old_path) and not os.path.exists(new_path):
                    os.rename(old_path, new_path)

    @staticmethod
    def delete_category(name: str):
        """Deletes a category and clears any staged tags associated with it."""
        with SorterEngine.transaction() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM categories WHERE name = ?", (name,))
            cursor.execute("DELETE FROM staging_area WHERE target_category = ?", (name,))

    @staticmethod
    def sync_categories_from_disk(output_path: str) -> int:
        """Scans output directory and adds subfolders as DB categories.

        Hidden folders (leading '.') are skipped.  Returns the number of
        categories actually added.
        """
        if not output_path or not os.path.exists(output_path):
            return 0

        # scandir avoids a stat() per entry compared to listdir+isdir.
        existing_folders = []
        with os.scandir(output_path) as entries:
            for entry in entries:
                if entry.is_dir() and not entry.name.startswith("."):
                    existing_folders.append(entry.name)

        with SorterEngine.transaction() as conn:
            cursor = conn.cursor()
            added = 0
            for folder in existing_folders:
                cursor.execute("INSERT OR IGNORE INTO categories VALUES (?)", (folder,))
                if cursor.rowcount > 0:  # rowcount is 0 when IGNOREd
                    added += 1
            return added

    # ==========================================
    # IMAGE OPERATIONS (OPTIMIZED)
    # ==========================================

    # Frozen set gives O(1) extension membership tests.
    _IMAGE_EXTENSIONS = frozenset({'.jpg', '.jpeg', '.png', '.webp', '.bmp', '.tiff'})

    @staticmethod
    def get_images(path: str, recursive: bool = False) -> List[str]:
        """Return sorted absolute paths of images under *path*.

        Non-recursive by default; the recursive scan skips any directory
        whose name contains "_DELETED".
        """
        if not path or not os.path.exists(path):
            return []

        if recursive:
            image_list = SorterEngine._scan_recursive(path)
        else:
            image_list = []
            with os.scandir(path) as entries:
                for entry in entries:
                    if entry.is_file():
                        ext = os.path.splitext(entry.name)[1].lower()
                        if ext in SorterEngine._IMAGE_EXTENSIONS:
                            image_list.append(entry.path)

        return sorted(image_list)

    @staticmethod
    def _scan_recursive(path: str) -> List[str]:
        """Depth-first image scan; unreadable directories are skipped."""
        results = []
        try:
            with os.scandir(path) as entries:
                for entry in entries:
                    if entry.is_dir():
                        # Trash folders are excluded from all scans.
                        if "_DELETED" not in entry.name:
                            results.extend(SorterEngine._scan_recursive(entry.path))
                    elif entry.is_file():
                        ext = os.path.splitext(entry.name)[1].lower()
                        if ext in SorterEngine._IMAGE_EXTENSIONS:
                            results.append(entry.path)
        except PermissionError:
            pass  # best effort: skip directories we cannot read
        return results

    @staticmethod
    def get_id_mapping(path: str) -> Dict[str, List[str]]:
        """Maps idXXX prefixes for collision handling.

        Returns {"idNNN": [filenames...]} for files named like
        "idNNN_rest.ext" directly inside *path*.
        """
        mapping = {}
        images = SorterEngine.get_images(path, recursive=False)
        for f in images:
            fname = os.path.basename(f)
            if fname.startswith("id") and "_" in fname:
                prefix = fname.split('_')[0]
                mapping.setdefault(prefix, []).append(fname)
        return mapping

    @staticmethod
    def get_max_id_number(target_path: str) -> int:
        """Return the highest NNN among "idNNN_*" files in *target_path* (0 if none)."""
        max_id = 0
        if not target_path or not os.path.exists(target_path):
            return 0

        with os.scandir(target_path) as entries:
            for entry in entries:
                if entry.is_file() and entry.name.startswith("id") and "_" in entry.name:
                    try:
                        num = int(entry.name[2:].split('_')[0])
                        if num > max_id:
                            max_id = num
                    except (ValueError, IndexError):
                        continue  # non-numeric "id*" names are ignored
        return max_id

    @staticmethod
    def get_folder_id(source_path: str) -> int:
        """Retrieves or generates a persistent ID for a specific folder.

        IDs start at 1 and are allocated inside a transaction so two
        threads cannot hand out the same number.
        """
        with SorterEngine.transaction() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT folder_id FROM folder_ids WHERE path = ?", (source_path,))
            result = cursor.fetchone()
            if result:
                return result[0]
            cursor.execute("SELECT MAX(folder_id) FROM folder_ids")
            row = cursor.fetchone()
            # MAX() on an empty table yields NULL -> start at 1.
            fid = (row[0] + 1) if row and row[0] else 1
            cursor.execute("INSERT INTO folder_ids VALUES (?, ?)", (source_path, fid))
            return fid

    # ==========================================
    # IMAGE COMPRESSION (OPTIMIZED WITH CACHING)
    # ==========================================

    # Simple bounded in-memory cache for compressed thumbnails.
    _thumbnail_cache: Dict[str, bytes] = {}
    _cache_max_items = 500
    _cache_lock = threading.Lock()

    @classmethod
    def _get_cache_key(cls, path: str, quality: int, target_size: Optional[int]) -> str:
        """Generate cache key including file modification time.

        Including mtime invalidates cached thumbnails when the file changes.
        """
        try:
            mtime = os.path.getmtime(path)
        except OSError:
            mtime = 0
        return hashlib.md5(f"{path}:{quality}:{target_size}:{mtime}".encode()).hexdigest()

    @classmethod
    def compress_for_web(cls, path: str, quality: int, target_size: Optional[int] = None) -> Optional[bytes]:
        """Loads image, resizes smart, and saves as WebP with caching.

        Returns WebP bytes, or None on any load/convert error.
        """
        cache_key = cls._get_cache_key(path, quality, target_size)

        with cls._cache_lock:
            if cache_key in cls._thumbnail_cache:
                return cls._thumbnail_cache[cache_key]

        try:
            # Lazy import: Pillow is only needed for this code path, so the
            # rest of the engine works even when it is not installed.
            from PIL import Image

            with Image.open(path) as img:
                # Keep RGBA for WebP support; only convert unusual modes.
                if img.mode not in ('RGB', 'RGBA', 'L', 'LA', 'P'):
                    img = img.convert("RGBA" if img.mode.endswith('A') else "RGB")
                elif img.mode == 'P':
                    img = img.convert("RGBA" if 'transparency' in img.info else "RGB")

                # Smart resize: only shrink, never upscale.
                if target_size and (img.width > target_size or img.height > target_size):
                    img.thumbnail((target_size, target_size), Image.Resampling.LANCZOS)

                buf = BytesIO()
                img.save(buf, format="WEBP", quality=quality, method=4)  # method=4 is faster
                result = buf.getvalue()

            # Cache with a crude eviction policy: drop the oldest 20%
            # (dict preserves insertion order) when the cache is full.
            with cls._cache_lock:
                if len(cls._thumbnail_cache) >= cls._cache_max_items:
                    keys_to_remove = list(cls._thumbnail_cache.keys())[:cls._cache_max_items // 5]
                    for k in keys_to_remove:
                        del cls._thumbnail_cache[k]
                cls._thumbnail_cache[cache_key] = result

            return result
        except Exception:
            return None  # best effort: unreadable/corrupt images yield None

    @staticmethod
    def load_batch_parallel(image_paths: List[str], quality: int) -> Dict[str, bytes]:
        """Multithreaded loader: Compresses multiple images in parallel.

        Failed or unreadable images are silently omitted from the result.
        """
        results = {}

        def process_one(path):
            return path, SorterEngine.compress_for_web(path, quality)

        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            future_to_path = {executor.submit(process_one, p): p for p in image_paths}
            for future in concurrent.futures.as_completed(future_to_path):
                try:
                    path, data = future.result()
                    if data:
                        results[path] = data
                except Exception:
                    pass  # best effort: one bad image must not kill the batch

        return results

    # ==========================================
    # STAGING OPERATIONS (OPTIMIZED)
    # ==========================================

    @staticmethod
    def delete_to_trash(file_path: str) -> Optional[str]:
        """Moves a file to a local _DELETED subfolder for undo support.

        Returns the new path, or None when the file does not exist.
        """
        if not os.path.exists(file_path):
            return None
        trash_dir = os.path.join(os.path.dirname(file_path), "_DELETED")
        os.makedirs(trash_dir, exist_ok=True)
        dest = os.path.join(trash_dir, os.path.basename(file_path))
        shutil.move(file_path, dest)
        return dest

    @staticmethod
    def stage_image(original_path: str, category: str, new_name: str):
        """Records a pending rename/move (marked=1) in the staging area."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("INSERT OR REPLACE INTO staging_area VALUES (?, ?, ?, 1)",
                           (original_path, category, new_name))
            conn.commit()

    @staticmethod
    def clear_staged_item(original_path: str):
        """Removes an item from the pending staging area."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM staging_area WHERE original_path = ?", (original_path,))
            conn.commit()

    @staticmethod
    def get_staged_data() -> Dict[str, Dict]:
        """Retrieves current tagged/staged images.

        Returns {original_path: {"cat", "name", "marked"}}.
        """
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM staging_area")
            rows = cursor.fetchall()
            return {r[0]: {"cat": r[1], "name": r[2], "marked": r[3]} for r in rows}

    # ==========================================
    # PERSISTENT TAGS
    # ==========================================

    @staticmethod
    def save_persistent_tag(output_folder: str, filename: str, category: str,
                            tag_index: int, source_folder: str):
        """Save a tag permanently after commit."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT OR REPLACE INTO persistent_tags
                (output_folder, filename, category, tag_index, original_source_folder)
                VALUES (?, ?, ?, ?, ?)
            """, (output_folder, filename, category, tag_index, source_folder))
            conn.commit()

    @staticmethod
    def save_persistent_tags_batch(tags: List[Tuple[str, str, str, int, str]]):
        """Batch save multiple persistent tags in one transaction."""
        with SorterEngine.transaction() as conn:
            cursor = conn.cursor()
            cursor.executemany("""
                INSERT OR REPLACE INTO persistent_tags
                (output_folder, filename, category, tag_index, original_source_folder)
                VALUES (?, ?, ?, ?, ?)
            """, tags)

    @staticmethod
    def get_persistent_tags(output_folder: str) -> Dict[str, Dict]:
        """
        Get all persistent tags for an output folder.
        Returns: {filename: {"cat": category, "index": tag_index, "source": source_folder}}
        """
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            # Safe for first run before init_db.
            SorterEngine._ensure_persistent_tags_table(cursor)
            cursor.execute("""
                SELECT filename, category, tag_index, original_source_folder
                FROM persistent_tags
                WHERE output_folder = ?
            """, (output_folder,))
            rows = cursor.fetchall()
            return {
                r[0]: {"cat": r[1], "index": r[2], "source": r[3]}
                for r in rows
            }

    @staticmethod
    def get_persistent_tags_by_category(output_folder: str, category: str) -> Dict[int, str]:
        """
        Get persistent tags for a specific category in an output folder.
        Returns: {tag_index: filename}
        """
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            # Safe for first run before init_db.
            SorterEngine._ensure_persistent_tags_table(cursor)
            cursor.execute("""
                SELECT tag_index, filename
                FROM persistent_tags
                WHERE output_folder = ? AND category = ?
            """, (output_folder, category))
            rows = cursor.fetchall()
            return {r[0]: r[1] for r in rows}

    @staticmethod
    def delete_persistent_tag(output_folder: str, filename: str):
        """Remove a persistent tag (e.g., if file is deleted)."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                DELETE FROM persistent_tags
                WHERE output_folder = ? AND filename = ?
            """, (output_folder, filename))
            conn.commit()

    @staticmethod
    def clear_persistent_tags(output_folder: str):
        """Clear all persistent tags for an output folder."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM persistent_tags WHERE output_folder = ?", (output_folder,))
            conn.commit()

    # ==========================================
    # BATCH COMMIT OPERATIONS (OPTIMIZED)
    # ==========================================

    @staticmethod
    def _compute_final_destination(output_root: str, name: str) -> str:
        """Compute final destination path, appending _1, _2, ... on collision."""
        final_dst = os.path.join(output_root, name)
        if not os.path.exists(final_dst):
            return final_dst

        root, ext = os.path.splitext(name)
        c = 1
        while os.path.exists(final_dst):
            final_dst = os.path.join(output_root, f"{root}_{c}{ext}")
            c += 1
        return final_dst

    @staticmethod
    def _extract_tag_index(name: str) -> Optional[int]:
        """Parse the trailing numeric tag index from a staged name.

        "Action_12.jpg" -> 12; returns None when no index is parseable.
        Shared by commit_batch and commit_global.
        """
        try:
            return int(name.rsplit('_', 1)[1].split('.')[0])
        except (ValueError, IndexError):
            return None

    @staticmethod
    def fix_permissions(path: str):
        """Forces file to be fully accessible (rwxrwxrwx); best effort."""
        try:
            os.chmod(path, 0o777)
        except Exception:
            pass

    @staticmethod
    def commit_batch(file_list: List[str], output_root: str, cleanup_mode: str, operation: str = "Copy"):
        """Commits files with batched DB operations and saves persistent tags.

        Marked staged files are copied/moved into *output_root*; unmarked
        files are kept, moved to "unused", or deleted per *cleanup_mode*
        ("Keep" / "Move to Unused" / "Delete").  All DB bookkeeping is
        flushed in a single transaction at the end.
        """
        data = SorterEngine.get_staged_data()

        if not os.path.exists(output_root):
            os.makedirs(output_root, exist_ok=True)

        to_delete_from_staging = []
        to_insert_to_log = []
        persistent_tags_to_save = []

        for file_path in file_list:
            if not os.path.exists(file_path):
                continue

            # Tagged files
            if file_path in data and data[file_path]['marked']:
                info = data[file_path]
                final_dst = SorterEngine._compute_final_destination(output_root, info['name'])
                final_filename = os.path.basename(final_dst)
                tag_index = SorterEngine._extract_tag_index(info['name'])

                if operation == "Copy":
                    shutil.copy2(file_path, final_dst)
                else:
                    shutil.move(file_path, final_dst)

                SorterEngine.fix_permissions(final_dst)

                to_delete_from_staging.append((file_path,))
                to_insert_to_log.append((file_path, info['cat'], operation))

                if tag_index is not None:
                    persistent_tags_to_save.append((
                        output_root,                 # output_folder
                        final_filename,              # filename
                        info['cat'],                 # category
                        tag_index,                   # tag_index
                        os.path.dirname(file_path)   # original source folder
                    ))

            # Untagged files - cleanup
            elif cleanup_mode != "Keep":
                if cleanup_mode == "Move to Unused":
                    unused_dir = os.path.join(os.path.dirname(file_path), "unused")
                    os.makedirs(unused_dir, exist_ok=True)
                    dest_unused = os.path.join(unused_dir, os.path.basename(file_path))
                    shutil.move(file_path, dest_unused)
                    SorterEngine.fix_permissions(dest_unused)
                elif cleanup_mode == "Delete":
                    os.remove(file_path)

        # Batch database updates in one transaction.
        with SorterEngine.transaction() as conn:
            cursor = conn.cursor()
            if to_delete_from_staging:
                cursor.executemany("DELETE FROM staging_area WHERE original_path = ?", to_delete_from_staging)
            if to_insert_to_log:
                cursor.executemany("INSERT OR REPLACE INTO processed_log VALUES (?, ?, ?)", to_insert_to_log)
            if persistent_tags_to_save:
                cursor.executemany("""
                    INSERT OR REPLACE INTO persistent_tags
                    (output_folder, filename, category, tag_index, original_source_folder)
                    VALUES (?, ?, ?, ?, ?)
                """, persistent_tags_to_save)

    @staticmethod
    def commit_global(output_root: str, cleanup_mode: str, operation: str = "Copy", source_root: str = None):
        """Commits ALL staged files with batched operations and saves persistent tags.

        Like commit_batch but processes the whole staging area; cleanup is
        applied to every unstaged image found recursively under
        *source_root* (when given).  Clears the staging area at the end.
        """
        data = SorterEngine.get_staged_data()

        if not os.path.exists(output_root):
            os.makedirs(output_root, exist_ok=True)

        to_insert_to_log = []
        persistent_tags_to_save = []

        for old_p, info in data.items():
            if not os.path.exists(old_p):
                continue

            final_dst = SorterEngine._compute_final_destination(output_root, info['name'])
            final_filename = os.path.basename(final_dst)
            tag_index = SorterEngine._extract_tag_index(info['name'])

            if operation == "Copy":
                shutil.copy2(old_p, final_dst)
            else:
                shutil.move(old_p, final_dst)

            SorterEngine.fix_permissions(final_dst)
            to_insert_to_log.append((old_p, info['cat'], operation))

            if tag_index is not None:
                persistent_tags_to_save.append((
                    output_root,            # output_folder
                    final_filename,         # filename
                    info['cat'],            # category
                    tag_index,              # tag_index
                    os.path.dirname(old_p)  # original source folder
                ))

        # Global cleanup of everything that was never staged.
        if cleanup_mode != "Keep" and source_root:
            all_imgs = SorterEngine.get_images(source_root, recursive=True)
            for img_p in all_imgs:
                if img_p not in data:
                    if cleanup_mode == "Move to Unused":
                        unused_dir = os.path.join(source_root, "unused")
                        os.makedirs(unused_dir, exist_ok=True)
                        dest_unused = os.path.join(unused_dir, os.path.basename(img_p))
                        shutil.move(img_p, dest_unused)
                        SorterEngine.fix_permissions(dest_unused)
                    elif cleanup_mode == "Delete":
                        os.remove(img_p)

        # Batch database updates in one transaction.
        with SorterEngine.transaction() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM staging_area")
            if to_insert_to_log:
                cursor.executemany("INSERT OR REPLACE INTO processed_log VALUES (?, ?, ?)", to_insert_to_log)
            if persistent_tags_to_save:
                cursor.executemany("""
                    INSERT OR REPLACE INTO persistent_tags
                    (output_folder, filename, category, tag_index, original_source_folder)
                    VALUES (?, ?, ?, ?, ?)
                """, persistent_tags_to_save)

    # ==========================================
    # UTILITY OPERATIONS
    # ==========================================

    @staticmethod
    def harmonize_names(t_p: str, c_p: str) -> str:
        """Forces the 'control' file to match the 'target' file's name.

        Keeps the control file's extension; on a name clash, appends
        "_alt".  Returns the control file's (possibly new) path.
        """
        if not os.path.exists(t_p) or not os.path.exists(c_p):
            return c_p

        t_name = os.path.basename(t_p)
        t_root, t_ext = os.path.splitext(t_name)
        c_ext = os.path.splitext(c_p)[1]

        new_c_name = f"{t_root}{c_ext}"
        new_c_p = os.path.join(os.path.dirname(c_p), new_c_name)

        if os.path.exists(new_c_p) and c_p != new_c_p:
            new_c_p = os.path.join(os.path.dirname(c_p), f"{t_root}_alt{c_ext}")

        os.rename(c_p, new_c_p)
        return new_c_p

    @staticmethod
    def re_id_file(old_path: str, new_id_prefix: str) -> str:
        """Changes the idXXX_ prefix to resolve collisions; returns new path."""
        dir_name = os.path.dirname(old_path)
        old_name = os.path.basename(old_path)
        # Drop everything up to and including the first underscore.
        name_no_id = old_name.split('_', 1)[1] if '_' in old_name else old_name
        new_name = f"{new_id_prefix}{name_no_id}"
        new_path = os.path.join(dir_name, new_name)
        os.rename(old_path, new_path)
        return new_path

    @staticmethod
    def move_to_unused_synced(t_p: str, c_p: str, t_root: str, c_root: str) -> Tuple[str, str]:
        """Moves a target/control pair to 'unused' subfolders.

        NOTE: both destinations use the target's basename so the pair
        stays name-synchronized.  Returns the two new paths.
        """
        t_name = os.path.basename(t_p)
        t_un = os.path.join(t_root, "unused", t_name)
        c_un = os.path.join(c_root, "unused", t_name)
        os.makedirs(os.path.dirname(t_un), exist_ok=True)
        os.makedirs(os.path.dirname(c_un), exist_ok=True)
        shutil.move(t_p, t_un)
        shutil.move(c_p, c_un)
        return t_un, c_un

    @staticmethod
    def restore_from_unused(t_p: str, c_p: str, t_root: str, c_root: str) -> Tuple[str, str]:
        """Moves files back from 'unused' into selected_target/selected_control."""
        t_name = os.path.basename(t_p)
        t_dst = os.path.join(t_root, "selected_target", t_name)
        c_dst = os.path.join(c_root, "selected_control", t_name)
        os.makedirs(os.path.dirname(t_dst), exist_ok=True)
        os.makedirs(os.path.dirname(c_dst), exist_ok=True)
        shutil.move(t_p, t_dst)
        shutil.move(c_p, c_dst)
        return t_dst, c_dst

    @staticmethod
    def revert_action(action: Dict):
        """Undoes move operations described by an action dict.

        Expects keys 'type', 't_src'/'t_dst' and optionally
        'c_src'/'c_dst' for paired moves.
        """
        if action['type'] == 'move' and os.path.exists(action['t_dst']):
            shutil.move(action['t_dst'], action['t_src'])
        elif action['type'] in ['unused', 'cat_move']:
            if os.path.exists(action['t_dst']):
                shutil.move(action['t_dst'], action['t_src'])
            if 'c_dst' in action and os.path.exists(action['c_dst']):
                shutil.move(action['c_dst'], action['c_src'])

    @staticmethod
    def get_processed_log() -> Dict[str, Dict]:
        """Retrieves history of processed files as {source_path: {"cat", "action"}}."""
        with SorterEngine.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM processed_log")
            rows = cursor.fetchall()
            return {r[0]: {"cat": r[1], "action": r[2]} for r in rows}

    @staticmethod
    def get_tagged_page_indices(all_images: List[str], page_size: int) -> set:
        """Return the set of page numbers (index // page_size) containing staged images.

        Builds a reverse index for O(1) lookups instead of a linear search
        per staged path.
        """
        staged = SorterEngine.get_staged_data()
        if not staged:
            return set()

        path_to_idx = {path: idx for idx, path in enumerate(all_images)}

        tagged_pages = set()
        for path in staged.keys():
            if path in path_to_idx:
                tagged_pages.add(path_to_idx[path] // page_size)

        return tagged_pages