|  | 
|  | 1 | +"""Utilities for connecting to and managing Redis connections.""" | 
|  | 2 | + | 
|  | 3 | +from typing import Optional | 
|  | 4 | + | 
|  | 5 | +from pydantic_settings import BaseSettings | 
|  | 6 | +from redis import asyncio as aioredis | 
|  | 7 | +from redis.asyncio.sentinel import Sentinel | 
|  | 8 | + | 
|  | 9 | +redis_pool: Optional[aioredis.Redis] = None | 
|  | 10 | + | 
|  | 11 | + | 
|  | 12 | +class RedisSentinelSettings(BaseSettings): | 
|  | 13 | +    """Configuration for connecting to Redis Sentinel.""" | 
|  | 14 | + | 
|  | 15 | +    REDIS_SENTINEL_HOSTS: str = "" | 
|  | 16 | +    REDIS_SENTINEL_PORTS: str = "26379" | 
|  | 17 | +    REDIS_SENTINEL_MASTER_NAME: str = "master" | 
|  | 18 | +    REDIS_DB: int = 15 | 
|  | 19 | + | 
|  | 20 | +    REDIS_MAX_CONNECTIONS: int = 10 | 
|  | 21 | +    REDIS_RETRY_TIMEOUT: bool = True | 
|  | 22 | +    REDIS_DECODE_RESPONSES: bool = True | 
|  | 23 | +    REDIS_CLIENT_NAME: str = "stac-fastapi-app" | 
|  | 24 | +    REDIS_HEALTH_CHECK_INTERVAL: int = 30 | 
|  | 25 | + | 
|  | 26 | + | 
|  | 27 | +class RedisSettings(BaseSettings): | 
|  | 28 | +    """Configuration for connecting Redis Sentinel.""" | 
|  | 29 | + | 
|  | 30 | +    REDIS_HOST: str = "" | 
|  | 31 | +    REDIS_PORT: int = 6379 | 
|  | 32 | +    REDIS_DB: int = 0 | 
|  | 33 | + | 
|  | 34 | +    REDIS_MAX_CONNECTIONS: int = 10 | 
|  | 35 | +    REDIS_RETRY_TIMEOUT: bool = True | 
|  | 36 | +    REDIS_DECODE_RESPONSES: bool = True | 
|  | 37 | +    REDIS_CLIENT_NAME: str = "stac-fastapi-app" | 
|  | 38 | +    REDIS_HEALTH_CHECK_INTERVAL: int = 30 | 
|  | 39 | + | 
|  | 40 | + | 
|  | 41 | +# Select the Redis or Redis Sentinel configuration | 
|  | 42 | +redis_settings: BaseSettings = RedisSettings() | 
|  | 43 | + | 
|  | 44 | + | 
|  | 45 | +async def connect_redis(settings: Optional[RedisSettings] = None) -> aioredis.Redis: | 
|  | 46 | +    """Return a Redis connection.""" | 
|  | 47 | +    global redis_pool | 
|  | 48 | +    settings = settings or redis_settings | 
|  | 49 | + | 
|  | 50 | +    if not settings.REDIS_HOST or not settings.REDIS_PORT: | 
|  | 51 | +        return None | 
|  | 52 | + | 
|  | 53 | +    if redis_pool is None: | 
|  | 54 | +        pool = aioredis.ConnectionPool( | 
|  | 55 | +            host=settings.REDIS_HOST, | 
|  | 56 | +            port=settings.REDIS_PORT, | 
|  | 57 | +            db=settings.REDIS_DB, | 
|  | 58 | +            max_connections=settings.REDIS_MAX_CONNECTIONS, | 
|  | 59 | +            decode_responses=settings.REDIS_DECODE_RESPONSES, | 
|  | 60 | +            retry_on_timeout=settings.REDIS_RETRY_TIMEOUT, | 
|  | 61 | +            health_check_interval=settings.REDIS_HEALTH_CHECK_INTERVAL, | 
|  | 62 | +        ) | 
|  | 63 | +        redis_pool = aioredis.Redis( | 
|  | 64 | +            connection_pool=pool, client_name=settings.REDIS_CLIENT_NAME | 
|  | 65 | +        ) | 
|  | 66 | +    return redis_pool | 
|  | 67 | + | 
|  | 68 | + | 
|  | 69 | +async def connect_redis_sentinel( | 
|  | 70 | +    settings: Optional[RedisSentinelSettings] = None, | 
|  | 71 | +) -> Optional[aioredis.Redis]: | 
|  | 72 | +    """Return a Redis Sentinel connection.""" | 
|  | 73 | +    global redis_pool | 
|  | 74 | + | 
|  | 75 | +    settings = settings or redis_settings | 
|  | 76 | + | 
|  | 77 | +    if ( | 
|  | 78 | +        not settings.REDIS_SENTINEL_HOSTS | 
|  | 79 | +        or not settings.REDIS_SENTINEL_PORTS | 
|  | 80 | +        or not settings.REDIS_SENTINEL_MASTER_NAME | 
|  | 81 | +    ): | 
|  | 82 | +        return None | 
|  | 83 | + | 
|  | 84 | +    hosts = [h.strip() for h in settings.REDIS_SENTINEL_HOSTS.split(",") if h.strip()] | 
|  | 85 | +    ports = [ | 
|  | 86 | +        int(p.strip()) for p in settings.REDIS_SENTINEL_PORTS.split(",") if p.strip() | 
|  | 87 | +    ] | 
|  | 88 | + | 
|  | 89 | +    if redis_pool is None: | 
|  | 90 | +        try: | 
|  | 91 | +            sentinel = Sentinel( | 
|  | 92 | +                [(h, p) for h, p in zip(hosts, ports)], | 
|  | 93 | +                decode_responses=settings.REDIS_DECODE_RESPONSES, | 
|  | 94 | +            ) | 
|  | 95 | +            master = sentinel.master_for( | 
|  | 96 | +                service_name=settings.REDIS_SENTINEL_MASTER_NAME, | 
|  | 97 | +                db=settings.REDIS_DB, | 
|  | 98 | +                decode_responses=settings.REDIS_DECODE_RESPONSES, | 
|  | 99 | +                retry_on_timeout=settings.REDIS_RETRY_TIMEOUT, | 
|  | 100 | +                client_name=settings.REDIS_CLIENT_NAME, | 
|  | 101 | +                max_connections=settings.REDIS_MAX_CONNECTIONS, | 
|  | 102 | +                health_check_interval=settings.REDIS_HEALTH_CHECK_INTERVAL, | 
|  | 103 | +            ) | 
|  | 104 | +            redis_pool = master | 
|  | 105 | + | 
|  | 106 | +        except Exception: | 
|  | 107 | +            return None | 
|  | 108 | + | 
|  | 109 | +    return redis_pool | 
|  | 110 | + | 
|  | 111 | + | 
|  | 112 | +async def save_self_link( | 
|  | 113 | +    redis: aioredis.Redis, token: Optional[str], self_href: str | 
|  | 114 | +) -> None: | 
|  | 115 | +    """Save the self link for the current token with 30 min TTL.""" | 
|  | 116 | +    if token: | 
|  | 117 | +        await redis.setex(f"nav:self:{token}", 1800, self_href) | 
|  | 118 | + | 
|  | 119 | + | 
|  | 120 | +async def get_prev_link(redis: aioredis.Redis, token: Optional[str]) -> Optional[str]: | 
|  | 121 | +    """Get the previous page link for the current token (if exists).""" | 
|  | 122 | +    if not token: | 
|  | 123 | +        return None | 
|  | 124 | +    return await redis.get(f"nav:self:{token}") | 
0 commit comments