Skip to content

Commit ce9b681

Browse files
feat: implement Redis caching for catalog retrieval and add Redis service integration
- Introduced RedisService for managing Redis operations. - Updated catalog retrieval to cache results in Redis for improved performance. - Added CATALOG_CACHE_TTL setting for cache expiration. - Refactored CatalogService and TokenStore to utilize RedisService for data storage and retrieval.
1 parent a96dcee commit ce9b681

File tree

5 files changed

+144
-74
lines changed

5 files changed

+144
-74
lines changed

app/api/endpoints/catalogs.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1+
import json
2+
13
from fastapi import APIRouter, HTTPException, Response
24
from loguru import logger
35

6+
from app.core.config import settings
47
from app.core.security import redact_token
5-
from app.services.recommendation.catalog_service import CatalogService
8+
from app.services.recommendation.catalog_service import catalog_service
9+
from app.services.redis_service import redis_service
610

711
router = APIRouter()
812

9-
# Initialize catalog service (singleton)
10-
_catalog_service = CatalogService()
11-
1213

1314
def _clean_meta(meta: dict) -> dict:
1415
"""Return a sanitized Stremio meta object without internal fields.
@@ -45,8 +46,14 @@ async def get_catalog(type: str, id: str, response: Response, token: str):
4546
This endpoint delegates all logic to CatalogService facade.
4647
"""
4748
try:
49+
# catalog_key
50+
catalog_key = f"watchly:catalog:{token}:{type}:{id}"
51+
cached_data = await redis_service.get(catalog_key)
52+
if cached_data:
53+
return json.loads(cached_data)
54+
4855
# Delegate to catalog service facade
49-
recommendations, headers = await _catalog_service.get_catalog(token, type, id)
56+
recommendations, headers = await catalog_service.get_catalog(token, type, id)
5057

5158
# Set response headers
5259
for key, value in headers.items():
@@ -56,7 +63,9 @@ async def get_catalog(type: str, id: str, response: Response, token: str):
5663
cleaned = [_clean_meta(m) for m in recommendations]
5764
cleaned = [m for m in cleaned if m is not None]
5865

59-
return {"metas": cleaned}
66+
data = {"metas": cleaned}
67+
await redis_service.set(catalog_key, json.dumps(data), settings.CATALOG_CACHE_TTL)
68+
return data
6069

6170
except HTTPException:
6271
raise

app/core/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ class Settings(BaseSettings):
3838
RECOMMENDATION_SOURCE_ITEMS_LIMIT: int = 10
3939
LIBRARY_ITEMS_LIMIT: int = 20
4040

41+
CATALOG_CACHE_TTL: int = 12 * 60 * 60 # 12 hours
42+
4143
# AI
4244
DEFAULT_GEMINI_MODEL: str = "gemma-3-27b-it"
4345
GEMINI_API_KEY: str | None = None

app/services/recommendation/catalog_service.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,3 +329,6 @@ async def _get_recommendations(
329329
recommendations = []
330330

331331
return recommendations
332+
333+
334+
catalog_service = CatalogService()

app/services/redis_service.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
from typing import Any
2+
3+
import redis.asyncio as redis
4+
from loguru import logger
5+
6+
from app.core.config import settings
7+
8+
9+
class RedisService:
10+
def __init__(self) -> None:
11+
self._client: redis.Redis | None = None
12+
if not settings.REDIS_URL:
13+
logger.warning("REDIS_URL is not set. Redis operations will fail until configured.")
14+
15+
async def get_client(self) -> redis.Redis:
16+
if self._client is None:
17+
logger.info("Creating Redis client for RedisService")
18+
self._client = redis.from_url(
19+
settings.REDIS_URL,
20+
decode_responses=True,
21+
encoding="utf-8",
22+
socket_connect_timeout=5,
23+
socket_timeout=5,
24+
max_connections=getattr(settings, "REDIS_MAX_CONNECTIONS", 100),
25+
health_check_interval=30,
26+
socket_keepalive=True,
27+
)
28+
return self._client
29+
30+
async def set(self, key: str, value: Any, ttl: int | None = None) -> bool:
31+
"""Store a value in Redis with optional TTL.
32+
33+
Args:
34+
key: The key to store the value under
35+
value: The value to store (will be converted to string)
36+
ttl: Optional time-to-live in seconds. If None, key never expires.
37+
38+
Returns:
39+
True if successful, False otherwise
40+
"""
41+
try:
42+
client = await self.get_client()
43+
str_value = str(value)
44+
if ttl is not None:
45+
result = await client.setex(key, ttl, str_value)
46+
else:
47+
result = await client.set(key, str_value)
48+
return bool(result)
49+
except (redis.RedisError, OSError) as exc:
50+
logger.error(f"Failed to set key '{key}' in Redis: {exc}")
51+
return False
52+
53+
async def get(self, key: str) -> str | None:
54+
"""Get a value from Redis by key.
55+
56+
Args:
57+
key: The key to retrieve
58+
59+
Returns:
60+
The value as a string, or None if key doesn't exist or error occurred
61+
"""
62+
try:
63+
client = await self.get_client()
64+
value = await client.get(key)
65+
return value
66+
except (redis.RedisError, OSError) as exc:
67+
logger.error(f"Failed to get key '{key}' from Redis: {exc}")
68+
return None
69+
70+
async def delete(self, key: str) -> bool:
71+
"""Delete a key from Redis.
72+
73+
Args:
74+
key: The key to delete
75+
76+
Returns:
77+
True if key was deleted, False otherwise
78+
"""
79+
try:
80+
client = await self.get_client()
81+
result = await client.delete(key)
82+
return bool(result)
83+
except (redis.RedisError, OSError) as exc:
84+
logger.error(f"Failed to delete key '{key}' from Redis: {exc}")
85+
return False
86+
87+
async def exists(self, key: str) -> bool:
88+
"""Check if a key exists in Redis.
89+
90+
Args:
91+
key: The key to check
92+
93+
Returns:
94+
True if key exists, False otherwise
95+
"""
96+
try:
97+
client = await self.get_client()
98+
result = await client.exists(key)
99+
return bool(result)
100+
except (redis.RedisError, OSError) as exc:
101+
logger.error(f"Failed to check existence of key '{key}' in Redis: {exc}")
102+
return False
103+
104+
async def close(self) -> None:
105+
"""Close and disconnect the Redis client"""
106+
if self._client is not None:
107+
try:
108+
await self._client.close()
109+
logger.info("RedisService client closed")
110+
except Exception as exc:
111+
logger.warning(f"Failed to close RedisService client: {exc}")
112+
finally:
113+
self._client = None
114+
115+
116+
redis_service = RedisService()

app/services/token_store.py

Lines changed: 8 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from app.core.config import settings
1414
from app.core.security import redact_token
15+
from app.services.redis_service import redis_service
1516

1617

1718
class TokenStore:
@@ -20,12 +21,9 @@ class TokenStore:
2021
KEY_PREFIX = settings.REDIS_TOKEN_KEY
2122

2223
def __init__(self) -> None:
23-
self._client: redis.Redis | None = None
2424
# Negative cache for missing tokens to avoid repeated Redis GETs
2525
# when external probes request non-existent tokens.
2626
self._missing_tokens: TTLCache = TTLCache(maxsize=10000, ttl=86400)
27-
if not settings.REDIS_URL:
28-
logger.warning("REDIS_URL is not set. Token storage will fail until a Redis instance is configured.")
2927

3028
if not settings.TOKEN_SALT or settings.TOKEN_SALT == "change-me":
3129
logger.warning(
@@ -34,10 +32,8 @@ def __init__(self) -> None:
3432

3533
def _ensure_secure_salt(self) -> None:
3634
if not settings.TOKEN_SALT or settings.TOKEN_SALT == "change-me":
37-
logger.error("Refusing to store credentials because TOKEN_SALT is unset or using the insecure default.")
38-
raise RuntimeError(
39-
"Server misconfiguration: TOKEN_SALT must be set to a non-default value before storing" " credentials."
40-
)
35+
logger.error("TOKEN_SALT is unset or using the insecure default.")
36+
raise RuntimeError("TOKEN_SALT must be set to a non-default value before storing credentials.")
4137

4238
def _get_cipher(self) -> Fernet:
4339
salt = b"x7FDf9kypzQ1LmR32b8hWv49sKq2Pd8T"
@@ -59,59 +55,6 @@ def decrypt_token(self, enc: str) -> str:
5955
cipher = self._get_cipher()
6056
return cipher.decrypt(enc.encode("utf-8")).decode("utf-8")
6157

62-
async def _get_client(self) -> redis.Redis:
63-
if self._client is None:
64-
# Add socket timeouts to avoid hanging on Redis operations
65-
import traceback
66-
67-
logger.info("Creating shared Redis client")
68-
# Limit the number of pooled connections to avoid unbounded growth
69-
# `max_connections` is forwarded to ConnectionPool.from_url
70-
self._client = redis.from_url(
71-
settings.REDIS_URL,
72-
decode_responses=True,
73-
encoding="utf-8",
74-
socket_connect_timeout=5,
75-
socket_timeout=5,
76-
max_connections=getattr(settings, "REDIS_MAX_CONNECTIONS", 100),
77-
health_check_interval=30,
78-
socket_keepalive=True,
79-
)
80-
if getattr(self, "_creation_count", None) is None:
81-
self._creation_count = 1
82-
else:
83-
self._creation_count += 1
84-
logger.warning(
85-
f"Redis client creation invoked again (count={self._creation_count})."
86-
f" Stack:\n{''.join(traceback.format_stack())}"
87-
)
88-
return self._client
89-
90-
async def close(self) -> None:
91-
"""Close and disconnect the shared Redis client (call on shutdown)."""
92-
if self._client is None:
93-
return
94-
try:
95-
logger.info("Closing shared Redis client")
96-
# Close client and disconnect underlying pool
97-
try:
98-
await self._client.close()
99-
except Exception as e:
100-
logger.debug(f"Silent failure closing redis client: {e}")
101-
try:
102-
pool = getattr(self._client, "connection_pool", None)
103-
if pool is not None:
104-
# connection_pool.disconnect may be a coroutine in some redis implementations
105-
disconnect = getattr(pool, "disconnect", None)
106-
if disconnect:
107-
res = disconnect()
108-
if hasattr(res, "__await__"):
109-
await res
110-
except Exception as e:
111-
logger.debug(f"Silent failure disconnecting redis pool: {e}")
112-
finally:
113-
self._client = None
114-
11558
def _format_key(self, token: str) -> str:
11659
"""Format Redis key from token."""
11760
return f"{self.KEY_PREFIX}{token}"
@@ -145,13 +88,12 @@ async def store_user_data(self, user_id: str, payload: dict[str, Any]) -> str:
14588
# Do not store plaintext passwords
14689
raise RuntimeError("PASSWORD_ENCRYPT_FAILED")
14790

148-
client = await self._get_client()
14991
json_str = json.dumps(storage_data)
15092

15193
if settings.TOKEN_TTL_SECONDS and settings.TOKEN_TTL_SECONDS > 0:
152-
await client.setex(key, settings.TOKEN_TTL_SECONDS, json_str)
94+
await redis_service.set(key, json_str, settings.TOKEN_TTL_SECONDS)
15395
else:
154-
await client.set(key, json_str)
96+
await redis_service.set(key, json_str)
15597

15698
# Invalidate async LRU cache for fresh reads on subsequent requests
15799
try:
@@ -193,8 +135,7 @@ async def get_user_data(self, token: str) -> dict[str, Any] | None:
193135

194136
logger.debug(f"[REDIS] Cache miss. Fetching data from redis for {token}")
195137
key = self._format_key(token)
196-
client = await self._get_client()
197-
data_raw = await client.get(key)
138+
data_raw = await redis_service.get(key)
198139

199140
if not data_raw:
200141
# remember negative result briefly
@@ -232,8 +173,7 @@ async def delete_token(self, token: str = None, key: str = None) -> None:
232173
if token:
233174
key = self._format_key(token)
234175

235-
client = await self._get_client()
236-
await client.delete(key)
176+
await redis_service.delete(key)
237177

238178
# Invalidate async LRU cache so future reads reflect deletion
239179
try:
@@ -261,7 +201,7 @@ async def count_users(self) -> int:
261201
Cached for 12 hours to avoid frequent Redis scans.
262202
"""
263203
try:
264-
client = await self._get_client()
204+
client = await redis_service.get_client()
265205
except (redis.RedisError, OSError) as exc:
266206
logger.warning(f"Cannot count users; Redis unavailable: {exc}")
267207
return 0

0 commit comments

Comments
 (0)