import asyncio
import html
import logging
import math
import os
from typing import Optional, List, Dict, Set
from urllib.parse import urlencode

from fastapi import Response
from nicegui import ui, app, run

from engine import SorterEngine
# ==========================================
# CUSTOM CSS FOR REFINED AESTHETICS
# ==========================================
# Markup injected into the page <head> at startup via ui.add_head_html().
# NOTE(review): the string is currently blank, yet the UI code below references
# custom classes (e.g. 'num-btn', 'stat-pill') and CSS variables such as
# var(--bg-deep) — confirm whether the stylesheet content was lost.
CUSTOM_CSS = """
"""
# ==========================================
# STATE MANAGEMENT
# ==========================================
class AppState:
    """Single holder for profile paths, view settings, tagging state and UI handles.

    One instance is shared by all handlers and render functions; the UI
    container attributes start as ``None`` and are filled in by the layout
    builders.
    """

    __slots__ = (
        # profile
        'profiles', 'profile_name', 'source_dir', 'output_dir',
        # view
        'page', 'page_size', 'grid_cols', 'preview_quality',
        # tagging / batch
        'active_cat', 'next_index', 'batch_mode', 'cleanup_mode',
        # cached data
        'all_images', 'staged_data', 'green_dots', 'index_map',
        # UI containers
        'sidebar_container', 'grid_container', 'pagination_container',
        'stats_container', '_image_cache',
    )

    def __init__(self):
        # --- profiles -----------------------------------------------------
        self.profiles = SorterEngine.load_profiles()
        self.profile_name = "Default"
        if not self.profiles:
            # Nothing stored yet: seed one profile pointing at /storage.
            self.profiles = {"Default": {"tab5_source": "/storage", "tab5_out": "/storage"}}
        self.load_active_profile()

        # --- view settings ------------------------------------------------
        self.page = 0
        self.page_size = 24
        self.grid_cols = 4
        self.preview_quality = 50

        # --- tagging ------------------------------------------------------
        self.active_cat = "Default"
        self.next_index = 1

        # --- batch behaviour ----------------------------------------------
        self.batch_mode = "Copy"
        self.cleanup_mode = "Keep"

        # --- cached data --------------------------------------------------
        self.all_images: List[str] = []
        self.staged_data: Dict = {}
        self.green_dots: Set[int] = set()      # page numbers holding a staged image
        self.index_map: Dict[int, str] = {}    # index -> path for the active category
        self._image_cache: Set[str] = set()    # set view of all_images for membership tests

        # --- UI containers (assigned by the build_* functions) -------------
        self.sidebar_container = None
        self.grid_container = None
        self.pagination_container = None
        self.stats_container = None

    def load_active_profile(self):
        """Copy the active profile's source/output paths onto this object.

        Missing profiles or missing keys fall back to ``"/storage"``.
        """
        stored = self.profiles.get(self.profile_name, {})
        for attr, key in (("source_dir", "tab5_source"), ("output_dir", "tab5_out")):
            setattr(self, attr, stored.get(key, "/storage"))

    def save_current_profile(self):
        """Write the current paths into the active profile and persist them."""
        entry = self.profiles.setdefault(self.profile_name, {})
        entry["tab5_source"] = self.source_dir
        entry["tab5_out"] = self.output_dir
        SorterEngine.save_tab_paths(self.profile_name, t5_s=self.source_dir, t5_o=self.output_dir)
        ui.notify(f"Profile '{self.profile_name}' saved!", type='positive')

    def get_categories(self) -> List[str]:
        """Return the category list, re-pointing ``active_cat`` if it vanished.

        An empty result from the engine is replaced by ``["Default"]``.
        """
        available = SorterEngine.get_categories()
        if not available:
            available = ["Default"]
        if self.active_cat not in available:
            self.active_cat = available[0]
        return available

    @property
    def total_pages(self) -> int:
        """Number of pages needed for ``all_images`` (0 when nothing is loaded)."""
        if not self.all_images:
            return 0
        full_pages, remainder = divmod(len(self.all_images), self.page_size)
        return full_pages + 1 if remainder else full_pages

    def get_current_batch(self) -> List[str]:
        """Return the slice of ``all_images`` shown on the current page."""
        if not self.all_images:
            return []
        first = self.page * self.page_size
        # Slicing clamps the upper bound to the list length on its own.
        return self.all_images[first:first + self.page_size]

    @property
    def tagged_count(self) -> int:
        """Number of entries in ``staged_data``."""
        return len(self.staged_data)

    @property
    def total_count(self) -> int:
        """Number of entries in ``all_images``."""
        return len(self.all_images)
# Module-level singleton read and mutated by every handler and render function below.
state = AppState()
# ==========================================
# IMAGE SERVING API (Optimized)
# ==========================================
@app.get('/thumbnail')
async def get_thumbnail(path: str, size: int = 400, q: int = 50):
    """Serve a WebP thumbnail of the image at ``path``.

    Args:
        path: Filesystem path of the image, taken from the query string.
        size: Requested thumbnail size, clamped to 1..4096 before it is
            handed to ``SorterEngine.compress_for_web``.
        q: Requested quality, clamped to 1..100 before it is handed to the
            engine.

    Returns:
        A ``Response`` with the WebP bytes and a one-hour ``Cache-Control``
        header; 404 when ``path`` is not a regular file; 500 when the engine
        returns no data.
    """
    # NOTE(security): ``path`` is client-controlled and is not restricted to
    # the source/output folders, so any file readable by this process can be
    # requested. Left as-is because staged images may live outside the
    # currently loaded folder — review before exposing the server publicly.
    if not os.path.isfile(path):
        # Directories "exist" too, but would only fail later with a 500.
        return Response(status_code=404)

    # Keep untrusted numeric parameters inside a sane range.
    q = max(1, min(q, 100))
    size = max(1, min(size, 4096))

    img_bytes = await run.cpu_bound(SorterEngine.compress_for_web, path, q, size)
    if not img_bytes:
        return Response(status_code=500)
    return Response(
        content=img_bytes,
        media_type="image/webp",
        headers={"Cache-Control": "max-age=3600"},  # Browser caching
    )
@app.get('/full_res')
async def get_full_res(path: str):
    """Serve the image at ``path`` re-encoded as WebP at quality 90, unresized.

    Args:
        path: Filesystem path of the image, taken from the query string.

    Returns:
        A ``Response`` with the WebP bytes and a two-hour ``Cache-Control``
        header; 404 when ``path`` is not a regular file; 500 when the engine
        returns no data.
    """
    # NOTE(security): ``path`` is client-controlled and unrestricted; see the
    # review note before exposing this endpoint beyond a trusted network.
    if not os.path.isfile(path):
        # A directory passes os.path.exists() but cannot be encoded.
        return Response(status_code=404)

    img_bytes = await run.cpu_bound(SorterEngine.compress_for_web, path, 90, None)
    if not img_bytes:
        return Response(status_code=500)
    return Response(
        content=img_bytes,
        media_type="image/webp",
        headers={"Cache-Control": "max-age=7200"},
    )
# ==========================================
# CORE LOGIC
# ==========================================
def load_images():
    """Scan ``state.source_dir`` for images, restore saved tags and redraw.

    Shows a warning and leaves the current state untouched when the source
    is not an existing directory.
    """
    # isdir rather than exists: a regular file is not a usable source folder.
    if not os.path.isdir(state.source_dir):
        ui.notify(f"Source not found: {state.source_dir}", type='warning')
        return

    state.all_images = SorterEngine.get_images(state.source_dir, recursive=True)
    state._image_cache = set(state.all_images)  # O(1) membership checks

    # Re-apply tags previously saved for this folder.
    restored = SorterEngine.restore_folder_tags(state.source_dir)
    if restored > 0:
        ui.notify(f"Restored {restored} saved tags for this folder", type='positive')

    # The new listing may be shorter than the old one.
    if state.page >= state.total_pages:
        state.page = 0

    refresh_staged_info()
    refresh_ui()
def refresh_staged_info():
    """Reload staged data and rebuild ``green_dots`` and ``index_map``.

    ``green_dots`` receives the page number of every loaded image that has a
    staged entry. ``index_map`` maps each index used in the active category
    to a path: staged images first, then files already present in the
    category's output folder (these never override a staged entry).
    """
    state.staged_data = SorterEngine.get_staged_data()
    staged = state.staged_data
    active_cat = state.active_cat
    page_size = state.page_size

    # Pages that contain at least one staged image. Dict membership is
    # already O(1), so no separate key set is needed.
    state.green_dots.clear()
    state.green_dots.update(
        position // page_size
        for position, img_path in enumerate(state.all_images)
        if img_path in staged
    )

    # Indexes taken by staged images of the active category.
    state.index_map.clear()
    for orig_path, info in staged.items():
        if info['cat'] == active_cat:
            idx = _extract_index(info['name'])
            if idx is not None:
                state.index_map[idx] = orig_path

    # Indexes taken by files already committed to disk.
    cat_path = os.path.join(state.output_dir, active_cat)
    try:
        with os.scandir(cat_path) as entries:
            for entry in entries:
                if entry.is_file() and entry.name.startswith(active_cat):
                    idx = _extract_index(entry.name)
                    if idx is not None and idx not in state.index_map:
                        state.index_map[idx] = entry.path
    except FileNotFoundError:
        # Nothing committed for this category yet.
        pass
    except OSError as exc:
        # Unreadable folder or not a directory: keep the staged entries and
        # carry on, but leave a trace instead of failing silently.
        logging.getLogger(__name__).warning("Cannot scan %s: %s", cat_path, exc)
def _extract_index(filename: str) -> Optional[int]:
"""Extract numeric index from filename with error handling."""
try:
return int(filename.rsplit('_', 1)[1].split('.')[0])
except (ValueError, IndexError):
return None
# ==========================================
# ACTIONS
# ==========================================
def action_tag(img_path: str, manual_idx: Optional[int] = None):
    """Stage ``img_path`` under the active category with a numbered name.

    Args:
        img_path: Path of the image to tag.
        manual_idx: Index to use; when ``None`` the shared
            ``state.next_index`` is used and advanced by one afterwards.

    The target name is ``<category>_<idx:03d><ext>``. If that name is
    already staged in the category or exists in the output folder, a numeric
    suffix is appended and incremented until the name is free.
    """
    raw_idx = manual_idx if manual_idx is not None else state.next_index
    # state.next_index is bound to a numeric input and may therefore hold a
    # float (or None once the field is cleared); the ':03d' format below
    # only accepts integers.
    idx = int(raw_idx) if raw_idx is not None else 1

    category = state.active_cat
    ext = os.path.splitext(img_path)[1]
    category_dir = os.path.join(state.output_dir, category)
    staged_names = {v['name'] for v in state.staged_data.values() if v['cat'] == category}

    def is_taken(candidate: str) -> bool:
        return candidate in staged_names or os.path.exists(os.path.join(category_dir, candidate))

    name = f"{category}_{idx:03d}{ext}"
    if is_taken(name):
        ui.notify(f"Conflict: {name} exists. Using suffix.", type='warning')
        # Same first candidate as before, but keep counting while it collides.
        suffix = len(staged_names) + 1
        name = f"{category}_{idx:03d}_{suffix}{ext}"
        while is_taken(name):
            suffix += 1
            name = f"{category}_{idx:03d}_{suffix}{ext}"

    # Stage with folder persistence
    SorterEngine.stage_image(img_path, category, name, source_root=state.source_dir)

    if manual_idx is None:
        state.next_index = idx + 1

    refresh_staged_info()
    refresh_ui()
def action_untag(img_path: str):
    """Clear the staged entry for ``img_path`` (scoped to the source folder) and redraw."""
    source_root = state.source_dir
    SorterEngine.clear_staged_item(img_path, source_root=source_root)

    refresh_staged_info()
    refresh_ui()
def action_delete(img_path: str):
    """Hand ``img_path`` to the engine's trash routine, then rescan the source folder."""
    SorterEngine.delete_to_trash(img_path)

    # The file list changed on disk, so a full reload is needed.
    load_images()
def action_apply_page():
    """Commit the images of the current page using the selected batch/cleanup modes."""
    page_images = state.get_current_batch()
    if page_images:
        SorterEngine.commit_batch(page_images, state.output_dir, state.cleanup_mode, state.batch_mode)
        ui.notify(f"Page processed ({state.batch_mode})", type='positive')
        load_images()
    else:
        ui.notify("No images on current page", type='warning')
async def action_apply_global():
    """Commit every staged change via the engine, then reload the image list.

    The commit runs through ``run.io_bound`` and is awaited before the
    reload and the completion notification.
    """
    ui.notify("Starting global apply... This may take a while.", type='info')

    commit_args = (
        state.output_dir,
        state.cleanup_mode,
        state.batch_mode,
        state.source_dir,
    )
    await run.io_bound(SorterEngine.commit_global, *commit_args)

    load_images()
    ui.notify("Global apply complete!", type='positive')
# ==========================================
# UI COMPONENTS
# ==========================================
def open_zoom_dialog(path: str, title: Optional[str] = None, show_untag: bool = False, show_jump: bool = False):
    """Open a dialog showing the full-resolution version of ``path``.

    Args:
        path: Filesystem path of the image to display.
        title: Header text; defaults to the file's base name.
        show_untag: Add a button that removes the image's tag and closes
            the dialog.
        show_jump: Add a button that navigates the gallery to the page
            holding the image (only offered while the image is part of the
            loaded list).
    """
    # Encode the path: characters such as '&', '#', '+' or '%' would
    # otherwise corrupt the query string.
    full_res_url = f"/full_res?{urlencode({'path': path})}"

    with ui.dialog() as dialog, ui.card().classes('max-w-screen-xl p-0 gap-0 zoom-dialog'):
        with ui.row().classes('w-full justify-between items-center px-4 py-3').style('background: var(--bg-elevated)'):
            ui.label(title or os.path.basename(path)).classes('font-semibold text-white truncate').style('font-family: "JetBrains Mono", monospace')
            with ui.row().classes('gap-2'):
                if show_jump and path in state._image_cache:
                    def jump_to_image():
                        try:
                            img_idx = state.all_images.index(path)
                        except ValueError:
                            # The list was reloaded while the dialog was open.
                            dialog.close()
                            ui.notify("Image is no longer in the loaded folder", type='warning')
                            return
                        target_page = img_idx // state.page_size
                        dialog.close()
                        set_page(target_page)
                        ui.notify(f"Jumped to page {target_page + 1}", type='info')

                    ui.button(icon='location_searching', on_click=jump_to_image) \
                        .props('flat round dense').classes('text-blue-400') \
                        .tooltip('Jump to image location')

                if show_untag:
                    def untag_and_close():
                        action_untag(path)
                        dialog.close()
                        ui.notify("Tag removed", type='positive')

                    ui.button(icon='label_off', on_click=untag_and_close) \
                        .props('flat round dense').classes('text-red-400') \
                        .tooltip('Remove tag')

                ui.button(icon='close', on_click=dialog.close).props('flat round dense color=white')

        ui.image(full_res_url).classes('w-full h-auto object-contain').style('max-height: 85vh; background: var(--bg-deep)')

    dialog.open()
def render_sidebar():
    """Rebuild the tag-manager sidebar inside ``state.sidebar_container``.

    Sections, top to bottom: header, active-category selector, a 1–25
    quick-index grid, the "next index" input with an auto-reset button, the
    new-category form and the delete-category control.
    """
    state.sidebar_container.clear()

    # get_categories() may switch active_cat when the previous one no longer
    # exists; index_map must then be rebuilt for the new category before the
    # quick-index grid reads it.
    previous_cat = state.active_cat
    categories = state.get_categories()
    if state.active_cat != previous_cat:
        refresh_staged_info()

    def resync_after_category_change():
        """Rebuild index data, the sidebar and the stats bar (which shows the category)."""
        refresh_staged_info()
        render_sidebar()
        render_stats()

    def make_click_handler(num: int):
        """Return the click handler for quick-index button ``num``."""
        def handler():
            if num in state.index_map:
                # Index in use: preview the image that occupies it.
                img_path = state.index_map[num]
                is_staged = img_path in state.staged_data
                open_zoom_dialog(
                    img_path,
                    f"{state.active_cat} #{num}",
                    show_untag=is_staged,
                    show_jump=True
                )
            else:
                # Free index: make it the next one to be assigned.
                state.next_index = num
                render_sidebar()
        return handler

    with state.sidebar_container:
        # Header
        with ui.row().classes('items-center gap-3 mb-6'):
            ui.icon('sell', size='28px').classes('text-green-400')
            ui.label("Tag Manager").classes('text-xl font-semibold text-white')

        # Category selector
        def on_category_change(e):
            state.active_cat = e.value
            resync_after_category_change()

        ui.select(
            categories,
            value=state.active_cat,
            label="Active Category",
            on_change=on_category_change
        ).classes('w-full mb-4').props('dark outlined color=green')

        # Number grid (1-25)
        ui.label("Quick Index").classes('text-xs font-semibold text-gray-400 uppercase tracking-wider mb-2')
        with ui.grid(columns=5).classes('gap-1 mb-6 w-full'):
            for i in range(1, 26):
                is_used = i in state.index_map
                btn_classes = 'w-full num-btn'
                if is_used:
                    btn_classes += ' used'
                ui.button(str(i), on_click=make_click_handler(i)) \
                    .props(f'flat size=sm {"color=green" if is_used else "color=grey-8"}') \
                    .classes(btn_classes)

        # Next index input
        with ui.row().classes('w-full items-end no-wrap gap-2 mb-4'):
            ui.number(label="Next Index", min=1, precision=0) \
                .bind_value(state, 'next_index') \
                .classes('flex-grow').props('dark outlined color=green')

            def reset_index():
                # One past the highest index in use, or 1 when none is used.
                state.next_index = max(state.index_map, default=0) + 1
                render_sidebar()

            ui.button(icon='restart_alt', on_click=reset_index).props('flat round color=grey-5').tooltip('Auto-set next available')

        ui.separator().classes('my-4 opacity-20')

        # Add new category
        ui.label("New Category").classes('text-xs font-semibold text-gray-400 uppercase tracking-wider mb-2')
        with ui.row().classes('w-full items-center no-wrap gap-2'):
            new_cat_input = ui.input(placeholder='Category name...') \
                .props('dense outlined dark').classes('flex-grow')

            def add_category():
                # Reject empty and whitespace-only names.
                new_name = (new_cat_input.value or '').strip()
                if new_name:
                    SorterEngine.add_category(new_name)
                    state.active_cat = new_name
                    resync_after_category_change()

            ui.button(icon='add', on_click=add_category).props('flat round color=green')

        # Danger zone
        with ui.expansion('Danger Zone', icon='warning').classes('w-full mt-6').props('dense header-class="text-red-400"'):
            def delete_category():
                SorterEngine.delete_category(state.active_cat)
                # Pick a surviving category first so the index data is
                # rebuilt for it rather than for the one just deleted.
                state.get_categories()
                resync_after_category_change()

            ui.button('Delete Category', color='red', on_click=delete_category) \
                .classes('w-full btn-danger').props('flat')
def render_stats():
    """Redraw the stats bar: total images, tagged images and active category.

    Does nothing while the stats container has not been created.
    """
    container = state.stats_container
    if not container:
        return

    container.clear()

    # (icon, icon colour class, caption, value text, value classes)
    pills = (
        ('photo_library', 'text-gray-400', 'Total:', str(state.total_count), 'value'),
        ('sell', 'text-green-400', 'Tagged:', str(state.tagged_count), 'value'),
        ('folder', 'text-blue-400', 'Category:', state.active_cat, 'value text-blue-400'),
    )

    with container, ui.row().classes('gap-4'):
        for icon_name, icon_tint, caption, text, value_classes in pills:
            with ui.row().classes('stat-pill items-center gap-2'):
                ui.icon(icon_name, size='16px').classes(icon_tint)
                ui.label(caption).classes('text-gray-400')
                ui.label(text).classes(value_classes)
def render_gallery():
    """Redraw the gallery: one card per image on the current page, or a placeholder."""
    gallery = state.grid_container
    gallery.clear()
    page_images = state.get_current_batch()

    with gallery:
        if page_images:
            with ui.grid(columns=state.grid_cols).classes('w-full gap-4'):
                for position, image in enumerate(page_images):
                    render_image_card(image, position)
        else:
            # Empty state shown before anything is loaded.
            with ui.column().classes('w-full items-center py-20'):
                ui.icon('photo_library', size='80px').classes('text-gray-700 mb-4')
                ui.label('No images loaded').classes('text-xl text-gray-500')
                ui.label('Select a source folder and click LOAD').classes('text-gray-600')
def render_image_card(img_path: str, batch_idx: int):
    """Render one gallery card for ``img_path``.

    Args:
        img_path: Path of the image shown on the card.
        batch_idx: Position of the card on the page; used to stagger the
            card's ``animation-delay`` in 30 ms steps.

    The card shows a clickable thumbnail with zoom/delete overlay buttons,
    the (truncated) file name, and either the current tag with an "Untag"
    button or an index input with a "Tag" button.
    """
    is_staged = img_path in state.staged_data
    thumb_size = 600

    # Build the query string with proper escaping; a raw path containing
    # '&', '#', '+' or '%' would otherwise be misparsed by the endpoint.
    thumb_query = urlencode({'path': img_path, 'size': thumb_size, 'q': state.preview_quality})

    card_class = 'image-card rounded-xl overflow-hidden'
    if is_staged:
        card_class += ' tagged'

    with ui.card().classes(card_class).style(f'animation-delay: {batch_idx * 30}ms'):
        # Thumbnail with overlay buttons
        with ui.column().classes('relative'):
            ui.image(f"/thumbnail?{thumb_query}") \
                .classes('w-full thumb-container cursor-pointer') \
                .style('height: 220px; object-fit: cover; background: var(--bg-deep)') \
                .on('click', lambda p=img_path: open_zoom_dialog(p))

            with ui.row().classes('absolute top-2 right-2 gap-1'):
                ui.button(icon='zoom_in', on_click=lambda p=img_path: open_zoom_dialog(p)) \
                    .props('flat round dense size=sm').classes('bg-black/50 text-white')
                ui.button(icon='delete', on_click=lambda p=img_path: action_delete(p)) \
                    .props('flat round dense size=sm').classes('bg-black/50 text-red-400')

        # Info section
        with ui.column().classes('p-3 gap-2'):
            ui.label(os.path.basename(img_path)[:20]).classes('text-xs text-gray-400 truncate mono')

            if is_staged:
                info = state.staged_data[img_path]
                idx = _extract_index(info['name'])
                # 'is not None' so that a legitimate index 0 is not shown as "?".
                idx_str = str(idx) if idx is not None else "?"
                with ui.row().classes('w-full justify-between items-center'):
                    # Category names are user input: escape before rendering as HTML.
                    ui.html(f'{html.escape(info["cat"])} #{idx_str}')
                    ui.button('Untag', on_click=lambda p=img_path: action_untag(p)) \
                        .props('flat dense size=sm color=grey')
            else:
                with ui.row().classes('w-full no-wrap gap-2 items-center'):
                    local_idx = ui.number(value=state.next_index, precision=0) \
                        .props('dense dark outlined borderless').classes('w-16').style('font-family: "JetBrains Mono"')

                    def tag_with_entered_index(p=img_path, field=local_idx):
                        # A cleared input yields None; fall back to the shared
                        # next index instead of failing on int(None).
                        entered = field.value
                        action_tag(p, int(entered) if entered is not None else None)

                    ui.button('Tag', on_click=tag_with_entered_index) \
                        .classes('flex-grow btn-primary').props('dense unelevated')
def render_pagination():
    """Redraw the pagination controls; nothing is shown for one page or fewer.

    Renders previous/next buttons, a "Page X of Y" label, a quick-jump
    number input, and a window of page buttons around the current page
    (three before, three after) with first/last shortcuts and ellipses.
    Pages listed in ``state.green_dots`` are tinted.
    """
    state.pagination_container.clear()

    total_pages = state.total_pages
    if total_pages <= 1:
        return

    current = state.page
    last_page = total_pages - 1

    def jump_to_entered_page(e):
        # Handlers registered through .on() receive generic event arguments
        # without a `value` attribute, so read the widget's value instead.
        # An emptied field yields None and is ignored.
        entered = e.sender.value
        if entered is not None:
            set_page(int(entered) - 1)

    with state.pagination_container:
        with ui.row().classes('w-full items-center justify-center gap-4 mb-4'):
            # Previous button
            ui.button(icon='chevron_left', on_click=lambda: set_page(state.page - 1)) \
                .props('flat round').classes('text-white' if current > 0 else 'text-gray-700') \
                .set_enabled(current > 0)

            # Page info
            with ui.row().classes('items-center gap-3'):
                ui.label(f'Page {current + 1} of {total_pages}') \
                    .classes('text-gray-400 mono text-sm')
                # Quick jump
                ui.number(value=current + 1, min=1, max=total_pages, precision=0) \
                    .props('dense dark outlined borderless').classes('w-16') \
                    .on('change', jump_to_entered_page)

            # Next button
            ui.button(icon='chevron_right', on_click=lambda: set_page(state.page + 1)) \
                .props('flat round').classes('text-white' if current < last_page else 'text-gray-700') \
                .set_enabled(current < last_page)

        # Page buttons
        with ui.row().classes('items-center justify-center gap-1 flex-wrap'):
            start = max(0, current - 3)
            end = min(total_pages, current + 4)

            if start > 0:
                ui.button('1', on_click=lambda: set_page(0)).props('flat dense size=sm color=grey')
                if start > 1:
                    ui.label('...').classes('text-gray-600 px-1')

            for p in range(start, end):
                has_tags = p in state.green_dots
                is_current = p == current
                btn_color = 'green' if is_current else ('light-green-8' if has_tags else 'grey-8')
                # page=p binds the loop value now; a bare closure would see the last p.
                btn = ui.button(str(p + 1), on_click=lambda page=p: set_page(page)) \
                    .props(f'flat dense size=sm color={btn_color}')
                if is_current:
                    btn.classes('font-bold')

            if end < total_pages:
                if end < last_page:
                    ui.label('...').classes('text-gray-600 px-1')
                ui.button(str(total_pages), on_click=lambda: set_page(state.total_pages - 1)) \
                    .props('flat dense size=sm color=grey')
def set_page(p: int):
    """Go to page ``p``, clamped to the valid range, and redraw everything."""
    last_page = state.total_pages - 1
    not_past_end = min(p, last_page)
    state.page = max(0, not_past_end)
    refresh_ui()
def refresh_ui():
    """Re-render sidebar, pagination, gallery and stats, in that order."""
    for redraw in (render_sidebar, render_pagination, render_gallery, render_stats):
        redraw()
def handle_keyboard(e):
    """Page backwards/forwards on left/right arrow key-down events."""
    if not e.action.keydown:
        return

    wants_previous = e.key.arrow_left and state.page > 0
    if wants_previous:
        set_page(state.page - 1)
        return

    wants_next = e.key.arrow_right and state.page < state.total_pages - 1
    if wants_next:
        set_page(state.page + 1)
# ==========================================
# MAIN LAYOUT
# ==========================================
def build_header():
    """Create the top bar: logo, profile picker, path inputs, actions and display settings."""

    def change_profile(e):
        # Switch profile, pull its paths into state, then rescan.
        state.profile_name = e.value
        state.load_active_profile()
        load_images()

    def on_grid_cols_change(e):
        state.grid_cols = int(e.value)
        refresh_ui()

    def on_quality_change(e):
        state.preview_quality = int(e.value)
        refresh_ui()

    header = ui.header().classes('app-header items-center text-white').style('height: 72px; padding: 0 24px')
    with header, ui.row().classes('w-full items-center gap-6 no-wrap'):
        # Logo
        with ui.row().classes('items-center gap-2 shrink-0'):
            ui.icon('photo_library', size='28px').classes('text-green-400')
            ui.label('NiceSorter').classes('text-xl font-bold text-white')

        # Profile selector
        profile_names = list(state.profiles.keys())
        with ui.row().classes('items-center gap-2'):
            ui.icon('person', size='18px').classes('text-gray-500')
            ui.select(profile_names, value=state.profile_name, on_change=change_profile) \
                .props('dark dense borderless').classes('w-28')

        # Source and output paths, two-way bound to state
        with ui.row().classes('flex-grow gap-3'):
            with ui.row().classes('flex-grow items-center gap-2'):
                ui.icon('folder_open', size='18px').classes('text-blue-400')
                ui.input('Source').bind_value(state, 'source_dir') \
                    .classes('flex-grow').props('dark dense outlined')
            with ui.row().classes('flex-grow items-center gap-2'):
                ui.icon('save', size='18px').classes('text-green-400')
                ui.input('Output').bind_value(state, 'output_dir') \
                    .classes('flex-grow').props('dark dense outlined')

        # Action buttons
        ui.button(icon='save', on_click=state.save_current_profile) \
            .props('flat round').classes('text-white').tooltip('Save Profile')
        ui.button('LOAD', on_click=load_images) \
            .props('unelevated color=green').classes('font-semibold px-6')

        # Settings menu
        with ui.button(icon='tune').props('flat round').classes('text-white'):
            with ui.menu().classes('p-4').style('background: var(--bg-elevated); min-width: 280px'):
                ui.label('Display Settings').classes('text-sm font-semibold text-gray-400 uppercase tracking-wider mb-4')

                ui.label('Grid Columns').classes('text-gray-400 text-sm mb-1')
                ui.slider(
                    min=2, max=8, step=1,
                    value=state.grid_cols,
                    on_change=on_grid_cols_change
                ).props('color=green label-always')

                ui.label('Preview Quality').classes('text-gray-400 text-sm mb-1 mt-4')
                ui.slider(
                    min=10, max=100, step=10,
                    value=state.preview_quality,
                    on_change=on_quality_change
                ).props('color=green label-always')
def build_sidebar():
    """Create the left drawer and register its content column on ``state``."""
    drawer = ui.left_drawer(value=True).classes('app-sidebar p-5').props('width=300')
    with drawer:
        # render_sidebar() fills this column later.
        state.sidebar_container = ui.column().classes('w-full')
def build_main_content():
    """Create the main column: stats, pagination and gallery containers plus the batch footer.

    The three containers are stored on ``state`` so the render functions can
    refill them later.
    """
    section_caption = 'text-xs font-semibold text-gray-400 uppercase tracking-wider'
    radio_props = 'inline dark color=green'

    page_column = ui.column().classes('w-full p-6 min-h-screen text-white').style('background: var(--bg-deep)')
    with page_column:
        # Placeholders filled by the render_* functions
        state.stats_container = ui.row().classes('w-full mb-4')
        state.pagination_container = ui.column().classes('w-full items-center mb-6')
        state.grid_container = ui.column().classes('w-full')

        # Footer with batch controls
        ui.separator().classes('my-10 opacity-10')
        footer_card = ui.card().classes('w-full p-6 rounded-2xl').style('background: var(--bg-card); border: 1px solid var(--border-subtle)')
        with footer_card, ui.row().classes('w-full justify-between items-center flex-wrap gap-6'):
            # What happens to tagged files
            with ui.column().classes('gap-2'):
                ui.label('Tagged Files').classes(section_caption)
                ui.radio(['Copy', 'Move'], value=state.batch_mode) \
                    .bind_value(state, 'batch_mode') \
                    .props(radio_props)

            # What happens to untagged files
            with ui.column().classes('gap-2'):
                ui.label('Untagged Files').classes(section_caption)
                ui.radio(['Keep', 'Move to Unused', 'Delete'], value=state.cleanup_mode) \
                    .bind_value(state, 'cleanup_mode') \
                    .props(radio_props)

            # Apply buttons
            with ui.row().classes('items-center gap-4'):
                ui.button('Apply Page', on_click=action_apply_page) \
                    .props('outline color=white').classes('px-6')
                ui.button('Apply Global', on_click=action_apply_global) \
                    .props('unelevated').classes('btn-danger px-6')
# ==========================================
# INITIALIZATION
# ==========================================
# Startup sequence — runs at import time, in this order.
# Inject custom CSS into the page <head>
ui.add_head_html(CUSTOM_CSS)
# Initialize database
SorterEngine.init_db()
# Build UI; the sidebar and main-content builders store their containers on
# `state`, which the render_* functions need before they can draw anything.
build_header()
build_sidebar()
build_main_content()
# Setup keyboard navigation (left/right arrows change page)
ui.keyboard(on_key=handle_keyboard)
# Enable dark mode
ui.dark_mode().enable()
# Initial load — must come after the builders because it ends in refresh_ui()
load_images()
# Run server on all interfaces, port 8080, with auto-reload disabled
ui.run(title="NiceSorter", host="0.0.0.0", port=8080, reload=False)