"""Streamlit dashboard showing a Crunchyroll watchlist grouped by watch state.

The script signs in (automatically from ``CR_EMAIL`` / ``CR_PASSWORD`` when both
are set, otherwise through a login form), downloads the account's watchlist and
renders one horizontally scrolling row of episode cards per category.
"""
import html
import os
from collections import defaultdict
from datetime import datetime

import requests
import streamlit as st
import streamlit.components.v1 as components

st.set_page_config(page_title="Crunchyroll Watchlist", page_icon="🍊", layout="wide")

# NOTE(review): the CSS/HTML in this file was rebuilt around the surviving text
# and placeholders; the reviewed copy had its markup stripped. Restyle freely.
st.markdown("""
<style>
.cr-header { font-size: 0.95rem; color: #9a9a9a; letter-spacing: 0.04em; margin-bottom: 1rem; }
.cr-toolbar { font-size: 0.8rem; color: #8a8a8a; padding-top: 0.5rem; }
.cr-stats { display: flex; flex-wrap: wrap; gap: 12px; margin: 0.5rem 0 1.25rem; }
.cr-stat { flex: 1 1 120px; padding: 12px 16px; border-radius: 8px;
           background: rgba(244, 117, 33, 0.08); border: 1px solid rgba(244, 117, 33, 0.25); }
.cr-stat-num { font-size: 1.6rem; font-weight: 700; color: #f47521; }
.cr-stat-label { font-size: 0.75rem; text-transform: uppercase; letter-spacing: 0.06em; color: #8a8a8a; }
.cr-cat-header { display: flex; align-items: baseline; gap: 10px; margin-top: 1rem; }
.cr-cat-title { font-size: 1.1rem; font-weight: 600; }
.cr-cat-count { font-size: 0.8rem; color: #8a8a8a; }
.cr-cat-rule { height: 1px; background: rgba(128, 128, 128, 0.3); margin: 6px 0 10px; }
.cr-spacer { height: 1.5rem; }
</style>
""", unsafe_allow_html=True)

# ── Constants ────────────────────────────────────────────────────────────────
AUTH_URL = "https://www.crunchyroll.com"
API_URL = "https://beta-api.crunchyroll.com"
# The client credentials may be overridden from the environment; the literals
# are only fallbacks so existing deployments keep working unchanged.
CR_CLIENT_ID = os.environ.get("CR_CLIENT_ID", "t-kdgp2h8c3jub8fn0fq")
CR_CLIENT_SECRET = os.environ.get("CR_CLIENT_SECRET", "yfLDfMfrYvKXh4JXS1LEI2cCqu1v5Wan")
HEADERS = {"User-Agent": "Crunchyroll/3.0.0 Android/5.1.1 okhttp/3.12.1"}

CATEGORY_ORDER = ["Continue", "Up Next", "Start Watching", "Watch Again"]
CATEGORY_ICONS = {
    "Continue": "▶",
    "Up Next": "⏭",
    "Start Watching": "✦",
    "Watch Again": "↺",
}

# ── iframe HTML template ─────────────────────────────────────────────────────
# Built as a plain string (no f-string) to avoid brace-escaping headaches.
IFRAME_TEMPLATE = """\
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
  body { margin: 0; font-family: -apple-system, "Segoe UI", Roboto, sans-serif; background: transparent; }
  .row { display: flex; gap: 14px; overflow-x: auto; padding: 4px 2px 12px; }
  .card { flex: 0 0 240px; color: inherit; text-decoration: none; }
  .thumb { position: relative; width: 240px; height: 135px; border-radius: 6px;
           overflow: hidden; background: #23252b; }
  .thumb img, .thumb .no-thumb { display: block; width: 100%; height: 100%; object-fit: cover; }
  .badge { position: absolute; top: 6px; left: 6px; padding: 2px 6px; border-radius: 4px;
           font-size: 11px; font-weight: 600; color: #fff; background: rgba(0, 0, 0, 0.7); }
  .progress { position: absolute; left: 0; right: 0; bottom: 0; height: 4px;
              background: rgba(255, 255, 255, 0.25); }
  .progress-bar { height: 100%; background: #f47521; }
  .info { padding: 6px 2px 0; }
  .series { font-size: 13px; font-weight: 600; color: #f47521;
            white-space: nowrap; overflow: hidden; text-overflow: ellipsis; }
  .title { font-size: 13px; color: #c8c8c8;
           white-space: nowrap; overflow: hidden; text-overflow: ellipsis; }
  .meta { font-size: 11px; color: #8a8a8a; }
</style>
</head>
<body>
<div class="row">
CARDS_PLACEHOLDER
</div>
</body>
</html>
"""


# ── API helpers (cached) ─────────────────────────────────────────────────────
@st.cache_data(ttl=300, show_spinner=False)
def fetch_token(email, password):
    """Exchange account credentials for an access token.

    Posts a password grant to ``AUTH_URL + "/auth/v1/token"`` and returns the
    ``access_token`` field of the JSON response. Raises ``requests.HTTPError``
    on a non-2xx response.
    """
    r = requests.post(
        AUTH_URL + "/auth/v1/token",
        data={
            "username": email,
            "password": password,
            "grant_type": "password",
            "scope": "offline_access",
        },
        headers=HEADERS,
        auth=(CR_CLIENT_ID, CR_CLIENT_SECRET),
        timeout=10,
    )
    r.raise_for_status()
    return r.json()["access_token"]


@st.cache_data(ttl=300, show_spinner=False)
def fetch_account_id(token):
    """Return the ``account_id`` reported by ``/accounts/v1/me`` for ``token``."""
    r = requests.get(
        API_URL + "/accounts/v1/me",
        headers={**HEADERS, "Authorization": "Bearer " + token},
        timeout=10,
    )
    r.raise_for_status()
    return r.json()["account_id"]


@st.cache_data(ttl=300, show_spinner=False)
def fetch_watchlist(token, account_id, n=200):
    """Return up to ``n`` watchlist entries (the response's ``data`` list)."""
    r = requests.get(
        API_URL + "/content/v2/discover/" + account_id + "/watchlist",
        params={"locale": "en-US", "n": n},
        headers={**HEADERS, "Authorization": "Bearer " + token},
        timeout=10,
    )
    r.raise_for_status()
    return r.json().get("data", [])


# ── Helpers ──────────────────────────────────────────────────────────────────
def categorise(item):
    """Map a watchlist entry to one of the names in ``CATEGORY_ORDER``."""
    if item.get("never_watched"):
        return "Start Watching"
    if item.get("fully_watched"):
        return "Watch Again"
    # ``or 0`` tolerates an explicit null playhead in the payload.
    if (item.get("playhead") or 0) > 0:
        return "Continue"
    return "Up Next"


def get_thumbnail(panel):
    """Return a thumbnail URL for ``panel``, or ``""`` when none is available.

    Prefers the 640px-wide rendition and otherwise falls back to the last
    entry of the first thumbnail set.
    """
    try:
        sizes = panel["images"]["thumbnail"][0]
        for size in sizes:
            if size.get("width") == 640:
                return size["source"]
        return sizes[-1]["source"]
    except (KeyError, IndexError, TypeError):
        # TypeError covers null/ill-typed ``images`` values.
        return ""


def esc(s):
    """Return ``s`` as text escaped for HTML content and attribute values."""
    return html.escape(str(s), quote=True)


def build_card(item):
    """Render one watchlist entry as the HTML for a clickable episode card."""
    panel = item.get("panel") or {}
    meta = panel.get("episode_metadata") or {}
    series = meta.get("series_title", panel.get("title", "Unknown"))
    ep_num = meta.get("episode_number", "")
    season = meta.get("season_number", "")
    ep_title = panel.get("title", "")
    duration = (meta.get("duration_ms") or 0) // 1000
    playhead = item.get("playhead") or 0
    thumb = get_thumbnail(panel)
    ep_label = "S{}E{}".format(season, ep_num) if season and ep_num else ""
    ep_id = panel.get("id", "")
    ep_slug = panel.get("slug_title", "")
    url = "https://www.crunchyroll.com/watch/{}/{}".format(ep_id, ep_slug) if ep_id else "#"

    prog = ""
    if playhead > 0 and not item.get("fully_watched") and duration:
        pct = min(int(playhead / duration * 100), 100)
        prog = (
            '<div class="progress">'
            '<div class="progress-bar" style="width:{}%"></div>'
            '</div>'
        ).format(pct)

    if thumb:
        img = '<img src="{}" alt="" loading="lazy">'.format(esc(thumb))
    else:
        img = '<div class="no-thumb"></div>'
    badge = '<span class="badge">{}</span>'.format(esc(ep_label)) if ep_label else ""

    return (
        '<a class="card" href="{}" target="_blank" rel="noopener noreferrer">'
        '<div class="thumb">{}{}{}</div>'
        '<div class="info">'
        '<div class="series">{}</div>'
        '<div class="title">{}</div>'
        '<div class="meta">{}</div>'
        '</div>'
        '</a>'
    ).format(esc(url), img, badge, prog, esc(series), esc(ep_title), esc(ep_label))


def render_row(entries):
    """Render ``entries`` as one horizontally scrolling row of cards."""
    cards_html = "".join(build_card(entry) for entry in entries)
    page = IFRAME_TEMPLATE.replace("CARDS_PLACEHOLDER", cards_html)
    components.html(page, height=255, scrolling=False)


def _clear_api_caches():
    """Drop every cached API response so the next call hits the network."""
    fetch_token.clear()
    fetch_account_id.clear()
    fetch_watchlist.clear()


def _sign_in(email, password):
    """Authenticate, load the watchlist and record the session; raises on failure."""
    tok = fetch_token(email, password)
    aid = fetch_account_id(tok)
    wl = fetch_watchlist(tok, aid)
    st.session_state.update(
        logged_in=True,
        token=tok,
        account_id=aid,
        watchlist=wl,
        fetched_at=datetime.now(),
        # Kept under explicit keys for Refresh: widget-backed keys such as
        # "email_input" are dropped once the login form is no longer rendered.
        cr_email=email,
        cr_password=password,
        auto_login_error=None,
    )


# ── Session state ────────────────────────────────────────────────────────────
_SESSION_DEFAULTS = {
    "logged_in": False,
    "token": None,
    "account_id": None,
    "watchlist": None,
    "fetched_at": None,
    "cr_email": None,
    "cr_password": None,
    "auto_login_error": None,
    "skip_auto_login": False,
}
for _key, _default in _SESSION_DEFAULTS.items():
    st.session_state.setdefault(_key, _default)

# ── Auto-login ───────────────────────────────────────────────────────────────
# Skipped after an explicit sign-out, otherwise the environment credentials
# would sign the user straight back in.
if not st.session_state.logged_in and not st.session_state.skip_auto_login:
    env_email = os.environ.get("CR_EMAIL")
    env_pass = os.environ.get("CR_PASSWORD")
    if env_email and env_pass:
        try:
            _sign_in(env_email, env_pass)
        except Exception as e:
            # Best effort: fall back to the manual form, but say why.
            st.session_state.auto_login_error = str(e)
        else:
            st.rerun()

# ── Header ───────────────────────────────────────────────────────────────────
st.markdown("""
<div class="cr-header">Crunchyroll — personal queue dashboard</div>
""", unsafe_allow_html=True)

# ── Login ────────────────────────────────────────────────────────────────────
if not st.session_state.logged_in:
    _, mid, _ = st.columns([1, 1.2, 1])
    with mid:
        st.markdown("#### Sign in to Crunchyroll")
        if st.session_state.auto_login_error:
            st.warning("Automatic sign-in failed: {}".format(st.session_state.auto_login_error))
        email = st.text_input("Email", key="email_input")
        password = st.text_input("Password", key="pass_input", type="password")
        if st.button("Load Watchlist"):
            if not email or not password:
                st.error("Please enter your email and password.")
            else:
                with st.spinner("Signing in..."):
                    try:
                        _sign_in(email, password)
                    except requests.HTTPError as e:
                        st.error("Login failed: {}".format(e))
                    except Exception as e:
                        st.error("Error: {}".format(e))
                    else:
                        st.session_state.skip_auto_login = False
                        st.rerun()

# ── Dashboard ────────────────────────────────────────────────────────────────
else:
    watchlist = st.session_state.watchlist or []
    fetched_at = st.session_state.fetched_at

    # Toolbar
    c1, c2 = st.columns([6, 1])
    with c1:
        age = int((datetime.now() - fetched_at).total_seconds()) if fetched_at else 0
        astr = "{}s ago".format(age) if age < 60 else "{}m ago".format(age // 60)
        st.markdown(
            '<div class="cr-toolbar">'
            'Cached · fetched {} · refreshes every 5 min</div>'.format(astr),
            unsafe_allow_html=True,
        )
    with c2:
        if st.button("Refresh"):
            _clear_api_caches()
            with st.spinner("Refreshing..."):
                try:
                    tok = fetch_token(
                        st.session_state.cr_email or "",
                        st.session_state.cr_password or "",
                    )
                    wl = fetch_watchlist(tok, st.session_state.account_id)
                except Exception as e:
                    st.error("Refresh failed: {}".format(e))
                else:
                    st.session_state.update(token=tok, watchlist=wl, fetched_at=datetime.now())
                    st.rerun()

    # Bucket items
    buckets = defaultdict(list)
    for item in watchlist:
        buckets[categorise(item)].append(item)

    # Stats
    stat_template = (
        '<div class="cr-stat">'
        '<div class="cr-stat-num">{}</div>'
        '<div class="cr-stat-label">{}</div>'
        '</div>'
    )
    stat_boxes = "".join(
        stat_template.format(len(buckets.get(c, [])), c) for c in CATEGORY_ORDER
    )
    stat_boxes += stat_template.format(len(watchlist), "Total")
    st.markdown('<div class="cr-stats">{}</div>'.format(stat_boxes), unsafe_allow_html=True)

    # Category rows
    for cat in CATEGORY_ORDER:
        entries = buckets.get(cat, [])
        if not entries:
            continue
        icon = CATEGORY_ICONS[cat]
        st.markdown(
            '<div class="cr-cat-header">'
            '<span class="cr-cat-title">{} {}</span>'
            '<span class="cr-cat-count">{}</span>'
            '</div>'
            '<div class="cr-cat-rule"></div>'.format(icon, cat, len(entries)),
            unsafe_allow_html=True,
        )
        render_row(entries)

    # Sign out
    st.markdown('<div class="cr-spacer"></div>', unsafe_allow_html=True)
    if st.button("Sign out"):
        for _key, _default in _SESSION_DEFAULTS.items():
            st.session_state[_key] = _default
        st.session_state.skip_auto_login = True
        _clear_api_caches()
        st.rerun()