| 
 | 1 | +"""Utilities for connecting to and managing Redis connections."""  | 
 | 2 | + | 
 | 3 | +import logging  | 
 | 4 | +import os  | 
 | 5 | +from typing import Dict, List, Optional  | 
 | 6 | + | 
 | 7 | +from pydantic_settings import BaseSettings  | 
 | 8 | +from redis import asyncio as aioredis  | 
 | 9 | +from stac_pydantic.shared import MimeTypes  | 
 | 10 | + | 
 | 11 | +from stac_fastapi.core.utilities import get_bool_env  | 
 | 12 | + | 
 | 13 | +redis_pool = None  | 
 | 14 | + | 
 | 15 | +logger = logging.getLogger(__name__)  | 
 | 16 | + | 
 | 17 | + | 
 | 18 | +class RedisSentinelSettings(BaseSettings):  | 
 | 19 | +    """Configuration settings for connecting to a Redis Sentinel server."""  | 
 | 20 | + | 
 | 21 | +    sentinel_hosts: List[str] = os.getenv("REDIS_SENTINEL_HOSTS", "").split(",")  | 
 | 22 | +    sentinel_ports: List[int] = [  | 
 | 23 | +        int(port)  | 
 | 24 | +        for port in os.getenv("REDIS_SENTINEL_PORTS", "").split(",")  | 
 | 25 | +        if port.strip()  | 
 | 26 | +    ]  | 
 | 27 | +    sentinel_master_name: str = os.getenv("REDIS_SENTINEL_MASTER_NAME", "")  | 
 | 28 | +    redis_db: int = int(os.getenv("REDIS_DB", "0"))  | 
 | 29 | + | 
 | 30 | +    max_connections: int = int(os.getenv("REDIS_MAX_CONNECTIONS", "5"))  | 
 | 31 | +    retry_on_timeout: bool = get_bool_env("REDIS_RETRY_TIMEOUT", True)  | 
 | 32 | +    decode_responses: bool = get_bool_env("REDIS_DECODE_RESPONSES", True)  | 
 | 33 | +    client_name: str = os.getenv("REDIS_CLIENT_NAME", "stac-fastapi-app")  | 
 | 34 | +    health_check_interval: int = int(os.getenv("REDIS_HEALTH_CHECK_INTERVAL", "30"))  | 
 | 35 | + | 
 | 36 | + | 
 | 37 | +class RedisSettings(BaseSettings):  | 
 | 38 | +    """Configuration settings for connecting to a Redis server."""  | 
 | 39 | + | 
 | 40 | +    redis_host: str = os.getenv("REDIS_HOST", "localhost")  | 
 | 41 | +    redis_port: int = int(os.getenv("REDIS_PORT", "6379"))  | 
 | 42 | +    redis_db: int = int(os.getenv("REDIS_DB", "0"))  | 
 | 43 | + | 
 | 44 | +    max_connections: int = int(os.getenv("REDIS_MAX_CONNECTIONS", "5"))  | 
 | 45 | +    retry_on_timeout: bool = get_bool_env("REDIS_RETRY_TIMEOUT", True)  | 
 | 46 | +    decode_responses: bool = get_bool_env("REDIS_DECODE_RESPONSES", True)  | 
 | 47 | +    client_name: str = os.getenv("REDIS_CLIENT_NAME", "stac-fastapi-app")  | 
 | 48 | +    health_check_interval: int = int(os.getenv("REDIS_HEALTH_CHECK_INTERVAL", "30"))  | 
 | 49 | + | 
 | 50 | + | 
 | 51 | +# select which configuration to be used RedisSettings or RedisSentinelSettings  | 
 | 52 | +redis_settings = RedisSettings()  | 
 | 53 | + | 
 | 54 | + | 
 | 55 | +async def connect_redis_sentinel(  | 
 | 56 | +    settings: Optional[RedisSentinelSettings] = None,  | 
 | 57 | +) -> Optional[aioredis.Redis]:  | 
 | 58 | +    """Return a Redis Sentinel connection."""  | 
 | 59 | +    global redis_pool  | 
 | 60 | +    settings = redis_settings  | 
 | 61 | + | 
 | 62 | +    if (  | 
 | 63 | +        not settings.sentinel_hosts  | 
 | 64 | +        or not settings.sentinel_hosts[0]  | 
 | 65 | +        or not settings.sentinel_master_name  | 
 | 66 | +    ):  | 
 | 67 | +        return None  | 
 | 68 | + | 
 | 69 | +    if redis_pool is None:  | 
 | 70 | +        try:  | 
 | 71 | +            sentinel = aioredis.Sentinel(  | 
 | 72 | +                [  | 
 | 73 | +                    (host, port)  | 
 | 74 | +                    for host, port in zip(  | 
 | 75 | +                        settings.sentinel_hosts, settings.sentinel_ports  | 
 | 76 | +                    )  | 
 | 77 | +                ],  | 
 | 78 | +                decode_responses=settings.decode_responses,  | 
 | 79 | +                retry_on_timeout=settings.retry_on_timeout,  | 
 | 80 | +                client_name=f"{settings.client_name}-sentinel",  | 
 | 81 | +            )  | 
 | 82 | + | 
 | 83 | +            master = sentinel.master_for(  | 
 | 84 | +                settings.sentinel_master_name,  | 
 | 85 | +                db=settings.redis_db,  | 
 | 86 | +                decode_responses=settings.decode_responses,  | 
 | 87 | +                retry_on_timeout=settings.retry_on_timeout,  | 
 | 88 | +                client_name=settings.client_name,  | 
 | 89 | +                max_connections=settings.max_connections,  | 
 | 90 | +            )  | 
 | 91 | + | 
 | 92 | +            redis_pool = master  | 
 | 93 | + | 
 | 94 | +        except Exception:  | 
 | 95 | +            return None  | 
 | 96 | + | 
 | 97 | +    return redis_pool  | 
 | 98 | + | 
 | 99 | + | 
 | 100 | +async def connect_redis(  | 
 | 101 | +    settings: Optional[RedisSettings] = None,  | 
 | 102 | +) -> Optional[aioredis.Redis]:  | 
 | 103 | +    """Return a Redis connection for regular Redis server."""  | 
 | 104 | +    global redis_pool  | 
 | 105 | +    settings = redis_settings  | 
 | 106 | + | 
 | 107 | +    if not settings.redis_host:  | 
 | 108 | +        return None  | 
 | 109 | + | 
 | 110 | +    if redis_pool is None:  | 
 | 111 | +        try:  | 
 | 112 | +            pool = aioredis.ConnectionPool(  | 
 | 113 | +                host=settings.redis_host,  | 
 | 114 | +                port=settings.redis_port,  | 
 | 115 | +                db=settings.redis_db,  | 
 | 116 | +                max_connections=settings.max_connections,  | 
 | 117 | +                decode_responses=settings.decode_responses,  | 
 | 118 | +                retry_on_timeout=settings.retry_on_timeout,  | 
 | 119 | +                health_check_interval=settings.health_check_interval,  | 
 | 120 | +            )  | 
 | 121 | +            redis_pool = aioredis.Redis(  | 
 | 122 | +                connection_pool=pool,  | 
 | 123 | +                client_name=settings.client_name,  | 
 | 124 | +            )  | 
 | 125 | +        except Exception as e:  | 
 | 126 | +            logger.error(f"Redis connection failed: {e}")  | 
 | 127 | +            return None  | 
 | 128 | + | 
 | 129 | +    return redis_pool  | 
 | 130 | + | 
 | 131 | + | 
 | 132 | +async def close_redis() -> None:  | 
 | 133 | +    """Close the Redis connection pool if it exists."""  | 
 | 134 | +    global redis_pool  | 
 | 135 | +    if redis_pool:  | 
 | 136 | +        await redis_pool.close()  | 
 | 137 | +        redis_pool = None  | 
 | 138 | + | 
 | 139 | + | 
 | 140 | +async def cache_current_url(redis, current_url: str, key: str) -> None:  | 
 | 141 | +    """Add to Redis cache the current URL for navigation."""  | 
 | 142 | +    if not redis:  | 
 | 143 | +        return  | 
 | 144 | + | 
 | 145 | +    try:  | 
 | 146 | +        current_key = f"current:{key}"  | 
 | 147 | +        await redis.setex(current_key, 600, current_url)  | 
 | 148 | +    except Exception as e:  | 
 | 149 | +        logger.error(f"Redis cache error for {key}: {e}")  | 
 | 150 | + | 
 | 151 | + | 
 | 152 | +async def get_previous_url(redis, key: str) -> Optional[str]:  | 
 | 153 | +    """Get previous URL from Redis cache if it exists."""  | 
 | 154 | +    if redis is None:  | 
 | 155 | +        return None  | 
 | 156 | + | 
 | 157 | +    try:  | 
 | 158 | +        prev_key = f"prev:{key}"  | 
 | 159 | +        previous_url = await redis.get(prev_key)  | 
 | 160 | +        if previous_url:  | 
 | 161 | +            return previous_url  | 
 | 162 | +    except Exception as e:  | 
 | 163 | +        logger.error(f"Redis get previous error for {key}: {e}")  | 
 | 164 | + | 
 | 165 | +    return None  | 
 | 166 | + | 
 | 167 | + | 
 | 168 | +async def cache_previous_url(redis, current_url: str, key: str) -> None:  | 
 | 169 | +    """Cache the current URL as previous for previous links in next page."""  | 
 | 170 | +    if not redis:  | 
 | 171 | +        return  | 
 | 172 | + | 
 | 173 | +    try:  | 
 | 174 | +        prev_key = f"prev:{key}"  | 
 | 175 | +        await redis.setex(prev_key, 600, current_url)  | 
 | 176 | +    except Exception as e:  | 
 | 177 | +        logger.error(f"Redis cache previous error for {key}: {e}")  | 
 | 178 | + | 
 | 179 | + | 
 | 180 | +async def add_previous_link(  | 
 | 181 | +    redis,  | 
 | 182 | +    links: List[Dict],  | 
 | 183 | +    key: str,  | 
 | 184 | +    current_url: str,  | 
 | 185 | +    token: Optional[str] = None,  | 
 | 186 | +) -> None:  | 
 | 187 | +    """Add previous link into navigation."""  | 
 | 188 | +    if not redis or not token:  | 
 | 189 | +        return  | 
 | 190 | + | 
 | 191 | +    previous_url = await get_previous_url(redis, key)  | 
 | 192 | +    if previous_url:  | 
 | 193 | +        links.append(  | 
 | 194 | +            {  | 
 | 195 | +                "rel": "previous",  | 
 | 196 | +                "type": MimeTypes.json,  | 
 | 197 | +                "href": previous_url,  | 
 | 198 | +            }  | 
 | 199 | +        )  | 
0 commit comments