| 
 | 1 | +"""Utilities for connecting to and managing Redis connections."""  | 
 | 2 | + | 
 | 3 | +from typing import Optional  | 
 | 4 | + | 
 | 5 | +from pydantic_settings import BaseSettings  | 
 | 6 | +from redis import asyncio as aioredis  | 
 | 7 | +from redis.asyncio.sentinel import Sentinel  | 
 | 8 | + | 
 | 9 | +redis_pool: Optional[aioredis.Redis] = None  | 
 | 10 | + | 
 | 11 | + | 
 | 12 | +class RedisSentinelSettings(BaseSettings):  | 
 | 13 | +    """Configuration for connecting to Redis Sentinel."""  | 
 | 14 | + | 
 | 15 | +    REDIS_SENTINEL_HOSTS: str = ""  | 
 | 16 | +    REDIS_SENTINEL_PORTS: str = "26379"  | 
 | 17 | +    REDIS_SENTINEL_MASTER_NAME: str = "master"  | 
 | 18 | +    REDIS_DB: int = 15  | 
 | 19 | + | 
 | 20 | +    REDIS_MAX_CONNECTIONS: int = 10  | 
 | 21 | +    REDIS_RETRY_TIMEOUT: bool = True  | 
 | 22 | +    REDIS_DECODE_RESPONSES: bool = True  | 
 | 23 | +    REDIS_CLIENT_NAME: str = "stac-fastapi-app"  | 
 | 24 | +    REDIS_HEALTH_CHECK_INTERVAL: int = 30  | 
 | 25 | + | 
 | 26 | + | 
 | 27 | +class RedisSettings(BaseSettings):  | 
 | 28 | +    """Configuration for connecting Redis Sentinel."""  | 
 | 29 | + | 
 | 30 | +    REDIS_HOST: str = ""  | 
 | 31 | +    REDIS_PORT: int = 6379  | 
 | 32 | +    REDIS_DB: int = 0  | 
 | 33 | + | 
 | 34 | +    REDIS_MAX_CONNECTIONS: int = 10  | 
 | 35 | +    REDIS_RETRY_TIMEOUT: bool = True  | 
 | 36 | +    REDIS_DECODE_RESPONSES: bool = True  | 
 | 37 | +    REDIS_CLIENT_NAME: str = "stac-fastapi-app"  | 
 | 38 | +    REDIS_HEALTH_CHECK_INTERVAL: int = 30  | 
 | 39 | + | 
 | 40 | + | 
 | 41 | +# Select the Redis or Redis Sentinel configuration  | 
 | 42 | +redis_settings: BaseSettings = RedisSettings()  | 
 | 43 | + | 
 | 44 | + | 
 | 45 | +async def connect_redis(settings: Optional[RedisSettings] = None) -> aioredis.Redis:  | 
 | 46 | +    """Return a Redis connection."""  | 
 | 47 | +    global redis_pool  | 
 | 48 | +    settings = settings or redis_settings  | 
 | 49 | + | 
 | 50 | +    if (  | 
 | 51 | +        not settings.REDIS_HOST  | 
 | 52 | +        or not settings.REDIS_PORT  | 
 | 53 | +    ):  | 
 | 54 | +        return None  | 
 | 55 | + | 
 | 56 | +    if redis_pool is None:  | 
 | 57 | +        pool = aioredis.ConnectionPool(  | 
 | 58 | +            host=settings.REDIS_HOST,  | 
 | 59 | +            port=settings.REDIS_PORT,  | 
 | 60 | +            db=settings.REDIS_DB,  | 
 | 61 | +            max_connections=settings.REDIS_MAX_CONNECTIONS,  | 
 | 62 | +            decode_responses=settings.REDIS_DECODE_RESPONSES,  | 
 | 63 | +            retry_on_timeout=settings.REDIS_RETRY_TIMEOUT,  | 
 | 64 | +            health_check_interval=settings.REDIS_HEALTH_CHECK_INTERVAL,  | 
 | 65 | +        )  | 
 | 66 | +        redis_pool = aioredis.Redis(  | 
 | 67 | +            connection_pool=pool, client_name=settings.REDIS_CLIENT_NAME  | 
 | 68 | +        )  | 
 | 69 | +    return redis_pool  | 
 | 70 | + | 
 | 71 | + | 
 | 72 | +async def connect_redis_sentinel(  | 
 | 73 | +    settings: Optional[RedisSentinelSettings] = None,  | 
 | 74 | +) -> Optional[aioredis.Redis]:  | 
 | 75 | +    """Return a Redis Sentinel connection."""  | 
 | 76 | +    global redis_pool  | 
 | 77 | + | 
 | 78 | +    settings = settings or redis_settings  | 
 | 79 | + | 
 | 80 | +    if (  | 
 | 81 | +        not settings.REDIS_SENTINEL_HOSTS  | 
 | 82 | +        or not settings.REDIS_SENTINEL_PORTS  | 
 | 83 | +        or not settings.REDIS_SENTINEL_MASTER_NAME  | 
 | 84 | +    ):  | 
 | 85 | +        return None  | 
 | 86 | + | 
 | 87 | +    hosts = [h.strip() for h in settings.REDIS_SENTINEL_HOSTS.split(",") if h.strip()]  | 
 | 88 | +    ports = [  | 
 | 89 | +        int(p.strip()) for p in settings.REDIS_SENTINEL_PORTS.split(",") if p.strip()  | 
 | 90 | +    ]  | 
 | 91 | + | 
 | 92 | +    if redis_pool is None:  | 
 | 93 | +        try:  | 
 | 94 | +            sentinel = Sentinel(  | 
 | 95 | +                [(h, p) for h, p in zip(hosts, ports)],  | 
 | 96 | +                decode_responses=settings.REDIS_DECODE_RESPONSES,  | 
 | 97 | +            )  | 
 | 98 | +            master = sentinel.master_for(  | 
 | 99 | +                service_name=settings.REDIS_SENTINEL_MASTER_NAME,  | 
 | 100 | +                db=settings.REDIS_DB,  | 
 | 101 | +                decode_responses=settings.REDIS_DECODE_RESPONSES,  | 
 | 102 | +                retry_on_timeout=settings.REDIS_RETRY_TIMEOUT,  | 
 | 103 | +                client_name=settings.REDIS_CLIENT_NAME,  | 
 | 104 | +                max_connections=settings.REDIS_MAX_CONNECTIONS,  | 
 | 105 | +                health_check_interval=settings.REDIS_HEALTH_CHECK_INTERVAL,  | 
 | 106 | +            )  | 
 | 107 | +            redis_pool = master  | 
 | 108 | + | 
 | 109 | +        except Exception:  | 
 | 110 | +            return None  | 
 | 111 | + | 
 | 112 | +    return redis_pool  | 
 | 113 | + | 
 | 114 | + | 
 | 115 | +async def save_self_link(  | 
 | 116 | +    redis: aioredis.Redis, token: Optional[str], self_href: str  | 
 | 117 | +) -> None:  | 
 | 118 | +    """Save the self link for the current token with 30 min TTL."""  | 
 | 119 | +    if token:  | 
 | 120 | +        await redis.setex(f"nav:self:{token}", 1800, self_href)  | 
 | 121 | + | 
 | 122 | + | 
 | 123 | +async def get_prev_link(redis: aioredis.Redis, token: Optional[str]) -> Optional[str]:  | 
 | 124 | +    """Get the previous page link for the current token (if exists)."""  | 
 | 125 | +    if not token:  | 
 | 126 | +        return None  | 
 | 127 | +    return await redis.get(f"nav:self:{token}")  | 
0 commit comments