"""Chromecast discovery and sequential queue playback built on pychromecast."""

import threading
import time
from typing import List, Optional

import pychromecast
from pychromecast.controllers.media import MediaController

from .utils import get_logger

logger = get_logger(__name__)

# Content type sent with every play request; the URL itself is not inspected.
DEFAULT_CONTENT_TYPE = 'video/mp4'

# Player states that show a track has actually started on the device.
_ACTIVE_PLAYER_STATES = ("PLAYING", "BUFFERING", "PAUSED")

# Seconds to wait after a track finishes before loading the next one.
_ADVANCE_DELAY_SECONDS = 1.0

# Seconds between checks of the discovered-device table while waiting.
_DISCOVERY_POLL_SECONDS = 0.1


class CastManager:
    """Discover a Chromecast and play a list of URLs on it, one after another.

    The instance registers itself as a media status listener on the selected
    device's media controller, so ``new_media_status`` is invoked by
    pychromecast and drives the advance to the next queue entry.
    """

    def __init__(self):
        self.chromecast = None
        self.mc: Optional[MediaController] = None
        self.queue: List[str] = []
        # Index of the queue entry being played; -1 means nothing is queued.
        self.current_index = -1
        self.is_playing = False
        self._lock = threading.Lock()
        self._discover_thread = None
        self.browser = None
        self.zconf = None
        # Maps device name -> cast object; filled by the discovery callback.
        self.found_devices = {}
        # True once the current track has been seen playing, so that exactly
        # one "finished" report per track advances the queue.
        self._finish_armed = False

    def discover(self, timeout=5):
        """Find a Chromecast and select the first one discovered.

        The discovery browser is created on the first call only; later calls
        reuse it and the devices already found. On success ``self.chromecast``
        and ``self.mc`` are set and this instance is registered as a media
        status listener. If nothing is found an error is logged and the
        attributes are left unchanged.

        Args:
            timeout: Maximum number of seconds to wait for a first device.
        """
        logger.info("Starting Chromecast discovery...")
        import zeroconf
        from pychromecast.discovery import CastBrowser, SimpleCastListener
        from .utils import get_host_ip

        if not self.browser:
            try:
                ip = get_host_ip()
                logger.info(f"Binding zeroconf to IP: {ip} (unicast mode)")
                # unicast=True prevents binding to port 5353, avoiding conflict with Avahi
                self.zconf = zeroconf.Zeroconf(interfaces=[ip], unicast=True)
            except Exception as e:
                # Best effort: fall back to the default interfaces.
                logger.error(f"Zeroconf init failed: {e}")
                self.zconf = zeroconf.Zeroconf(unicast=True)

            self.found_devices = {}

            def add_callback(uuid, service):
                cast_info = self.browser.devices[uuid]
                cast = pychromecast.get_chromecast_from_cast_info(cast_info, self.zconf)
                self.found_devices[cast.name] = cast
                logger.info(f"Discovered Chromecast: {cast.name}")

            listener = SimpleCastListener(add_callback=add_callback)
            self.browser = CastBrowser(listener, zeroconf_instance=self.zconf)
            self.browser.start_discovery()

        # Wait up to `timeout` seconds for at least one device. A monotonic
        # clock keeps the wait correct even if the wall clock is adjusted.
        deadline = time.monotonic() + timeout
        while not self.found_devices and time.monotonic() < deadline:
            time.sleep(_DISCOVERY_POLL_SECONDS)

        if not self.found_devices:
            logger.error("No Chromecasts found.")
            return

        # Pick the first one
        self.chromecast = list(self.found_devices.values())[0]
        logger.info(f"Selected Chromecast: {self.chromecast.name}")
        self.chromecast.wait()

        controller = self.chromecast.media_controller
        # Register only once per controller: a duplicate registration would
        # deliver every status update twice and advance the queue twice.
        if controller is not self.mc:
            controller.register_status_listener(self)
        self.mc = controller

    def play_queue(self, urls: List[str]):
        """Replace the queue with ``urls`` and start playing its first entry.

        Args:
            urls: Media URLs to play in order. The list is copied, so later
                changes made by the caller do not affect the queue.
        """
        with self._lock:
            self.queue = list(urls)
            self.current_index = 0
            self._play_current()

    def _play_current(self):
        """Play the entry at ``current_index``, or mark the queue finished.

        Runs discovery first when no Chromecast has been selected yet.
        """
        # Snapshot both values: this method also runs on a timer thread
        # without the lock, and stop() may swap the queue in the meantime.
        queue = self.queue
        index = self.current_index

        if not 0 <= index < len(queue):
            logger.info("Queue finished.")
            self.is_playing = False
            self.current_index = -1
            self._finish_armed = False
            return

        url = queue[index]
        logger.info(f"Playing {index + 1}/{len(queue)}: {url}")

        if not self.chromecast:
            self.discover()

        if not self.chromecast:
            logger.error("Cannot play: No Chromecast connected.")
            return

        self.chromecast.wait()
        # A finish only counts after this new track has been seen playing.
        self._finish_armed = False
        self.mc.play_media(url, DEFAULT_CONTENT_TYPE)
        self.is_playing = True

    def stop(self):
        """Stop playback and clear the queue."""
        with self._lock:
            if self.mc:
                self.mc.stop()
            self.queue = []
            self.current_index = -1
            self.is_playing = False
            self._finish_armed = False

    def new_media_status(self, status):
        """Callback from pychromecast when media status changes.

        When the current track reports IDLE with reason FINISHED, moves to the
        next queue entry and schedules its playback after a short delay.
        """
        logger.debug(f"Media status update: {status.player_state}")

        if status.player_state in _ACTIVE_PLAYER_STATES:
            self._finish_armed = True
            return

        # Check if the current video finished
        if status.player_state == "IDLE" and status.idle_reason == "FINISHED":
            with self._lock:
                # Ignore the report when nothing is queued, or when this finish
                # was already handled (the same status can arrive repeatedly).
                if self.current_index == -1 or not self._finish_armed:
                    return
                self._finish_armed = False
                logger.info("Current track finished. Moving to next...")
                self.current_index += 1
                # Small delay to ensure state transitions
                threading.Timer(_ADVANCE_DELAY_SECONDS, self._play_current).start()

    def load_media_failed(self, item, error_code):
        """Callback from pychromecast when loading a media item fails.

        Only logs the failure; the queue position is left untouched.
        NOTE(review): arguments are passed positionally by the library —
        confirm the exact meaning of ``item`` against the installed version.
        """
        logger.error(f"Media load failed (item={item}, error_code={error_code})")

    def get_status(self):
        """Return a dict describing the queue and, if known, the player state.

        Always contains ``is_playing``, ``queue_len`` and ``current_index``;
        ``player_state``, ``current_time`` and ``duration`` are added when the
        media controller has a status.
        """
        status = {
            "is_playing": self.is_playing,
            "queue_len": len(self.queue),
            "current_index": self.current_index,
        }
        if self.mc and self.mc.status:
            media_status = self.mc.status
            status.update({
                "player_state": media_status.player_state,
                "current_time": media_status.current_time,
                "duration": media_status.duration,
            })
        return status


# Singleton instance
cast_manager = CastManager()