import os import json import ffmpeg import hashlib from pathlib import Path from typing import List, Dict, Optional from .utils import get_logger, VIDEOS_DIR, THUMBNAILS_DIR, CONFIG_DIR logger = get_logger(__name__) LIBRARY_CACHE = CONFIG_DIR / "library_cache.json" class MediaItem: def __init__(self, path: Path, duration: float, thumbnail_path: str): self.path = path self.duration = duration self.thumbnail_path = thumbnail_path self.id = hashlib.md5(str(path).encode()).hexdigest() def to_dict(self): return { "id": self.id, "filename": self.path.name, "path": str(self.path), "duration": self.duration, "thumbnail": self.thumbnail_path } def get_video_duration(path: Path) -> float: try: probe = ffmpeg.probe(str(path)) video_info = next(s for s in probe['streams'] if s['codec_type'] == 'video') return float(probe['format']['duration']) except Exception as e: logger.error(f"Error probing {path}: {e}") return 0.0 def generate_thumbnail(path: Path) -> Optional[str]: thumb_name = hashlib.md5(str(path).encode()).hexdigest() + ".jpg" thumb_path = THUMBNAILS_DIR / thumb_name if thumb_path.exists(): return thumb_name try: # Get duration to pick a frame from the middle duration = get_video_duration(path) # Take a frame at 50% or 10s if duration is long time = min(10, duration / 2) if duration > 0 else 0 ( ffmpeg .input(str(path), ss=time) .filter('scale', 640, -1) .output(str(thumb_path), vframes=1) .overwrite_output() .run(quiet=True) ) return thumb_name except Exception as e: logger.error(f"Error generating thumbnail for {path}: {e}") return None def scan_library() -> Dict[str, List[Dict]]: logger.info(f"Scanning library in {VIDEOS_DIR}...") cache = {} if LIBRARY_CACHE.exists(): try: with open(LIBRARY_CACHE, 'r') as f: cache = json.load(f) except Exception: logger.warning("Could not read library cache.") valid_extensions = {'.mp4', '.mkv', '.avi', '.mov', '.m4v'} stories = [] music = [] for root, _, files in os.walk(VIDEOS_DIR): for file in files: path = Path(root) / file if path.suffix.lower() not in valid_extensions: continue rel_path = str(path.relative_to(VIDEOS_DIR)) # Check cache item_data = cache.get(rel_path) if item_data and os.path.getmtime(path) == item_data.get('mtime'): # Check if thumbnail still exists if (THUMBNAILS_DIR / item_data['thumbnail']).exists(): item = item_data else: item_data = None # Force re-gen if not item_data or os.path.getmtime(path) != item_data.get('mtime'): duration = get_video_duration(path) thumb_name = generate_thumbnail(path) if thumb_name: item = { "id": hashlib.md5(str(path).encode()).hexdigest(), "filename": path.name, "path": rel_path, "duration": duration, "thumbnail": thumb_name, "mtime": os.path.getmtime(path) } cache[rel_path] = item else: continue # Categorize # Stories < 15m (900s) # Music > 2h (7200s) if item['duration'] < 900: stories.append(item) elif item['duration'] > 7200: music.append(item) # Save cache with open(LIBRARY_CACHE, 'w') as f: json.dump(cache, f) return { "stories": stories, "music": music }