Skip to content

Commit b87e9c0

Browse files
Yuri ZmytrakovYuri Zmytrakov
authored and committed
add redis
1 parent aecb461 commit b87e9c0

File tree

2 files changed

+294
-0
lines changed

2 files changed

+294
-0
lines changed

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@
2424
from stac_fastapi.core.base_settings import ApiBaseSettings
2525
from stac_fastapi.core.datetime_utils import format_datetime_range
2626
from stac_fastapi.core.models.links import PagingLinks
27+
from stac_fastapi.core.redis_utils import (
28+
add_previous_link,
29+
cache_current_url,
30+
cache_previous_url,
31+
connect_redis_sentinel,
32+
connect_redis,
33+
)
2734
from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer
2835
from stac_fastapi.core.session import Session
2936
from stac_fastapi.core.utilities import filter_fields
@@ -237,6 +244,12 @@ async def all_collections(self, **kwargs) -> stac_types.Collections:
237244
base_url = str(request.base_url)
238245
limit = int(request.query_params.get("limit", os.getenv("STAC_ITEM_LIMIT", 10)))
239246
token = request.query_params.get("token")
247+
current_url = str(request.url)
248+
redis = None
249+
try:
250+
redis = await connect_redis()
251+
except Exception as e:
252+
redis = None
240253

241254
collections, next_token = await self.database.get_all_collections(
242255
token=token, limit=limit, request=request
@@ -252,6 +265,12 @@ async def all_collections(self, **kwargs) -> stac_types.Collections:
252265
},
253266
]
254267

268+
await add_previous_link(
269+
redis, links, "collections", current_url, token
270+
)
271+
if redis:
272+
await cache_previous_url(redis, current_url, "collections")
273+
255274
if next_token:
256275
next_link = PagingLinks(next=next_token, request=request).link_next()
257276
links.append(next_link)
@@ -323,6 +342,31 @@ async def item_collection(
323342
Raises:
324343
HTTPException: 404 if the collection does not exist.
325344
"""
345+
request: Request = kwargs["request"]
346+
token = request.query_params.get("token")
347+
base_url = str(request.base_url)
348+
349+
current_url = str(request.url)
350+
351+
try:
352+
redis = await connect_redis()
353+
except Exception as e:
354+
redis = None
355+
356+
if redis:
357+
await cache_current_url(redis, current_url, collection_id)
358+
359+
collection = await self.get_collection(
360+
collection_id=collection_id, request=request
361+
)
362+
collection_id = collection.get("id")
363+
if collection_id is None:
364+
raise HTTPException(status_code=404, detail="Collection not found")
365+
366+
search = self.database.make_search()
367+
search = self.database.apply_collections_filter(
368+
search=search, collection_ids=[collection_id]
369+
)
326370
try:
327371
await self.get_collection(collection_id=collection_id, request=request)
328372
except Exception:
@@ -336,6 +380,45 @@ async def item_collection(
336380
datetime=datetime,
337381
limit=limit,
338382
token=token,
383+
collection_ids=[collection_id],
384+
datetime_search=datetime_search,
385+
)
386+
387+
items = [
388+
self.item_serializer.db_to_stac(item, base_url=base_url) for item in items
389+
]
390+
391+
collection_links = [
392+
{
393+
"rel": "collection",
394+
"type": "application/json",
395+
"href": urljoin(str(request.base_url), f"collections/{collection_id}"),
396+
},
397+
{
398+
"rel": "parent",
399+
"type": "application/json",
400+
"href": urljoin(str(request.base_url), f"collections/{collection_id}"),
401+
},
402+
]
403+
404+
paging_links = await PagingLinks(request=request, next=next_token).get_links()
405+
406+
if redis:
407+
await add_previous_link(
408+
redis, paging_links, collection_id, current_url, token
409+
)
410+
411+
if redis:
412+
await cache_previous_url(redis, current_url, collection_id)
413+
414+
links = collection_links + paging_links
415+
416+
return stac_types.ItemCollection(
417+
type="FeatureCollection",
418+
features=items,
419+
links=links,
420+
numReturned=len(items),
421+
numMatched=maybe_count,
339422
sortby=sortby,
340423
query=query,
341424
filter_expr=filter_expr,
@@ -482,7 +565,14 @@ async def post_search(
482565
HTTPException: If there is an error with the cql2_json filter.
483566
"""
484567
base_url = str(request.base_url)
568+
current_url = str(request.url)
569+
try:
570+
redis = await connect_redis()
571+
except Exception as e:
572+
redis = None
485573

574+
if redis:
575+
await cache_current_url(redis, current_url, "search_result")
486576
search = self.database.make_search()
487577

488578
if search_request.ids:
@@ -592,6 +682,14 @@ async def post_search(
592682
]
593683
links = await PagingLinks(request=request, next=next_token).get_links()
594684

685+
if redis:
686+
await add_previous_link(
687+
redis, links, "search_result", current_url, search_request.token
688+
)
689+
690+
if redis:
691+
await cache_previous_url(redis, current_url, "search_result")
692+
595693
return stac_types.ItemCollection(
596694
type="FeatureCollection",
597695
features=items,
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
"""Utilities for connecting to and managing Redis connections."""
2+
3+
import logging
4+
import os
5+
from typing import Dict, List, Optional
6+
7+
from pydantic_settings import BaseSettings
8+
from redis import asyncio as aioredis
9+
from stac_pydantic.shared import MimeTypes
10+
11+
from stac_fastapi.core.utilities import get_bool_env
12+
13+
redis_pool = None
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
class RedisSentinelSettings(BaseSettings):
19+
"""Configuration settings for connecting to a Redis Sentinel server."""
20+
21+
sentinel_hosts: List[str] = os.getenv("REDIS_SENTINEL_HOSTS", "").split(",")
22+
sentinel_ports: List[int] = [
23+
int(port)
24+
for port in os.getenv("REDIS_SENTINEL_PORTS", "").split(",")
25+
if port.strip()
26+
]
27+
sentinel_master_name: str = os.getenv("REDIS_SENTINEL_MASTER_NAME", "")
28+
redis_db: int = int(os.getenv("REDIS_DB", "0"))
29+
30+
max_connections: int = int(os.getenv("REDIS_MAX_CONNECTIONS", "5"))
31+
retry_on_timeout: bool = get_bool_env("REDIS_RETRY_TIMEOUT", True)
32+
decode_responses: bool = get_bool_env("REDIS_DECODE_RESPONSES", True)
33+
client_name: str = os.getenv("REDIS_CLIENT_NAME", "stac-fastapi-app")
34+
health_check_interval: int = int(os.getenv("REDIS_HEALTH_CHECK_INTERVAL", "30"))
35+
36+
37+
class RedisSettings(BaseSettings):
38+
"""Configuration settings for connecting to a Redis server."""
39+
40+
redis_host: str = os.getenv("REDIS_HOST", "localhost")
41+
redis_port: int = int(os.getenv("REDIS_PORT", "6379"))
42+
redis_db: int = int(os.getenv("REDIS_DB", "0"))
43+
44+
max_connections: int = int(os.getenv("REDIS_MAX_CONNECTIONS", "5"))
45+
retry_on_timeout: bool = get_bool_env("REDIS_RETRY_TIMEOUT", True)
46+
decode_responses: bool = get_bool_env("REDIS_DECODE_RESPONSES", True)
47+
client_name: str = os.getenv("REDIS_CLIENT_NAME", "stac-fastapi-app")
48+
health_check_interval: int = int(os.getenv("REDIS_HEALTH_CHECK_INTERVAL", "30"))
49+
50+
51+
# select which configuration to be used RedisSettings or RedisSentinelSettings
52+
redis_settings = RedisSettings()
53+
54+
55+
async def connect_redis_sentinel(
56+
settings: Optional[RedisSentinelSettings] = None,
57+
) -> Optional[aioredis.Redis]:
58+
"""Return a Redis Sentinel connection."""
59+
global redis_pool
60+
settings = redis_settings
61+
62+
if (
63+
not settings.sentinel_hosts
64+
or not settings.sentinel_hosts[0]
65+
or not settings.sentinel_master_name
66+
):
67+
return None
68+
69+
if redis_pool is None:
70+
try:
71+
sentinel = aioredis.Sentinel(
72+
[
73+
(host, port)
74+
for host, port in zip(
75+
settings.sentinel_hosts, settings.sentinel_ports
76+
)
77+
],
78+
decode_responses=settings.decode_responses,
79+
retry_on_timeout=settings.retry_on_timeout,
80+
client_name=f"{settings.client_name}-sentinel",
81+
)
82+
83+
master = sentinel.master_for(
84+
settings.sentinel_master_name,
85+
db=settings.redis_db,
86+
decode_responses=settings.decode_responses,
87+
retry_on_timeout=settings.retry_on_timeout,
88+
client_name=settings.client_name,
89+
max_connections=settings.max_connections,
90+
)
91+
92+
redis_pool = master
93+
94+
except:
95+
return None
96+
97+
return redis_pool
98+
99+
100+
async def connect_redis(
101+
settings: Optional[RedisSettings] = None,
102+
) -> Optional[aioredis.Redis]:
103+
"""Return a Redis connection for regular Redis server."""
104+
global redis_pool
105+
settings = redis_settings
106+
107+
if not settings.redis_host:
108+
return None
109+
110+
if redis_pool is None:
111+
try:
112+
redis_pool = aioredis.Redis(
113+
host=settings.redis_host,
114+
port=settings.redis_port,
115+
db=settings.redis_db,
116+
decode_responses=settings.decode_responses,
117+
retry_on_timeout=settings.retry_on_timeout,
118+
client_name=settings.client_name,
119+
health_check_interval=settings.health_check_interval,
120+
max_connections=settings.max_connections,
121+
)
122+
except Exception as e:
123+
logger.error(f"Redis connection failed: {e}")
124+
return None
125+
126+
return redis_pool
127+
128+
129+
async def close_redis() -> None:
130+
"""Close the Redis connection pool if it exists."""
131+
global redis_pool
132+
if redis_pool:
133+
await redis_pool.close()
134+
redis_pool = None
135+
136+
137+
async def cache_current_url(redis, current_url: str, key: str) -> None:
138+
"""Add to Redis cache the current URL for navigation."""
139+
if not redis:
140+
return
141+
142+
try:
143+
current_key = f"current:{key}"
144+
await redis.setex(current_key, 600, current_url)
145+
except Exception as e:
146+
logger.error(f"Redis cache error for {key}: {e}")
147+
148+
149+
async def get_previous_url(redis, key: str) -> Optional[str]:
150+
"""Get previous URL from Redis cache if it exists."""
151+
if redis is None:
152+
return None
153+
154+
try:
155+
prev_key = f"prev:{key}"
156+
previous_url = await redis.get(prev_key)
157+
if previous_url:
158+
return previous_url
159+
except Exception as e:
160+
logger.error(f"Redis get previous error for {key}: {e}")
161+
162+
return None
163+
164+
165+
async def cache_previous_url(redis, current_url: str, key: str) -> None:
166+
"""Cache the current URL as previous for previous links in next page."""
167+
if not redis:
168+
return
169+
170+
try:
171+
prev_key = f"prev:{key}"
172+
await redis.setex(prev_key, 600, current_url)
173+
except Exception as e:
174+
logger.error(f"Redis cache previous error for {key}: {e}")
175+
176+
177+
async def add_previous_link(
178+
redis,
179+
links: List[Dict],
180+
key: str,
181+
current_url: str,
182+
token: Optional[str] = None,
183+
) -> None:
184+
"""Add previous link into navigation."""
185+
if not redis or not token:
186+
return
187+
188+
previous_url = await get_previous_url(redis, key)
189+
if previous_url:
190+
links.append(
191+
{
192+
"rel": "previous",
193+
"type": MimeTypes.json,
194+
"href": previous_url,
195+
}
196+
)

0 commit comments

Comments
 (0)