diff --git a/app/api/endpoints/catalogs.py b/app/api/endpoints/catalogs.py index acd052f..41c34e6 100644 --- a/app/api/endpoints/catalogs.py +++ b/app/api/endpoints/catalogs.py @@ -1,43 +1,12 @@ -import json - from fastapi import APIRouter, HTTPException, Response from loguru import logger -from app.core.config import settings from app.core.security import redact_token from app.services.recommendation.catalog_service import catalog_service -from app.services.redis_service import redis_service router = APIRouter() -def _clean_meta(meta: dict) -> dict: - """Return a sanitized Stremio meta object without internal fields. - - Keeps only public keys and drops internal scoring/IDs/keywords/cast, etc. - """ - allowed = { - "id", - "type", - "name", - "poster", - "background", - "description", - "releaseInfo", - "imdbRating", - "genres", - "runtime", - } - cleaned = {k: v for k, v in meta.items() if k in allowed} - # Drop empty values - cleaned = {k: v for k, v in cleaned.items() if v not in (None, "", [], {}, ())} - - # if id does not start with tt, return None - if not cleaned.get("id", "").startswith("tt"): - return None - return cleaned - - @router.get("/{token}/catalog/{type}/{id}.json") async def get_catalog(type: str, id: str, response: Response, token: str): """ @@ -46,12 +15,6 @@ async def get_catalog(type: str, id: str, response: Response, token: str): This endpoint delegates all logic to CatalogService facade. """ try: - # catalog_key - catalog_key = f"watchly:catalog:{token}:{type}:{id}" - cached_data = await redis_service.get(catalog_key) - if cached_data: - return json.loads(cached_data) - # Delegate to catalog service facade recommendations, headers = await catalog_service.get_catalog(token, type, id) @@ -59,15 +22,7 @@ async def get_catalog(type: str, id: str, response: Response, token: str): for key, value in headers.items(): response.headers[key] = value - # Clean and format metadata - cleaned = [_clean_meta(m) for m in recommendations] - cleaned = [m for m in cleaned if m is not None] - - data = {"metas": cleaned} - # if catalog data is not empty, set the cache - if cleaned: - await redis_service.set(catalog_key, json.dumps(data), settings.CATALOG_CACHE_TTL) - return data + return recommendations except HTTPException: raise diff --git a/app/api/endpoints/manifest.py b/app/api/endpoints/manifest.py index 1b0cd3e..30261f5 100644 --- a/app/api/endpoints/manifest.py +++ b/app/api/endpoints/manifest.py @@ -1,142 +1,14 @@ -from fastapi import HTTPException from fastapi.routing import APIRouter -from loguru import logger -from app.core.config import settings -from app.core.settings import UserSettings -from app.core.version import __version__ -from app.services.catalog import DynamicCatalogService -from app.services.catalog_updater import get_config_id -from app.services.stremio.service import StremioBundle -from app.services.token_store import token_store -from app.services.translation import translation_service +from app.services.manifest import manifest_service router = APIRouter() -def get_base_manifest(): - return { - "id": settings.ADDON_ID, - "version": __version__, - "name": settings.ADDON_NAME, - "description": "Movie and series recommendations based on your Stremio library.", - "logo": "https://raw.githubusercontent.com/TimilsinaBimal/Watchly/refs/heads/main/app/static/logo.png", - "background": ("https://raw.githubusercontent.com/TimilsinaBimal/Watchly/refs/heads/main/app/static/cover.png"), - "resources": ["catalog"], - "types": ["movie", "series"], - "idPrefixes": ["tt"], - "catalogs": [], - "behaviorHints": {"configurable": True, "configurationRequired": False}, - "stremioAddonsConfig": { - "issuer": "https://stremio-addons.net", - "signature": ( - "eyJhbGciOiJkaXIiLCJlbmMiOiJBMTI4Q0JDLUhTMjU2In0..WSrhzzlj1TuDycD6QoVLuA.Dzmxzr4y83uqQF15r4tC1bB9-vtZRh1Rvy4BqgDYxu91c2esiJuov9KnnI_cboQCgZS7hjwnIqRSlQ-jEyGwXHHRerh9QklyfdxpXqNUyBgTWFzDOVdVvDYJeM_tGMmR.sezAChlWGV7lNS-t9HWB6A" # noqa - ), - }, - } - - -async def build_dynamic_catalogs( - bundle: StremioBundle, auth_key: str, user_settings: UserSettings | None -) -> list[dict]: - # Fetch library using bundle directly - if not user_settings: - logger.error("User settings not found. Please reconfigure the addon.") - raise HTTPException(status_code=401, detail="User settings not found. Please reconfigure the addon.") - - library_items = await bundle.library.get_library_items(auth_key) - dynamic_catalog_service = DynamicCatalogService( - language=user_settings.language, - ) - return await dynamic_catalog_service.get_dynamic_catalogs(library_items, user_settings) - - -async def _manifest_handler(token: str): - # response.headers["Cache-Control"] = "public, max-age=300" # 5 minutes - if not token: - raise HTTPException(status_code=401, detail="Missing token. Please reconfigure the addon.") - - user_settings = None - try: - creds = await token_store.get_user_data(token) - if creds and creds.get("settings"): - user_settings = UserSettings(**creds["settings"]) - except Exception as e: - logger.error(f"[{token}] Error loading user data from token store: {e}") - raise HTTPException(status_code=401, detail="Invalid token session. Please reconfigure.") - - if not creds: - raise HTTPException(status_code=401, detail="Token not found. Please reconfigure the addon.") - - base_manifest = get_base_manifest() - - bundle = StremioBundle() - fetched_catalogs = [] - try: - # Resolve Auth Key (with potential fallback to login) - auth_key = creds.get("authKey") - email = creds.get("email") - password = creds.get("password") - - is_valid = False - if auth_key: - try: - await bundle.auth.get_user_info(auth_key) - is_valid = True - except Exception as e: - logger.debug(f"Auth key check failed for {email or 'unknown'}: {e}") - pass - - if not is_valid and email and password: - try: - auth_key = await bundle.auth.login(email, password) - # Update store - creds["authKey"] = auth_key - await token_store.update_user_data(token, creds) - except Exception as e: - logger.error(f"Failed to refresh auth key during manifest fetch: {e}") - - if auth_key: - fetched_catalogs = await build_dynamic_catalogs( - bundle, - auth_key, - user_settings, - ) - except Exception as e: - logger.exception(f"[{token}] Dynamic catalog build failed: {e}") - fetched_catalogs = [] - finally: - await bundle.close() - - all_catalogs = [c.copy() for c in base_manifest["catalogs"]] + [c.copy() for c in fetched_catalogs] - - translated_catalogs = [] - - # translate to target language - if user_settings and user_settings.language: - for cat in all_catalogs: - if cat.get("name"): - try: - cat["name"] = await translation_service.translate(cat["name"], user_settings.language) - except Exception as e: - logger.warning(f"Failed to translate catalog name '{cat.get('name')}': {e}") - translated_catalogs.append(cat) - else: - translated_catalogs = all_catalogs - - if user_settings: - order_map = {c.id: i for i, c in enumerate(user_settings.catalogs)} - translated_catalogs.sort(key=lambda x: order_map.get(get_config_id(x), 999)) - - if translated_catalogs: - base_manifest["catalogs"] = translated_catalogs - - return base_manifest - - @router.get("/manifest.json") async def manifest(): - manifest = get_base_manifest() + """Get base manifest for unauthenticated users.""" + manifest = manifest_service.get_base_manifest() # since user is not logged in, return empty catalogs manifest["catalogs"] = [] return manifest @@ -144,4 +16,5 @@ async def manifest(): @router.get("/{token}/manifest.json") async def manifest_token(token: str): - return await _manifest_handler(token) + """Get manifest for authenticated user.""" + return await manifest_service.get_manifest_for_token(token) diff --git a/app/api/endpoints/tokens.py b/app/api/endpoints/tokens.py index 7272589..351952c 100644 --- a/app/api/endpoints/tokens.py +++ b/app/api/endpoints/tokens.py @@ -7,6 +7,7 @@ from app.core.config import settings from app.core.security import redact_token from app.core.settings import CatalogConfig, UserSettings, get_default_settings +from app.services.manifest import manifest_service from app.services.stremio.service import StremioBundle from app.services.token_store import token_store @@ -101,11 +102,25 @@ async def create_token(payload: TokenRequest, request: Request) -> TokenResponse # 5. Store user data token = await token_store.store_user_data(user_id, payload_to_store) - logger.info(f"[{redact_token(token)}] Account {'updated' if existing_data else 'created'} for user {user_id}") + account_status = "updated" if existing_data else "created" + logger.info(f"[{redact_token(token)}] Account {account_status} for user {user_id}") + + # 6. Cache library items and profiles before returning + # This ensures manifest generation is fast when user installs the addon + # We wait for caching to complete so everything is ready immediately + try: + logger.info(f"[{redact_token(token)}] Caching library and profiles before returning token") + await manifest_service.cache_library_and_profiles(bundle, stremio_auth_key, user_settings, token) + logger.info(f"[{redact_token(token)}] Successfully cached library and profiles") + except Exception as e: + logger.warning( + f"[{redact_token(token)}] Failed to cache library and profiles: {e}. " + "Continuing anyway - will cache on manifest request." + ) + # Continue even if caching fails - manifest service will handle it base_url = settings.HOST_NAME manifest_url = f"{base_url}/{token}/manifest.json" - # Maybe generate manifest and check if catalogs exist and if not raise error? expires_in = settings.TOKEN_TTL_SECONDS if settings.TOKEN_TTL_SECONDS > 0 else None await bundle.close() diff --git a/app/core/app.py b/app/core/app.py index a640169..13d018b 100644 --- a/app/core/app.py +++ b/app/core/app.py @@ -11,6 +11,7 @@ from app.api.endpoints.meta import fetch_languages_list from app.api.main import api_router +from app.services.redis_service import redis_service from app.services.token_store import token_store from .config import settings @@ -28,10 +29,10 @@ async def lifespan(app: FastAPI): """ yield try: - await token_store.close() - logger.info("TokenStore Redis client closed") + await redis_service.close() + logger.info("Redis client closed") except Exception as exc: - logger.warning(f"Failed to close TokenStore Redis client: {exc}") + logger.warning(f"Failed to close Redis client: {exc}") app = FastAPI( diff --git a/app/core/config.py b/app/core/config.py index 22a7957..35387bb 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -38,7 +38,7 @@ class Settings(BaseSettings): RECOMMENDATION_SOURCE_ITEMS_LIMIT: int = 10 LIBRARY_ITEMS_LIMIT: int = 20 - CATALOG_CACHE_TTL: int = 12 * 60 * 60 # 12 hours + CATALOG_CACHE_TTL: int = 43200 # 12 hours # AI DEFAULT_GEMINI_MODEL: str = "gemma-3-27b-it" diff --git a/app/core/constants.py b/app/core/constants.py index aec4c4f..5c1f515 100644 --- a/app/core/constants.py +++ b/app/core/constants.py @@ -1,9 +1,15 @@ RECOMMENDATIONS_CATALOG_NAME: str = "Top Picks For You" DEFAULT_MIN_ITEMS: int = 8 -DEFAULT_CATALOG_LIMIT = 20 +DEFAULT_CATALOG_LIMIT: int = 20 DEFAULT_CONCURRENCY_LIMIT: int = 30 - DEFAULT_MINIMUM_RATING_FOR_THEME_BASED_MOVIE: float = 7.2 DEFAULT_MINIMUM_RATING_FOR_THEME_BASED_TV: float = 6.8 + + +# cache keys +LIBRARY_ITEMS_KEY: str = "watchly:library_items:{token}" +PROFILE_KEY: str = "watchly:profile:{token}:{content_type}" +WATCHED_SETS_KEY: str = "watchly:watched_sets:{token}:{content_type}" +CATALOG_KEY: str = "watchly:catalog:{token}:{type}:{id}" diff --git a/app/services/catalog_updater.py b/app/services/catalog_updater.py index 7fc9afa..edcb3b2 100644 --- a/app/services/catalog_updater.py +++ b/app/services/catalog_updater.py @@ -9,20 +9,11 @@ from app.core.security import redact_token from app.core.settings import UserSettings from app.services.catalog import DynamicCatalogService +from app.services.manifest import manifest_service from app.services.stremio.service import StremioBundle from app.services.token_store import token_store from app.services.translation import translation_service - - -def get_config_id(catalog) -> str | None: - catalog_id = catalog.get("id", "") - if catalog_id.startswith("watchly.theme."): - return "watchly.theme" - if catalog_id.startswith("watchly.loved."): - return "watchly.loved" - if catalog_id.startswith("watchly.watched."): - return "watchly.watched" - return catalog_id +from app.utils.catalog import get_config_id class CatalogUpdater: @@ -116,13 +107,15 @@ async def refresh_catalogs_for_credentials( user_settings = UserSettings(**credentials["settings"]) except Exception as e: logger.exception(f"[{redact_token(token)}] Failed to parse user settings: {e}") - return True # if user doesn't have setting, we can't update the catalogs. so no need to try again. + # if user doesn't have setting, we can't update the catalogs. + # so no need to try again. + return True - # Fetch fresh library - library_items = await bundle.library.get_library_items(auth_key) + library_items = await manifest_service.cache_library_and_profiles(bundle, auth_key, user_settings, token) + language = user_settings.language if user_settings else "en-US" dynamic_catalog_service = DynamicCatalogService( - language=(user_settings.language if user_settings else "en-US"), + language=language, ) catalogs = await dynamic_catalog_service.get_dynamic_catalogs( diff --git a/app/services/manifest.py b/app/services/manifest.py new file mode 100644 index 0000000..d285336 --- /dev/null +++ b/app/services/manifest.py @@ -0,0 +1,228 @@ +from typing import Any + +from fastapi import HTTPException +from loguru import logger + +from app.core.config import settings +from app.core.security import redact_token +from app.core.settings import UserSettings +from app.core.version import __version__ +from app.services.catalog import DynamicCatalogService +from app.services.profile.integration import ProfileIntegration +from app.services.stremio.service import StremioBundle +from app.services.token_store import token_store +from app.services.translation import translation_service +from app.services.user_cache import user_cache +from app.utils.catalog import cache_profile_and_watched_sets, get_config_id + + +class ManifestService: + """Service for generating Stremio manifest files.""" + + @staticmethod + def get_base_manifest() -> dict[str, Any]: + """Get the base manifest structure.""" + return { + "id": settings.ADDON_ID, + "version": __version__, + "name": settings.ADDON_NAME, + "description": "Movie and series recommendations based on your Stremio library.", + "logo": ("https://raw.githubusercontent.com/TimilsinaBimal/Watchly/refs/heads/main/app/static/logo.png"), + "background": ( + "https://raw.githubusercontent.com/TimilsinaBimal/Watchly/refs/heads/main/app/static/cover.png" + ), + "resources": ["catalog"], + "types": ["movie", "series"], + "idPrefixes": ["tt"], + "catalogs": [], + "behaviorHints": {"configurable": True, "configurationRequired": False}, + "stremioAddonsConfig": { + "issuer": "https://stremio-addons.net", + "signature": ( + "eyJhbGciOiJkaXIiLCJlbmMiOiJBMTI4Q0JDLUhTMjU2In0..WSrhzzlj1TuDycD6QoVLuA.Dzmxzr4y83uqQF15r4tC1bB9-vtZRh1Rvy4BqgDYxu91c2esiJuov9KnnI_cboQCgZS7hjwnIqRSlQ-jEyGwXHHRerh9QklyfdxpXqNUyBgTWFzDOVdVvDYJeM_tGMmR.sezAChlWGV7lNS-t9HWB6A" # noqa + ), + }, + } + + async def _resolve_auth_key(self, bundle: StremioBundle, credentials: dict[str, Any], token: str) -> str | None: + """Resolve and validate auth key, refreshing if needed.""" + auth_key = credentials.get("authKey") + email = credentials.get("email") + password = credentials.get("password") + + is_valid = False + if auth_key: + try: + await bundle.auth.get_user_info(auth_key) + is_valid = True + except Exception as e: + logger.debug(f"Auth key check failed for {email or 'unknown'}: {e}") + + if not is_valid and email and password: + try: + auth_key = await bundle.auth.login(email, password) + # Update store + credentials["authKey"] = auth_key + await token_store.update_user_data(token, credentials) + except Exception as e: + logger.error(f"Failed to refresh auth key during manifest fetch: {e}") + return None + + return auth_key + + async def cache_library_and_profiles( + self, bundle: StremioBundle, auth_key: str, user_settings: UserSettings, token: str + ) -> dict[str, Any]: + """ + Fetch and cache library items and profiles for a user. + + This should be called during token creation to pre-cache data + so manifest generation is fast. + + Args: + bundle: StremioBundle instance + auth_key: Stremio auth key + user_settings: User settings + token: User token + + Returns: + Library items dictionary + """ + # Fetch library items + logger.info(f"[{redact_token(token)}] Fetching library items for caching") + library_items = await bundle.library.get_library_items(auth_key) + + # Cache library items using centralized cache service + await user_cache.set_library_items(token, library_items) + logger.debug(f"[{redact_token(token)}] Cached library items") + + # Build and cache profiles for both movie and series + language = user_settings.language + integration_service = ProfileIntegration(language=language) + + for content_type in ["movie", "series"]: + try: + logger.info(f"[{redact_token(token)}] Building and caching profile for {content_type}") + _, _, _ = await cache_profile_and_watched_sets( + token, content_type, integration_service, library_items, bundle, auth_key + ) + logger.debug(f"[{redact_token(token)}] Cached profile and watched sets for {content_type}") + except Exception as e: + logger.warning(f"[{redact_token(token)}] Failed to build/cache profile for {content_type}: {e}") + + return library_items + + async def _ensure_library_and_profiles_cached( + self, bundle: StremioBundle, auth_key: str, user_settings: UserSettings, token: str + ) -> dict[str, Any]: + """Ensure library items and profiles are cached, fetching and building if needed.""" + # Try to get cached library items first + library_items = await user_cache.get_library_items(token) + + if library_items: + logger.debug(f"[{redact_token(token)}] Using cached library items for manifest") + return library_items + + # If not cached, fetch and cache + logger.info(f"[{redact_token(token)}] Library items not cached, fetching from Stremio for manifest") + return await self.cache_library_and_profiles(bundle, auth_key, user_settings, token) + + async def _build_dynamic_catalogs( + self, bundle: StremioBundle, auth_key: str, user_settings: UserSettings | None, token: str + ) -> list[dict[str, Any]]: + """Build dynamic catalogs for the manifest.""" + # Ensure library and profiles are cached + library_items = await self._ensure_library_and_profiles_cached(bundle, auth_key, user_settings, token) + + dynamic_catalog_service = DynamicCatalogService(language=user_settings.language) + return await dynamic_catalog_service.get_dynamic_catalogs(library_items, user_settings) + + async def _translate_catalogs(self, catalogs: list[dict[str, Any]], language: str | None) -> list[dict[str, Any]]: + """Translate catalog names to target language.""" + if not language: + return catalogs + + translated_catalogs = [] + for cat in catalogs: + if cat.get("name"): + try: + cat["name"] = await translation_service.translate(cat["name"], language) + except Exception as e: + logger.warning(f"Failed to translate catalog name '{cat.get('name')}': {e}") + translated_catalogs.append(cat) + + return translated_catalogs + + def _sort_catalogs( + self, catalogs: list[dict[str, Any]], user_settings: UserSettings | None + ) -> list[dict[str, Any]]: + """Sort catalogs according to user settings order.""" + if not user_settings: + return catalogs + + order_map = {c.id: i for i, c in enumerate(user_settings.catalogs)} + sorted_catalogs = sorted(catalogs, key=lambda x: order_map.get(get_config_id(x), 999)) + return sorted_catalogs + + async def get_manifest_for_token(self, token: str) -> dict[str, Any]: + """ + Generate manifest for a given token. + + Args: + token: User token + + Returns: + Complete manifest dictionary + + Raises: + HTTPException: If token is invalid or credentials are missing + """ + if not token: + raise HTTPException(status_code=401, detail="Missing token. Please reconfigure the addon.") + + # Load user credentials and settings + creds = await token_store.get_user_data(token) + if not creds: + raise HTTPException(status_code=401, detail="Token not found. Please reconfigure the addon.") + + user_settings = None + try: + if creds.get("settings"): + user_settings = UserSettings(**creds["settings"]) + except Exception as e: + logger.error(f"[{redact_token(token)}] Error loading user data from token store: {e}") + raise HTTPException(status_code=401, detail="Invalid token session. Please reconfigure.") + + base_manifest = self.get_base_manifest() + + bundle = StremioBundle() + fetched_catalogs = [] + try: + # Resolve auth key + auth_key = await self._resolve_auth_key(bundle, creds, token) + + if auth_key: + fetched_catalogs = await self._build_dynamic_catalogs(bundle, auth_key, user_settings, token) + except Exception as e: + logger.exception(f"[{redact_token(token)}] Dynamic catalog build failed: {e}") + fetched_catalogs = [] + finally: + await bundle.close() + + # Combine base catalogs with fetched catalogs + all_catalogs = [c.copy() for c in base_manifest["catalogs"]] + [c.copy() for c in fetched_catalogs] + + # Translate catalogs + language = user_settings.language if user_settings else None + translated_catalogs = await self._translate_catalogs(all_catalogs, language) + + # Sort catalogs + sorted_catalogs = self._sort_catalogs(translated_catalogs, user_settings) + + if sorted_catalogs: + base_manifest["catalogs"] = sorted_catalogs + + return base_manifest + + +manifest_service = ManifestService() diff --git a/app/services/recommendation/catalog_service.py b/app/services/recommendation/catalog_service.py index 303f42a..612ddf6 100644 --- a/app/services/recommendation/catalog_service.py +++ b/app/services/recommendation/catalog_service.py @@ -6,6 +6,7 @@ from app.core.config import settings from app.core.constants import DEFAULT_CATALOG_LIMIT, DEFAULT_MIN_ITEMS +from app.core.security import redact_token from app.core.settings import UserSettings, get_default_settings from app.models.taste_profile import TasteProfile from app.services.catalog_updater import catalog_updater @@ -19,9 +20,35 @@ from app.services.stremio.service import StremioBundle from app.services.tmdb.service import get_tmdb_service from app.services.token_store import token_store - -PAD_RECOMMENDATIONS_THRESHOLD = 8 -PAD_RECOMMENDATIONS_TARGET = 10 +from app.services.user_cache import user_cache +from app.utils.catalog import cache_profile_and_watched_sets + + +def _clean_meta(meta: dict) -> dict: + """Return a sanitized Stremio meta object without internal fields. + + Keeps only public keys and drops internal scoring/IDs/keywords/cast, etc. + """ + allowed = { + "id", + "type", + "name", + "poster", + "background", + "description", + "releaseInfo", + "imdbRating", + "genres", + "runtime", + } + cleaned = {k: v for k, v in meta.items() if k in allowed} + # Drop empty values + cleaned = {k: v for k, v in cleaned.items() if v not in (None, "", [], {}, ())} + + # if id does not start with tt, return None + if not cleaned.get("id", "").startswith("tt"): + return None + return cleaned class CatalogService: @@ -45,7 +72,10 @@ async def get_catalog( # Validate inputs self._validate_inputs(token, content_type, catalog_id) - logger.info(f"[{token[:8]}...] Fetching catalog for {content_type} with id {catalog_id}") + # Prepare response headers + headers: dict[str, Any] = {"Cache-Control": f"public, max-age={settings.CATALOG_CACHE_TTL}"} + + logger.info(f"[{redact_token(token)}...] Fetching catalog for {content_type} with id {catalog_id}") # Get credentials credentials = await token_store.get_user_data(token) @@ -55,14 +85,24 @@ async def get_catalog( # Trigger lazy update if needed if settings.AUTO_UPDATE_CATALOGS: - logger.info(f"[{token[:8]}...] Triggering auto update for token") + logger.info(f"[{redact_token(token)}...] Triggering auto update for token") try: await catalog_updater.trigger_update(token, credentials) except Exception as e: - logger.error(f"[{token[:8]}...] Failed to trigger auto update: {e}") + logger.error(f"[{redact_token(token)}...] Failed to trigger auto update: {e}") # continue with the request even if the auto update fails pass + # get cached catalog + cached_data = await user_cache.get_catalog(token, content_type, catalog_id) + if cached_data: + logger.debug(f"[{redact_token(token)}...] Using cached catalog for {content_type}/{catalog_id}") + return cached_data, headers + + logger.info( + f"[{redact_token(token)}...] Catalog not cached for {content_type}/{catalog_id}, building from" " scratch" + ) + bundle = StremioBundle() try: # Resolve auth and settings @@ -70,18 +110,35 @@ async def get_catalog( user_settings = self._extract_settings(credentials) language = user_settings.language if user_settings else "en-US" - # Fetch library - library_items = await bundle.library.get_library_items(auth_key) + # Try to get cached library items first + library_items = await user_cache.get_library_items(token) - # Initialize services - services = self._initialize_services(language, user_settings) + if library_items: + logger.debug(f"[{redact_token(token)}...] Using cached library items") + else: + # Fetch library if not cached + logger.info(f"[{redact_token(token)}...] Library items not cached, fetching from Stremio") + library_items = await bundle.library.get_library_items(auth_key) + # Cache it for future use + await user_cache.set_library_items(token, library_items) + services = self._initialize_services(language, user_settings) integration_service: ProfileIntegration = services["integration"] - # Build profile and watched sets (once, reused) - profile, watched_tmdb, watched_imdb = await integration_service.build_profile_from_library( - library_items, content_type, bundle, auth_key - ) + # Try to get cached profile and watched sets + cached_data = await user_cache.get_profile_and_watched_sets(token, content_type) + + if cached_data: + # Use cached profile and watched sets + profile, watched_tmdb, watched_imdb = cached_data + logger.debug(f"[{redact_token(token)}...] Using cached profile and watched sets for {content_type}") + else: + # Build profile if not cached + logger.info(f"[{redact_token(token)}...] Profile not cached for {content_type}, building from library") + profile, watched_tmdb, watched_imdb = await cache_profile_and_watched_sets( + token, content_type, integration_service, library_items, bundle, auth_key + ) + whitelist = await integration_service.get_genre_whitelist(profile, content_type) if profile else set() # Route to appropriate recommendation service @@ -112,10 +169,16 @@ async def get_catalog( logger.info(f"Returning {len(recommendations)} items for {content_type}") - # Prepare response headers - headers = {"Cache-Control": f"public, max-age={settings.CATALOG_CACHE_TTL}"} + # Clean and format metadata + cleaned = [_clean_meta(m) for m in recommendations] + cleaned = [m for m in cleaned if m is not None] + + data = {"metas": cleaned} + # if catalog data is not empty, set the cache + if cleaned: + await user_cache.set_catalog(token, content_type, catalog_id, data, settings.CATALOG_CACHE_TTL) - return recommendations, headers + return data, headers finally: await bundle.close() diff --git a/app/services/redis_service.py b/app/services/redis_service.py index c51d25c..a16c37f 100644 --- a/app/services/redis_service.py +++ b/app/services/redis_service.py @@ -101,6 +101,31 @@ async def exists(self, key: str) -> bool: logger.error(f"Failed to check existence of key '{key}' in Redis: {exc}") return False + async def delete_by_pattern(self, pattern: str) -> int: + """Delete all keys matching a pattern. + + Args: + pattern: Redis key pattern (e.g., "watchly:catalog:token123:*") + + Returns: + Number of keys deleted + """ + try: + client = await self.get_client() + deleted_count = 0 + keys_to_delete = [] + async for key in client.scan_iter(match=pattern, count=500): + keys_to_delete.append(key) + if len(keys_to_delete) >= 500: + deleted_count += await client.delete(*keys_to_delete) + keys_to_delete = [] + if keys_to_delete: + deleted_count += await client.delete(*keys_to_delete) + return deleted_count + except (redis.RedisError, OSError) as exc: + logger.error(f"Failed to delete keys matching pattern '{pattern}' in Redis: {exc}") + return 0 + async def close(self) -> None: """Close and disconnect the Redis client""" if self._client is not None: diff --git a/app/services/stremio/library.py b/app/services/stremio/library.py index 3e3d986..abae41f 100644 --- a/app/services/stremio/library.py +++ b/app/services/stremio/library.py @@ -113,7 +113,9 @@ async def get_library_items(self, auth_key: str) -> dict[str, list[dict[str, Any # added by stremio itself on user watch added.append(item) elif item.get("removed"): - removed.append(item) + # do not do anything with removed items + # removed.append(item) + continue # 4. Sort watched items by recency def sort_by_recency(x: dict): diff --git a/app/services/token_store.py b/app/services/token_store.py index ed545a9..f716f57 100644 --- a/app/services/token_store.py +++ b/app/services/token_store.py @@ -13,6 +13,7 @@ from app.core.config import settings from app.core.security import redact_token from app.services.redis_service import redis_service +from app.services.user_cache import user_cache class TokenStore: @@ -174,6 +175,12 @@ async def delete_token(self, token: str = None, key: str = None) -> None: key = self._format_key(token) await redis_service.delete(key) + # we also need to delete the cached library items, profiles and watched sets + if token: + try: + await user_cache.invalidate_all_user_data(token) + except Exception as e: + logger.warning(f"Failed to invalidate all user data for {redact_token(token)}: {e}") # Invalidate async LRU cache so future reads reflect deletion try: diff --git a/app/services/user_cache.py b/app/services/user_cache.py new file mode 100644 index 0000000..017245d --- /dev/null +++ b/app/services/user_cache.py @@ -0,0 +1,319 @@ +import json +from typing import Any + +from loguru import logger + +from app.core.constants import CATALOG_KEY, LIBRARY_ITEMS_KEY, PROFILE_KEY, WATCHED_SETS_KEY +from app.core.security import redact_token +from app.models.taste_profile import TasteProfile +from app.services.redis_service import redis_service + + +class UserCacheService: + @staticmethod + def _library_items_key(token: str) -> str: + """Generate cache key for library items.""" + return LIBRARY_ITEMS_KEY.format(token=token) + + @staticmethod + def _profile_key(token: str, content_type: str) -> str: + """Generate cache key for profile.""" + return PROFILE_KEY.format(token=token, content_type=content_type) + + @staticmethod + def _watched_sets_key(token: str, content_type: str) -> str: + """Generate cache key for watched sets.""" + return WATCHED_SETS_KEY.format(token=token, content_type=content_type) + + # Library Items Methods + + async def get_library_items(self, token: str) -> dict[str, Any] | None: + """ + Get cached library items for a user. + + Args: + token: User token + + Returns: + Library items dictionary, or None if not cached + """ + key = self._library_items_key(token) + cached = await redis_service.get(key) + + if cached: + try: + return json.loads(cached) + except json.JSONDecodeError as e: + logger.warning(f"Failed to decode cached library items for {redact_token(token)}...: {e}") + return None + + return None + + async def set_library_items(self, token: str, library_items: dict[str, Any]) -> None: + """ + Cache library items for a user. + + Args: + token: User token + library_items: Library items dictionary to cache + """ + key = self._library_items_key(token) + await redis_service.set(key, json.dumps(library_items)) + logger.debug(f"[{redact_token(token)}...] Cached library items") + + # Invalidate all catalog caches when library items are updated + # This ensures catalogs are regenerated with fresh library data + await self.invalidate_all_catalogs(token) + + async def invalidate_library_items(self, token: str) -> None: + """ + Invalidate cached library items for a user. + + Args: + token: User token + """ + key = self._library_items_key(token) + await redis_service.delete(key) + logger.debug(f"[{redact_token(token)}...] Invalidated library items cache") + + # Profile Methods + + async def get_profile(self, token: str, content_type: str) -> TasteProfile | None: + """ + Get cached profile for a user and content type. + + Args: + token: User token + content_type: Content type (movie or series) + + Returns: + TasteProfile instance, or None if not cached + """ + key = self._profile_key(token, content_type) + cached = await redis_service.get(key) + + if cached: + try: + return TasteProfile.model_validate_json(cached) + except (json.JSONDecodeError, ValueError) as e: + logger.warning(f"Failed to decode cached profile for {redact_token(token)}.../{content_type}: {e}") + return None + + return None + + async def set_profile(self, token: str, content_type: str, profile: TasteProfile) -> None: + """ + Cache profile for a user and content type. + + Args: + token: User token + content_type: Content type (movie or series) + profile: TasteProfile instance to cache + """ + key = self._profile_key(token, content_type) + await redis_service.set(key, profile.model_dump_json()) + logger.debug(f"[{redact_token(token)}...] Cached profile for {content_type}") + + async def invalidate_profile(self, token: str, content_type: str) -> None: + """ + Invalidate cached profile for a user and content type. + + Args: + token: User token + content_type: Content type (movie or series) + """ + key = self._profile_key(token, content_type) + await redis_service.delete(key) + logger.debug(f"[{redact_token(token)}...] Invalidated profile cache for {content_type}") + + # Watched Sets Methods + + async def get_watched_sets(self, token: str, content_type: str) -> tuple[set[int], set[str]] | None: + """ + Get cached watched sets for a user and content type. + + Args: + token: User token + content_type: Content type (movie or series) + + Returns: + Tuple of (watched_tmdb set, watched_imdb set), or None if not cached + """ + key = self._watched_sets_key(token, content_type) + cached = await redis_service.get(key) + + if cached: + try: + data = json.loads(cached) + watched_tmdb = set(data.get("watched_tmdb", [])) + watched_imdb = set(data.get("watched_imdb", [])) + return (watched_tmdb, watched_imdb) + except (json.JSONDecodeError, KeyError, TypeError) as e: + logger.warning(f"Failed to decode cached watched sets for {redact_token(token)}.../{content_type}: {e}") + return None + + return None + + async def set_watched_sets( + self, token: str, content_type: str, watched_tmdb: set[int], watched_imdb: set[str] + ) -> None: + """ + Cache watched sets for a user and content type. + + Args: + token: User token + content_type: Content type (movie or series) + watched_tmdb: Set of watched TMDB IDs + watched_imdb: Set of watched IMDb IDs + """ + key = self._watched_sets_key(token, content_type) + data = { + "watched_tmdb": list(watched_tmdb), + "watched_imdb": list(watched_imdb), + } + await redis_service.set(key, json.dumps(data)) + logger.debug(f"[{redact_token(token)}...] Cached watched sets for {content_type}") + + async def invalidate_watched_sets(self, token: str, content_type: str) -> None: + """ + Invalidate cached watched sets for a user and content type. + + Args: + token: User token + content_type: Content type (movie or series) + """ + key = self._watched_sets_key(token, content_type) + await redis_service.delete(key) + logger.debug(f"[{redact_token(token)}...] Invalidated watched sets cache for {content_type}") + + # Combined Methods + + async def get_profile_and_watched_sets( + self, token: str, content_type: str + ) -> tuple[TasteProfile | None, set[int], set[str]] | None: + """ + Get both cached profile and watched sets for a user and content type. + + Args: + token: User token + content_type: Content type (movie or series) + + Returns: + Tuple of (profile, watched_tmdb, watched_imdb), or None if either is not cached. + Returns None if either profile or watched sets are missing. + """ + profile = await self.get_profile(token, content_type) + watched_sets = await self.get_watched_sets(token, content_type) + + if profile is None or watched_sets is None: + return None + + watched_tmdb, watched_imdb = watched_sets + return (profile, watched_tmdb, watched_imdb) + + async def set_profile_and_watched_sets( + self, + token: str, + content_type: str, + profile: TasteProfile | None, + watched_tmdb: set[int], + watched_imdb: set[str], + ) -> None: + """ + Cache both profile and watched sets for a user and content type. + + Args: + token: User token + content_type: Content type (movie or series) + profile: TasteProfile instance to cache (can be None) + watched_tmdb: Set of watched TMDB IDs + watched_imdb: Set of watched IMDb IDs + """ + if profile: + await self.set_profile(token, content_type, profile) + await self.set_watched_sets(token, content_type, watched_tmdb, watched_imdb) + + # Invalidate all catalog caches when profile is updated + # This ensures catalogs are regenerated with fresh profile data + await self.invalidate_all_catalogs(token) + + # Invalidation Methods + + async def invalidate_all_user_data(self, token: str) -> None: + """ + Invalidate all cached data for a user (library items, profiles, watched sets, catalogs). + + Args: + token: User token + """ + await self.invalidate_library_items(token) + for content_type in ["movie", "series"]: + await self.invalidate_profile(token, content_type) + await self.invalidate_watched_sets(token, content_type) + await self.invalidate_all_catalogs(token) + logger.debug(f"[{redact_token(token)}...] Invalidated all user data cache") + + async def get_catalog(self, token: str, type: str, id: str) -> dict[str, Any] | None: + """ + Get cached catalog for a user and content type. + + Args: + token: User token + type: Content type (movie or series) + id: Catalog ID + """ + key = CATALOG_KEY.format(token=token, type=type, id=id) + cached = await redis_service.get(key) + if cached: + return json.loads(cached) + return None + + async def set_catalog( + self, token: str, type: str, id: str, catalog: dict[str, Any], ttl: int | None = None + ) -> None: + """ + Cache catalog for a user and content type. + + Args: + token: User token + type: Content type (movie or series) + id: Catalog ID + catalog: Catalog dictionary to cache + ttl: Time to live for the cache (in seconds) + """ + key = CATALOG_KEY.format(token=token, type=type, id=id) + await redis_service.set(key, json.dumps(catalog), ttl) + logger.debug(f"[{redact_token(token)}...] Cached catalog for {type}/{id}") + + async def invalidate_catalog(self, token: str, type: str, id: str) -> None: + """ + Invalidate cached catalog for a user and content type. + + Args: + token: User token + type: Content type (movie or series) + id: Catalog ID + """ + key = CATALOG_KEY.format(token=token, type=type, id=id) + await redis_service.delete(key) + logger.debug(f"[{redact_token(token)}...] Invalidated catalog cache for {type}/{id}") + + async def invalidate_all_catalogs(self, token: str) -> None: + """ + Invalidate all cached catalogs for a user. + + This should be called when user data (library items, profiles) is updated + to ensure catalogs are regenerated with fresh data. + + Args: + token: User token + """ + pattern = f"watchly:catalog:{token}:*" + deleted_count = await redis_service.delete_by_pattern(pattern) + if deleted_count > 0: + logger.debug(f"[{redact_token(token)}...] Invalidated {deleted_count} catalog cache(s)") + else: + logger.debug(f"[{redact_token(token)}...] No catalog caches found to invalidate") + + +user_cache = UserCacheService() diff --git a/app/utils/catalog.py b/app/utils/catalog.py index 16ec0c5..a638b99 100644 --- a/app/utils/catalog.py +++ b/app/utils/catalog.py @@ -1,4 +1,7 @@ from app.core.settings import UserSettings +from app.services.profile.integration import ProfileIntegration +from app.services.stremio.service import StremioBundle +from app.services.user_cache import user_cache def get_catalogs_from_config( @@ -16,3 +19,34 @@ def get_catalogs_from_config( if enabled_series: catalogs.append({"type": "series", "id": cat_id, "name": name, "extra": []}) return catalogs + + +async def cache_profile_and_watched_sets( + token: str, + content_type: str, + integration_service: ProfileIntegration, + library_items: dict, + bundle: StremioBundle, + auth_key: str, +): + """ + Build and cache profile and watched sets for a user and content type. + Uses the centralized UserCacheService for caching. + """ + profile, watched_tmdb, watched_imdb = await integration_service.build_profile_from_library( + library_items, content_type, bundle, auth_key + ) + + await user_cache.set_profile_and_watched_sets(token, content_type, profile, watched_tmdb, watched_imdb) + return profile, watched_tmdb, watched_imdb + + +def get_config_id(catalog) -> str | None: + catalog_id = catalog.get("id", "") + if catalog_id.startswith("watchly.theme."): + return "watchly.theme" + if catalog_id.startswith("watchly.loved."): + return "watchly.loved" + if catalog_id.startswith("watchly.watched."): + return "watchly.watched" + return catalog_id