first commit
This commit is contained in:
+127
@@ -0,0 +1,127 @@
|
||||
import os
|
||||
import json
|
||||
import ffmpeg
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Optional
|
||||
from .utils import get_logger, VIDEOS_DIR, THUMBNAILS_DIR, CONFIG_DIR
|
||||
|
||||
logger = get_logger(__name__)
|
||||
LIBRARY_CACHE = CONFIG_DIR / "library_cache.json"
|
||||
|
||||
class MediaItem:
|
||||
def __init__(self, path: Path, duration: float, thumbnail_path: str):
|
||||
self.path = path
|
||||
self.duration = duration
|
||||
self.thumbnail_path = thumbnail_path
|
||||
self.id = hashlib.md5(str(path).encode()).hexdigest()
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
"id": self.id,
|
||||
"filename": self.path.name,
|
||||
"path": str(self.path),
|
||||
"duration": self.duration,
|
||||
"thumbnail": self.thumbnail_path
|
||||
}
|
||||
|
||||
def get_video_duration(path: Path) -> float:
|
||||
try:
|
||||
probe = ffmpeg.probe(str(path))
|
||||
video_info = next(s for s in probe['streams'] if s['codec_type'] == 'video')
|
||||
return float(probe['format']['duration'])
|
||||
except Exception as e:
|
||||
logger.error(f"Error probing {path}: {e}")
|
||||
return 0.0
|
||||
|
||||
def generate_thumbnail(path: Path) -> Optional[str]:
|
||||
thumb_name = hashlib.md5(str(path).encode()).hexdigest() + ".jpg"
|
||||
thumb_path = THUMBNAILS_DIR / thumb_name
|
||||
|
||||
if thumb_path.exists():
|
||||
return thumb_name
|
||||
|
||||
try:
|
||||
# Get duration to pick a frame from the middle
|
||||
duration = get_video_duration(path)
|
||||
# Take a frame at 50% or 10s if duration is long
|
||||
time = min(10, duration / 2) if duration > 0 else 0
|
||||
|
||||
(
|
||||
ffmpeg
|
||||
.input(str(path), ss=time)
|
||||
.filter('scale', 640, -1)
|
||||
.output(str(thumb_path), vframes=1)
|
||||
.overwrite_output()
|
||||
.run(quiet=True)
|
||||
)
|
||||
return thumb_name
|
||||
except Exception as e:
|
||||
logger.error(f"Error generating thumbnail for {path}: {e}")
|
||||
return None
|
||||
|
||||
def scan_library() -> Dict[str, List[Dict]]:
|
||||
logger.info(f"Scanning library in {VIDEOS_DIR}...")
|
||||
|
||||
cache = {}
|
||||
if LIBRARY_CACHE.exists():
|
||||
try:
|
||||
with open(LIBRARY_CACHE, 'r') as f:
|
||||
cache = json.load(f)
|
||||
except Exception:
|
||||
logger.warning("Could not read library cache.")
|
||||
|
||||
valid_extensions = {'.mp4', '.mkv', '.avi', '.mov', '.m4v'}
|
||||
stories = []
|
||||
music = []
|
||||
|
||||
for root, _, files in os.walk(VIDEOS_DIR):
|
||||
for file in files:
|
||||
path = Path(root) / file
|
||||
if path.suffix.lower() not in valid_extensions:
|
||||
continue
|
||||
|
||||
rel_path = str(path.relative_to(VIDEOS_DIR))
|
||||
|
||||
# Check cache
|
||||
item_data = cache.get(rel_path)
|
||||
if item_data and os.path.getmtime(path) == item_data.get('mtime'):
|
||||
# Check if thumbnail still exists
|
||||
if (THUMBNAILS_DIR / item_data['thumbnail']).exists():
|
||||
item = item_data
|
||||
else:
|
||||
item_data = None # Force re-gen
|
||||
|
||||
if not item_data or os.path.getmtime(path) != item_data.get('mtime'):
|
||||
duration = get_video_duration(path)
|
||||
thumb_name = generate_thumbnail(path)
|
||||
|
||||
if thumb_name:
|
||||
item = {
|
||||
"id": hashlib.md5(str(path).encode()).hexdigest(),
|
||||
"filename": path.name,
|
||||
"path": rel_path,
|
||||
"duration": duration,
|
||||
"thumbnail": thumb_name,
|
||||
"mtime": os.path.getmtime(path)
|
||||
}
|
||||
cache[rel_path] = item
|
||||
else:
|
||||
continue
|
||||
|
||||
# Categorize
|
||||
# Stories < 15m (900s)
|
||||
# Music > 2h (7200s)
|
||||
if item['duration'] < 900:
|
||||
stories.append(item)
|
||||
elif item['duration'] > 7200:
|
||||
music.append(item)
|
||||
|
||||
# Save cache
|
||||
with open(LIBRARY_CACHE, 'w') as f:
|
||||
json.dump(cache, f)
|
||||
|
||||
return {
|
||||
"stories": stories,
|
||||
"music": music
|
||||
}
|
||||
Reference in New Issue
Block a user