Skip to content

Commit 0e44a18

Browse files
Refactor services for improved async handling and error resilience (#30)
* refactor: fetch loved and liked items at once and use that * Refactor services for improved async handling and error resilience
1 parent 5e7bf53 commit 0e44a18

21 files changed

+607
-197
lines changed

app/api/endpoints/announcement.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from fastapi import APIRouter
22

3-
from app.config import settings
3+
from app.core.config import settings
44

55
router = APIRouter(prefix="/announcement", tags=["announcement"])
66

app/api/endpoints/catalogs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from app.services.token_store import token_store
1212

1313
MAX_RESULTS = 50
14-
SOURCE_ITEMS_LIMIT = 10
14+
SOURCE_ITEMS_LIMIT = 15
1515

1616
router = APIRouter()
1717

app/api/endpoints/manifest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ async def _manifest_handler(response: Response, token: str):
137137

138138
@router.get("/manifest.json")
139139
async def manifest():
140-
manifest = await get_base_manifest()
140+
manifest = get_base_manifest()
141141
# since user is not logged in, return empty catalogs
142142
manifest["catalogs"] = []
143143
return manifest

app/api/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from fastapi import APIRouter
22

3+
from .endpoints.announcement import router as announcement_router
34
from .endpoints.catalogs import router as catalogs_router
45
from .endpoints.health import router as health_router
56
from .endpoints.manifest import router as manifest_router
@@ -19,3 +20,4 @@ async def root():
1920
api_router.include_router(tokens_router)
2021
api_router.include_router(health_router)
2122
api_router.include_router(meta_router)
23+
api_router.include_router(announcement_router)

app/core/app.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,16 @@ async def lifespan(app: FastAPI):
3838
Manage application lifespan events (startup/shutdown).
3939
"""
4040
global catalog_updater
41-
asyncio.create_task(migrate_tokens())
41+
task = asyncio.create_task(migrate_tokens())
42+
43+
# Ensure background exceptions are surfaced in logs
44+
def _on_done(t: asyncio.Task):
45+
try:
46+
t.result()
47+
except Exception as exc:
48+
logger.error(f"migrate_tokens background task failed: {exc}")
49+
50+
task.add_done_callback(_on_done)
4251

4352
# Startup
4453
if settings.AUTO_UPDATE_CATALOGS:

app/services/catalog.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from datetime import datetime, timezone
2+
13
from app.core.settings import CatalogConfig, UserSettings
24
from app.services.row_generator import RowGeneratorService
35
from app.services.scoring import ScoringService
@@ -130,30 +132,35 @@ async def get_dynamic_catalogs(
130132
return catalogs
131133

132134
async def _add_item_based_rows(
133-
self, catalogs: list, library_items: dict, content_type: str, language: str, loved_config, watched_config
135+
self,
136+
catalogs: list,
137+
library_items: dict,
138+
content_type: str,
139+
language: str,
140+
loved_config,
141+
watched_config,
134142
):
135143
"""Helper to add 'Because you watched' and 'More like' rows."""
136144

137145
# Helper to parse date
138146
def get_date(item):
139-
import datetime
140147

141148
val = item.get("state", {}).get("lastWatched")
142149
if val:
143150
try:
144151
if isinstance(val, str):
145-
return datetime.datetime.fromisoformat(val.replace("Z", "+00:00"))
152+
return datetime.fromisoformat(val.replace("Z", "+00:00"))
146153
return val
147154
except (ValueError, TypeError):
148155
pass
149156
# Fallback to mtime
150157
val = item.get("_mtime")
151158
if val:
152159
try:
153-
return datetime.datetime.fromisoformat(str(val).replace("Z", "+00:00"))
160+
return datetime.fromisoformat(str(val).replace("Z", "+00:00"))
154161
except (ValueError, TypeError):
155162
pass
156-
return datetime.datetime.min.replace(tzinfo=datetime.UTC)
163+
return datetime.min.replace(tzinfo=timezone.utc)
157164

158165
# 1. More Like <Loved Item>
159166
last_loved = None # Initialize for the watched check

app/services/catalog_updater.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ async def refresh_catalogs_for_credentials(token: str, credentials: dict[str, An
3131
addon_installed = await stremio_service.is_addon_installed(auth_key)
3232
if not addon_installed:
3333
logger.info(f"[{redact_token(token)}] User has not installed addon. Removing token from redis")
34-
await token_store.delete_token(key=token)
34+
# Ensure we delete by token, not by raw Redis key
35+
await token_store.delete_token(token=token)
3536
return True
3637
except Exception as e:
3738
logger.exception(f"[{redact_token(token)}] Failed to check if addon is installed: {e}")
@@ -41,6 +42,7 @@ async def refresh_catalogs_for_credentials(token: str, credentials: dict[str, An
4142
dynamic_catalog_service = DynamicCatalogService(stremio_service=stremio_service)
4243

4344
# Ensure user_settings is available
45+
user_settings = get_default_settings()
4446
if credentials.get("settings"):
4547
try:
4648
user_settings = UserSettings(**credentials["settings"])
@@ -140,7 +142,10 @@ async def _update_safe(key: str, payload: dict[str, Any]) -> None:
140142

141143
try:
142144
async for key, payload in token_store.iter_payloads():
143-
tasks.append(asyncio.create_task(_update_safe(key, payload)))
145+
# Extract token from redis key prefix
146+
prefix = token_store.KEY_PREFIX
147+
tok = key[len(prefix) :] if key.startswith(prefix) else key # noqa
148+
tasks.append(asyncio.create_task(_update_safe(tok, payload)))
144149

145150
if tasks:
146151
logger.info(f"Starting background refresh for {len(tasks)} tokens...")

app/services/discovery.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ class DiscoveryEngine:
1212

1313
def __init__(self):
1414
self.tmdb_service = TMDBService()
15+
# Limit concurrent discovery calls to avoid rate limiting
16+
self._sem = asyncio.Semaphore(10)
1517

1618
async def discover_recommendations(
1719
self,
@@ -63,7 +65,7 @@ async def discover_recommendations(
6365
for i in range(2):
6466
params_rating = {
6567
"with_genres": genre_ids,
66-
"sort_by": "ratings.desc",
68+
"sort_by": "vote_average.desc",
6769
"vote_count.gte": 500,
6870
"page": i + 1,
6971
**base_params,
@@ -85,7 +87,7 @@ async def discover_recommendations(
8587
for i in range(3):
8688
params_rating = {
8789
"with_keywords": keyword_ids,
88-
"sort_by": "ratings.desc",
90+
"sort_by": "vote_average.desc",
8991
"vote_count.gte": 500,
9092
"page": i + 1,
9193
**base_params,
@@ -105,7 +107,7 @@ async def discover_recommendations(
105107

106108
params_rating = {
107109
"with_cast": str(actor_id),
108-
"sort_by": "ratings.desc",
110+
"sort_by": "vote_average.desc",
109111
"vote_count.gte": 500,
110112
**base_params,
111113
}
@@ -124,7 +126,7 @@ async def discover_recommendations(
124126

125127
params_rating = {
126128
"with_crew": str(director_id),
127-
"sort_by": "ratings.desc",
129+
"sort_by": "vote_average.desc",
128130
"vote_count.gte": 500,
129131
**base_params,
130132
}
@@ -143,7 +145,7 @@ async def discover_recommendations(
143145

144146
params_rating = {
145147
"with_origin_country": country_ids,
146-
"sort_by": "ratings.desc",
148+
"sort_by": "vote_average.desc",
147149
"vote_count.gte": 300,
148150
**base_params,
149151
}
@@ -154,11 +156,11 @@ async def discover_recommendations(
154156
year = top_year[0][0]
155157
# we store year in 10 years bucket
156158
start_year = f"{year}-01-01"
157-
end_year = f"{int(year) + 10}-12-31"
159+
end_year = f"{int(year) + 9}-12-31"
158160
params_rating = {
159161
"primary_release_date.gte": start_year,
160162
"primary_release_date.lte": end_year,
161-
"sort_by": "ratings.desc",
163+
"sort_by": "vote_average.desc",
162164
"vote_count.gte": 500,
163165
**base_params,
164166
}
@@ -181,7 +183,8 @@ async def discover_recommendations(
181183
async def _fetch_discovery(self, media_type: str, params: dict) -> list[dict]:
182184
"""Helper to call TMDB discovery."""
183185
try:
184-
data = await self.tmdb_service.get_discover(media_type, **params)
185-
return data.get("results", [])
186+
async with self._sem:
187+
data = await self.tmdb_service.get_discover(media_type, **params)
188+
return data.get("results", [])
186189
except Exception:
187190
return []

app/services/gemini.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import asyncio
2+
13
from google import genai
24
from loguru import logger
35

@@ -52,5 +54,10 @@ def generate_content(self, prompt: str) -> str:
5254
logger.error(f"Error generating content: {e}")
5355
return ""
5456

57+
async def generate_content_async(self, prompt: str) -> str:
58+
"""Async wrapper to avoid blocking the event loop during network calls."""
59+
loop = asyncio.get_running_loop()
60+
return await loop.run_in_executor(None, lambda: self.generate_content(prompt))
61+
5562

5663
gemini_service = GeminiService()

0 commit comments

Comments
 (0)