import streamlit as st
import streamlit.components.v1 as components
import requests
import os
from collections import defaultdict
from datetime import datetime
st.set_page_config(page_title="Crunchyroll Watchlist", page_icon="🍊", layout="wide")
st.markdown("""
""", unsafe_allow_html=True)
# ── Constants ────────────────────────────────────────────────────────────────
AUTH_URL = "https://www.crunchyroll.com"
API_URL = "https://beta-api.crunchyroll.com"
CR_CLIENT_ID = "t-kdgp2h8c3jub8fn0fq"
CR_CLIENT_SECRET = "yfLDfMfrYvKXh4JXS1LEI2cCqu1v5Wan"
HEADERS = {"User-Agent": "Crunchyroll/3.0.0 Android/5.1.1 okhttp/3.12.1"}
CATEGORY_ORDER = ["Continue", "Up Next", "Start Watching", "Watch Again"]
CATEGORY_ICONS = {
"Continue": "▶",
"Up Next": "⏭",
"Start Watching":"✦",
"Watch Again": "↺",
}
# ── iframe HTML template ─────────────────────────────────────────────────────
# Built as a plain string (no f-string) to avoid brace-escaping headaches.
IFRAME_TEMPLATE = """\
"""
# ── API helpers (cached) ─────────────────────────────────────────────────────
@st.cache_data(ttl=300, show_spinner=False)
def fetch_token(email, password):
r = requests.post(
AUTH_URL + "/auth/v1/token",
data={"username": email, "password": password, "grant_type": "password", "scope": "offline_access"},
headers=HEADERS, auth=(CR_CLIENT_ID, CR_CLIENT_SECRET), timeout=10,
)
r.raise_for_status()
return r.json()["access_token"]
@st.cache_data(ttl=300, show_spinner=False)
def fetch_account_id(token):
r = requests.get(API_URL + "/accounts/v1/me",
headers={**HEADERS, "Authorization": "Bearer " + token}, timeout=10)
r.raise_for_status()
return r.json()["account_id"]
@st.cache_data(ttl=300, show_spinner=False)
def fetch_watchlist(token, account_id, n=200):
r = requests.get(
API_URL + "/content/v2/discover/" + account_id + "/watchlist",
params={"locale": "en-US", "n": n},
headers={**HEADERS, "Authorization": "Bearer " + token}, timeout=10,
)
r.raise_for_status()
return r.json().get("data", [])
# ── Helpers ──────────────────────────────────────────────────────────────────
def categorise(item):
if item.get("never_watched"): return "Start Watching"
if item.get("fully_watched"): return "Watch Again"
if item.get("playhead", 0) > 0: return "Continue"
return "Up Next"
def get_thumbnail(panel):
try:
sizes = panel["images"]["thumbnail"][0]
for s in sizes:
if s.get("width") == 640:
return s["source"]
return sizes[-1]["source"]
except (KeyError, IndexError):
return ""
def esc(s):
return str(s).replace("&", "&").replace("<", "<").replace(">", ">").replace('"', """)
def build_card(item):
panel = item.get("panel", {})
meta = panel.get("episode_metadata", {})
series = meta.get("series_title", panel.get("title", "Unknown"))
ep_num = meta.get("episode_number", "")
season = meta.get("season_number", "")
ep_title = panel.get("title", "")
duration = meta.get("duration_ms", 0) // 1000
playhead = item.get("playhead", 0)
thumb = get_thumbnail(panel)
ep_label = "S{}E{}".format(season, ep_num) if season and ep_num else ""
ep_id = panel.get("id", "")
ep_slug = panel.get("slug_title", "")
url = "https://www.crunchyroll.com/watch/{}/{}".format(ep_id, ep_slug) if ep_id else "#"
prog = ""
if playhead > 0 and not item.get("fully_watched") and duration:
pct = min(int(playhead / duration * 100), 100)
prog = ''.format(pct)
img = '
'.format(esc(thumb)) if thumb else ''
badge = '{}'.format(esc(ep_label)) if ep_label else ""
return (
''
'{}{}{}
'
''
''
).format(esc(url), img, badge, prog, esc(series), esc(ep_title), esc(ep_label))
def render_row(entries):
cards_html = "".join(build_card(i) for i in entries)
html = IFRAME_TEMPLATE.replace("CARDS_PLACEHOLDER", cards_html)
components.html(html, height=255, scrolling=False)
# ── Session state ────────────────────────────────────────────────────────────
for k in ("logged_in", "token", "account_id", "watchlist", "fetched_at"):
st.session_state.setdefault(k, None)
st.session_state.setdefault("logged_in", False)
# ── Auto-login ───────────────────────────────────────────────────────────────
if not st.session_state.logged_in:
env_email = os.environ.get("CR_EMAIL")
env_pass = os.environ.get("CR_PASSWORD")
if env_email and env_pass:
try:
tok = fetch_token(env_email, env_pass)
aid = fetch_account_id(tok)
wl = fetch_watchlist(tok, aid)
st.session_state.update(
logged_in=True, token=tok, account_id=aid,
watchlist=wl, fetched_at=datetime.now()
)
st.rerun()
except Exception:
# Fallback to manual login if auto-login fails
pass
# ── Header ───────────────────────────────────────────────────────────────────
st.markdown("""
🍊 Watchlist
Crunchyroll — personal queue dashboard
""", unsafe_allow_html=True)
# ── Login ────────────────────────────────────────────────────────────────────
if not st.session_state.logged_in:
_, mid, _ = st.columns([1, 1.2, 1])
with mid:
st.markdown("#### Sign in to Crunchyroll")
email = st.text_input("Email", key="email_input")
password = st.text_input("Password", key="pass_input", type="password")
if st.button("Load Watchlist"):
if not email or not password:
st.error("Please enter your email and password.")
else:
with st.spinner("Signing in..."):
try:
tok = fetch_token(email, password)
aid = fetch_account_id(tok)
wl = fetch_watchlist(tok, aid)
st.session_state.update(
logged_in=True, token=tok, account_id=aid,
watchlist=wl, fetched_at=datetime.now()
)
st.rerun()
except requests.HTTPError as e:
st.error("Login failed: {}".format(e))
except Exception as e:
st.error("Error: {}".format(e))
# ── Dashboard ────────────────────────────────────────────────────────────────
else:
watchlist = st.session_state.watchlist
fetched_at = st.session_state.fetched_at
# Toolbar
c1, c2 = st.columns([6, 1])
with c1:
age = int((datetime.now() - fetched_at).total_seconds()) if fetched_at else 0
astr = "{}s ago".format(age) if age < 60 else "{}m ago".format(age // 60)
st.markdown(
''
'Cached · fetched {} · refreshes every 5 min
'.format(astr),
unsafe_allow_html=True,
)
with c2:
if st.button("Refresh"):
fetch_token.clear(); fetch_account_id.clear(); fetch_watchlist.clear()
with st.spinner("Refreshing..."):
try:
tok = fetch_token(
st.session_state.get("email_input", ""),
st.session_state.get("pass_input", ""),
)
wl = fetch_watchlist(tok, st.session_state.account_id)
st.session_state.update(watchlist=wl, fetched_at=datetime.now())
st.rerun()
except Exception as e:
st.error("Refresh failed: {}".format(e))
# Bucket items
buckets = defaultdict(list)
for item in watchlist:
buckets[categorise(item)].append(item)
# Stats
stat_boxes = ""
for c in CATEGORY_ORDER:
n = len(buckets.get(c, []))
stat_boxes += (
''
).format(n, c)
stat_boxes += (
''
).format(len(watchlist))
st.markdown('{}
'.format(stat_boxes), unsafe_allow_html=True)
# Category rows
for cat in CATEGORY_ORDER:
entries = buckets.get(cat, [])
if not entries:
continue
icon = CATEGORY_ICONS[cat]
st.markdown(
''.format(icon, cat, len(entries)),
unsafe_allow_html=True,
)
render_row(entries)
# Sign out
st.markdown("
", unsafe_allow_html=True)
if st.button("Sign out"):
for k in ("logged_in", "token", "account_id", "watchlist", "fetched_at"):
st.session_state[k] = None
st.session_state.logged_in = False
fetch_token.clear(); fetch_account_id.clear(); fetch_watchlist.clear()
st.rerun()