Skip to content

Commit f023e05

Browse files
Yuri Zmytrakov
authored and committed
add redis
1 parent 041b729 commit f023e05

File tree

11 files changed

+260
-24
lines changed

11 files changed

+260
-24
lines changed

.pre-commit-config.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ repos:
3131
]
3232
additional_dependencies: [
3333
"types-attrs",
34-
"types-requests"
34+
"types-requests",
35+
"types-redis"
3536
]
3637
- repo: https://github.com/PyCQA/pydocstyle
3738
rev: 6.1.1

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ docker-shell-os:
6363

6464
.PHONY: test-elasticsearch
6565
test-elasticsearch:
66-
-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest'
66+
-$(run_es) /bin/bash -c 'pip install redis==6.4.0 && export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest'
6767
docker compose down
6868

6969
.PHONY: test-opensearch

compose.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ services:
2222
- ES_VERIFY_CERTS=false
2323
- BACKEND=elasticsearch
2424
- DATABASE_REFRESH=true
25+
- REDIS_HOST=redis
26+
- REDIS_PORT=6379
2527
ports:
2628
- "8080:8080"
2729
volumes:
@@ -30,6 +32,7 @@ services:
3032
- ./esdata:/usr/share/elasticsearch/data
3133
depends_on:
3234
- elasticsearch
35+
- redis
3336
command:
3437
bash -c "./scripts/wait-for-it-es.sh es-container:9200 && python -m stac_fastapi.elasticsearch.app"
3538

@@ -94,3 +97,10 @@ services:
9497
- ./opensearch/snapshots:/usr/share/opensearch/snapshots
9598
ports:
9699
- "9202:9202"
100+
101+
redis:
102+
container_name: stac-redis
103+
image: redis:7.2-alpine
104+
restart: always
105+
ports:
106+
- "6379:6379"

docker-compose.redis.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
version: '3.8'
2+
3+
services:
4+
redis:
5+
image: redis:7-alpine
6+
container_name: stac-app-redis
7+
ports:
8+
- "6379:6379"
9+
volumes:
10+
- redis_data:/data
11+
command: >
12+
redis-server
13+
--appendonly yes
14+
--maxmemory 256mb
15+
--maxmemory-policy allkeys-lru
16+
healthcheck:
17+
test: ["CMD", "redis-cli", "ping"]
18+
interval: 10s
19+
timeout: 5s
20+
retries: 3
21+
restart: unless-stopped
22+
23+
volumes:
24+
redis_data:

stac_fastapi/core/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"pygeofilter~=0.3.1",
2020
"jsonschema~=4.0.0",
2121
"slowapi~=0.1.9",
22+
"redis==6.4.0",
2223
]
2324

2425
setup(

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 57 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@
2424
from stac_fastapi.core.base_settings import ApiBaseSettings
2525
from stac_fastapi.core.datetime_utils import format_datetime_range
2626
from stac_fastapi.core.models.links import PagingLinks
27+
from stac_fastapi.core.redis_utils import (
28+
add_previous_link_if_exists,
29+
cache_current_url,
30+
cache_previous_url,
31+
connect_redis,
32+
)
2733
from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer
2834
from stac_fastapi.core.session import Session
2935
from stac_fastapi.core.utilities import filter_fields
@@ -237,6 +243,13 @@ async def all_collections(self, **kwargs) -> stac_types.Collections:
237243
base_url = str(request.base_url)
238244
limit = int(request.query_params.get("limit", os.getenv("STAC_ITEM_LIMIT", 10)))
239245
token = request.query_params.get("token")
246+
current_url = str(request.url)
247+
redis = None
248+
try:
249+
redis = await connect_redis()
250+
except Exception as e:
251+
logger.error(f"Redis connection error: {e}")
252+
redis = None
240253

241254
collections, next_token = await self.database.get_all_collections(
242255
token=token, limit=limit, request=request
@@ -252,6 +265,12 @@ async def all_collections(self, **kwargs) -> stac_types.Collections:
252265
},
253266
]
254267

268+
await add_previous_link_if_exists(
269+
redis, links, "collections", current_url, token
270+
)
271+
if redis:
272+
await cache_previous_url(redis, current_url, "collections")
273+
255274
if next_token:
256275
next_link = PagingLinks(next=next_token, request=request).link_next()
257276
links.append(next_link)
@@ -310,20 +329,19 @@ async def item_collection(
310329
"""
311330
request: Request = kwargs["request"]
312331
token = request.query_params.get("token")
313-
if not hasattr(self, '_prev_links'):
314-
self._prev_links = {}
315-
316-
session_id = request.cookies.get('stac_session', 'default_session')
317-
current_self_link = str(request.url)
318-
319-
if session_id not in self._prev_links:
320-
self._prev_links[session_id] = []
321-
322-
history = self._prev_links[session_id]
323-
if not history or current_self_link != history[-1]:
324-
history.append(current_self_link)
325332
base_url = str(request.base_url)
326333

334+
current_url = str(request.url)
335+
336+
try:
337+
redis = await connect_redis()
338+
except Exception as e:
339+
logger.error(f"Redis connection error: {e}")
340+
redis = None
341+
342+
if redis:
343+
await cache_current_url(redis, current_url, collection_id)
344+
327345
collection = await self.get_collection(
328346
collection_id=collection_id, request=request
329347
)
@@ -374,21 +392,22 @@ async def item_collection(
374392
"href": urljoin(str(request.base_url), f"collections/{collection_id}"),
375393
},
376394
{
377-
"rel": "parent",
395+
"rel": "parent",
378396
"type": "application/json",
379397
"href": urljoin(str(request.base_url), f"collections/{collection_id}"),
380-
}
398+
},
381399
]
382400

383401
paging_links = await PagingLinks(request=request, next=next_token).get_links()
384-
history = self._prev_links.get(session_id, [])
385-
if len(history) > 1:
386-
previous_self_link = history[-2]
387-
paging_links.append({
388-
"rel": "previous",
389-
"type": "application/json",
390-
"href": previous_self_link,
391-
})
402+
403+
if redis:
404+
await add_previous_link_if_exists(
405+
redis, paging_links, collection_id, current_url, token
406+
)
407+
408+
if redis:
409+
await cache_previous_url(redis, current_url, collection_id)
410+
392411
links = collection_links + paging_links
393412

394413
return stac_types.ItemCollection(
@@ -529,7 +548,15 @@ async def post_search(
529548
HTTPException: If there is an error with the cql2_json filter.
530549
"""
531550
base_url = str(request.base_url)
551+
current_url = str(request.url)
552+
try:
553+
redis = await connect_redis()
554+
except Exception as e:
555+
logger.error(f"Redis connection error: {e}")
556+
redis = None
532557

558+
if redis:
559+
await cache_current_url(redis, current_url, "search_result")
533560
search = self.database.make_search()
534561

535562
if search_request.ids:
@@ -628,6 +655,14 @@ async def post_search(
628655
]
629656
links = await PagingLinks(request=request, next=next_token).get_links()
630657

658+
if redis:
659+
await add_previous_link_if_exists(
660+
redis, links, "search_result", current_url, search_request.token
661+
)
662+
663+
if redis:
664+
await cache_previous_url(redis, current_url, "search_result")
665+
631666
return stac_types.ItemCollection(
632667
type="FeatureCollection",
633668
features=items,
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
"""Utilities for connecting to and managing Redis connections."""
2+
3+
import logging
4+
import os
5+
from typing import Dict, List, Optional
6+
7+
from pydantic_settings import BaseSettings
8+
from redis import asyncio as aioredis
9+
from stac_pydantic.shared import MimeTypes
10+
11+
from stac_fastapi.core.utilities import get_bool_env
12+
13+
redis_pool = None
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
class RedisSettings(BaseSettings):
    """Configuration settings for connecting to a Redis server.

    All values are read from environment variables once, at import time.

    NOTE(review): only Sentinel-style variables (REDIS_SENTINEL_HOSTS,
    REDIS_SENTINEL_PORTS, REDIS_SENTINEL_MASTER_NAME) are consumed here,
    while compose.yml exports REDIS_HOST/REDIS_PORT — confirm the intended
    deployment topology; plain host/port settings appear to be ignored.
    """

    # Comma-separated Sentinel hostnames; "".split(",") yields [""], which
    # connect_redis treats as "not configured".
    sentinel_hosts: List[str] = os.getenv("REDIS_SENTINEL_HOSTS", "").split(",")
    # Ports parallel to sentinel_hosts; blank entries are skipped.
    sentinel_ports: List[int] = [
        int(port)
        for port in os.getenv("REDIS_SENTINEL_PORTS", "").split(",")
        if port.strip()
    ]
    # Name of the monitored master set; required for Sentinel discovery.
    sentinel_master_name: str = os.getenv("REDIS_SENTINEL_MASTER_NAME", "")
    # Logical Redis database index.
    redis_db: int = int(os.getenv("REDIS_DB", "0"))

    max_connections: int = int(os.getenv("REDIS_MAX_CONNECTIONS", "5"))
    retry_on_timeout: bool = get_bool_env("REDIS_RETRY_TIMEOUT", True)
    decode_responses: bool = get_bool_env("REDIS_DECODE_RESPONSES", True)
    client_name: str = os.getenv("REDIS_CLIENT_NAME", "stac-fastapi-app")
    health_check_interval: int = int(os.getenv("REDIS_HEALTH_CHECK_INTERVAL", "30"))


# Module-level settings instance, built once at import.
redis_settings = RedisSettings()
38+
39+
40+
async def connect_redis(
    settings: Optional[RedisSettings] = None,
) -> Optional[aioredis.Redis]:
    """Return a Redis connection, returning None if not configured or connection fails.

    Maintains a module-level singleton (``redis_pool``): the first successful
    call creates a Sentinel-managed master connection, and subsequent calls
    reuse it.

    Args:
        settings: Optional settings override; defaults to the module-level
            ``redis_settings`` built from environment variables.

    Returns:
        An async ``aioredis.Redis`` client for the current master, or ``None``
        when Redis is not configured or the connection attempt fails.
    """
    global redis_pool
    settings = settings or redis_settings

    # Missing hosts ("".split(",") produces [""]) or a missing master name
    # means "Redis disabled" — a warning, not an error.
    if (
        not settings.sentinel_hosts
        or not settings.sentinel_hosts[0]
        or not settings.sentinel_master_name
    ):
        logger.warning("Redis not configured - skipping Redis operations")
        return None

    if redis_pool is None:
        try:
            # Create async Sentinel connection
            sentinel = aioredis.Sentinel(
                [
                    (host, port)
                    for host, port in zip(
                        settings.sentinel_hosts, settings.sentinel_ports
                    )
                ],
                decode_responses=settings.decode_responses,
                retry_on_timeout=settings.retry_on_timeout,
                client_name=f"{settings.client_name}-sentinel",
            )

            # Get async master connection
            master = sentinel.master_for(
                settings.sentinel_master_name,
                db=settings.redis_db,
                decode_responses=settings.decode_responses,
                retry_on_timeout=settings.retry_on_timeout,
                client_name=settings.client_name,
            )

            # Verify the connection works before caching it as the singleton.
            await master.ping()
            logger.info("Redis Sentinel connection successful!")

            redis_pool = master

        except Exception as e:
            # Connection failures are non-fatal: callers degrade to
            # "no Redis" behavior.
            logger.error(f"Failed to connect to Redis Sentinel: {e}")
            return None

    return redis_pool
90+
91+
92+
async def close_redis() -> None:
    """Shut down the shared Redis connection and reset the module singleton."""
    global redis_pool
    if redis_pool is None:
        return
    await redis_pool.close()
    redis_pool = None
98+
99+
100+
async def cache_current_url(redis, current_url: str, key_suffix: str) -> None:
    """Remember *current_url* under ``current:<key_suffix>`` with a 10-minute TTL.

    A no-op when *redis* is falsy; cache failures are logged, never raised.
    """
    if not redis:
        return

    key = f"current:{key_suffix}"
    try:
        await redis.setex(key, 600, current_url)
    except Exception as e:
        logger.error(f"Redis cache error for {key_suffix}: {e}")
110+
111+
112+
async def get_previous_url(redis, key_suffix: str, current_url: str) -> Optional[str]:
    """Get previous URL from cache if it exists.

    Args:
        redis: Async Redis client, or ``None`` when Redis is unavailable.
        key_suffix: Cache-key namespace (a collection id, "search_result", ...).
        current_url: Unused; retained for interface compatibility with callers.

    Returns:
        The cached previous URL, or ``None`` when no value is cached, the
        cached value is empty, or a Redis error occurs (errors are logged,
        never raised).
    """
    if redis is None:
        return None

    try:
        previous_url = await redis.get(f"prev:{key_suffix}")
        if previous_url:
            return previous_url
    except Exception as e:
        logger.error(f"Redis get previous error for {key_suffix}: {e}")

    return None
127+
128+
129+
async def cache_previous_url(redis, current_url: str, key_suffix: str) -> None:
    """Cache the current URL as previous for next request.

    Stores *current_url* under ``prev:<key_suffix>`` with a 10-minute TTL so
    the next page in a pagination sequence can link back to it.

    Args:
        redis: Async Redis client, or a falsy value to make this a no-op.
        current_url: The URL to record as the "previous" page.
        key_suffix: Cache-key namespace (a collection id, "search_result", ...).

    Errors are logged and swallowed — pagination must not fail because the
    cache is unavailable.
    """
    if not redis:
        return

    try:
        prev_key = f"prev:{key_suffix}"
        # Leftover debug print removed: it wrote to stdout on every
        # paginated request.
        await redis.setex(prev_key, 600, current_url)
    except Exception as e:
        logger.error(f"Redis cache previous error for {key_suffix}: {e}")
140+
141+
142+
async def add_previous_link_if_exists(
    redis,
    links: List[Dict],
    key_suffix: str,
    current_url: str,
    token: Optional[str] = None,
) -> None:
    """Append a ``previous`` pagination link to *links* when one is cached.

    Nothing is added on the first page (no *token* supplied) or when Redis
    is unavailable; *links* is mutated in place.
    """
    if not redis or not token:
        return

    previous_url = await get_previous_url(redis, key_suffix, current_url)
    if not previous_url:
        return

    links.append(
        {
            "rel": "previous",
            "type": MimeTypes.json,
            "href": previous_url,
        }
    )

stac_fastapi/elasticsearch/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
"elasticsearch[async]~=8.18.0",
1212
"uvicorn~=0.23.0",
1313
"starlette>=0.35.0,<0.36.0",
14+
"redis==6.4.0",
1415
]
1516

1617
extra_reqs = {

stac_fastapi/opensearch/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"opensearch-py[async]~=2.8.0",
1313
"uvicorn~=0.23.0",
1414
"starlette>=0.35.0,<0.36.0",
15+
"redis==6.4.0",
1516
]
1617

1718
extra_reqs = {

0 commit comments

Comments
 (0)