Skip to content

Commit 565f0dc

Browse files
Yuri ZmytrakovYuri Zmytrakov
authored and committed
add redis
1 parent 041b729 commit 565f0dc

File tree

7 files changed

+206
-25
lines changed

7 files changed

+206
-25
lines changed

.pre-commit-config.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ repos:
3131
]
3232
additional_dependencies: [
3333
"types-attrs",
34-
"types-requests"
34+
"types-requests",
35+
"types-redis"
3536
]
3637
- repo: https://github.com/PyCQA/pydocstyle
3738
rev: 6.1.1

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ docker-shell-os:
6363

6464
.PHONY: test-elasticsearch
6565
test-elasticsearch:
66-
-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest'
66+
-$(run_es) /bin/bash -c 'pip install redis && export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest'
6767
docker compose down
6868

6969
.PHONY: test-opensearch

compose.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ services:
2222
- ES_VERIFY_CERTS=false
2323
- BACKEND=elasticsearch
2424
- DATABASE_REFRESH=true
25+
- REDIS_HOST=redis
26+
- REDIS_PORT=6379
2527
ports:
2628
- "8080:8080"
2729
volumes:
@@ -30,6 +32,7 @@ services:
3032
- ./esdata:/usr/share/elasticsearch/data
3133
depends_on:
3234
- elasticsearch
35+
- redis
3336
command:
3437
bash -c "./scripts/wait-for-it-es.sh es-container:9200 && python -m stac_fastapi.elasticsearch.app"
3538

@@ -94,3 +97,10 @@ services:
9497
- ./opensearch/snapshots:/usr/share/opensearch/snapshots
9598
ports:
9699
- "9202:9202"
100+
101+
redis:
102+
container_name: stac-redis
103+
image: redis:7.2-alpine
104+
restart: always
105+
ports:
106+
- "6379:6379"

docker-compose.redis.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Standalone Redis service for local development / testing.
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    container_name: stac-app-redis
    ports:
      # Expose the default Redis port to the host.
      - "6379:6379"
    volumes:
      # Persist the append-only file across container restarts.
      - redis_data:/data
    # Enable AOF persistence and cap memory, evicting least-recently-used
    # keys when the 256mb limit is reached.
    command: >
      redis-server
      --appendonly yes
      --maxmemory 256mb
      --maxmemory-policy allkeys-lru
    healthcheck:
      # PING returns PONG when the server is ready to accept commands.
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 3
    restart: unless-stopped

volumes:
  redis_data:

stac_fastapi/core/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"pygeofilter~=0.3.1",
2020
"jsonschema~=4.0.0",
2121
"slowapi~=0.1.9",
22+
"redis==6.4.0",
2223
]
2324

2425
setup(

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 98 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from stac_fastapi.core.base_settings import ApiBaseSettings
2525
from stac_fastapi.core.datetime_utils import format_datetime_range
2626
from stac_fastapi.core.models.links import PagingLinks
27+
from stac_fastapi.core.redis_utils import close_redis, connect_redis
2728
from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer
2829
from stac_fastapi.core.session import Session
2930
from stac_fastapi.core.utilities import filter_fields
@@ -237,6 +238,18 @@ async def all_collections(self, **kwargs) -> stac_types.Collections:
237238
base_url = str(request.base_url)
238239
limit = int(request.query_params.get("limit", os.getenv("STAC_ITEM_LIMIT", 10)))
239240
token = request.query_params.get("token")
241+
current_url = str(request.url)
242+
redis = None
243+
try:
244+
redis = await connect_redis()
245+
246+
if redis:
247+
current_key = "current:collections"
248+
await redis.setex(current_key, 600, current_url)
249+
250+
except Exception as e:
251+
logger.error(f"Redis connection error: {e}")
252+
redis = None
240253

241254
collections, next_token = await self.database.get_all_collections(
242255
token=token, limit=limit, request=request
@@ -252,10 +265,29 @@ async def all_collections(self, **kwargs) -> stac_types.Collections:
252265
},
253266
]
254267

255-
if next_token:
268+
if token and redis:
269+
prev_key = "prev:collections"
270+
previous_url = await redis.get(prev_key)
271+
272+
if previous_url:
273+
if previous_url != current_url:
274+
links.append(
275+
{
276+
"rel": "previous",
277+
"type": MimeTypes.json,
278+
"href": previous_url,
279+
}
280+
)
281+
282+
if next_token and redis:
283+
prev_key = "prev:collections"
284+
await redis.setex(prev_key, 600, current_url)
285+
256286
next_link = PagingLinks(next=next_token, request=request).link_next()
257287
links.append(next_link)
258288

289+
if redis:
290+
await close_redis()
259291
return stac_types.Collections(collections=collections, links=links)
260292

261293
async def get_collection(
@@ -310,20 +342,23 @@ async def item_collection(
310342
"""
311343
request: Request = kwargs["request"]
312344
token = request.query_params.get("token")
313-
if not hasattr(self, '_prev_links'):
314-
self._prev_links = {}
315-
316-
session_id = request.cookies.get('stac_session', 'default_session')
317-
current_self_link = str(request.url)
318-
319-
if session_id not in self._prev_links:
320-
self._prev_links[session_id] = []
321-
322-
history = self._prev_links[session_id]
323-
if not history or current_self_link != history[-1]:
324-
history.append(current_self_link)
325345
base_url = str(request.base_url)
326346

347+
current_url = str(request.url)
348+
349+
try:
350+
redis = await connect_redis()
351+
except Exception as e:
352+
logger.error(f"Redis connection error: {e}")
353+
redis = None
354+
355+
if redis:
356+
try:
357+
current_key = f"current:{collection_id}"
358+
await redis.setex(current_key, 600, current_url)
359+
except Exception as e:
360+
logger.error(f"Redis error: {e}")
361+
327362
collection = await self.get_collection(
328363
collection_id=collection_id, request=request
329364
)
@@ -374,23 +409,37 @@ async def item_collection(
374409
"href": urljoin(str(request.base_url), f"collections/{collection_id}"),
375410
},
376411
{
377-
"rel": "parent",
412+
"rel": "parent",
378413
"type": "application/json",
379414
"href": urljoin(str(request.base_url), f"collections/{collection_id}"),
380-
}
415+
},
381416
]
382417

383418
paging_links = await PagingLinks(request=request, next=next_token).get_links()
384-
history = self._prev_links.get(session_id, [])
385-
if len(history) > 1:
386-
previous_self_link = history[-2]
387-
paging_links.append({
388-
"rel": "previous",
389-
"type": "application/json",
390-
"href": previous_self_link,
391-
})
419+
420+
if token and redis:
421+
prev_key = f"prev:{collection_id}"
422+
previous_url = await redis.get(prev_key)
423+
424+
if previous_url:
425+
# prevent looped navigation
426+
if previous_url != current_url:
427+
paging_links.append(
428+
{
429+
"rel": "previous",
430+
"type": "application/json",
431+
"href": previous_url,
432+
}
433+
)
434+
if redis and next_token:
435+
prev_key = f"prev:{collection_id}"
436+
await redis.setex(prev_key, 600, current_url)
437+
392438
links = collection_links + paging_links
393439

440+
if redis:
441+
await close_redis()
442+
394443
return stac_types.ItemCollection(
395444
type="FeatureCollection",
396445
features=items,
@@ -529,6 +578,12 @@ async def post_search(
529578
HTTPException: If there is an error with the cql2_json filter.
530579
"""
531580
base_url = str(request.base_url)
581+
current_url = str(request.url)
582+
try:
583+
redis = await connect_redis()
584+
except Exception as e:
585+
logger.error(f"Redis connection error: {e}")
586+
redis = None
532587

533588
search = self.database.make_search()
534589

@@ -628,6 +683,26 @@ async def post_search(
628683
]
629684
links = await PagingLinks(request=request, next=next_token).get_links()
630685

686+
if search_request.token and redis:
687+
prev_key = "prev:search_result"
688+
previous_url = await redis.get(prev_key)
689+
690+
if previous_url and previous_url != current_url:
691+
links.append(
692+
{
693+
"rel": "previous",
694+
"type": "application/json",
695+
"href": previous_url,
696+
}
697+
)
698+
699+
if redis and next_token:
700+
prev_key = "prev:search_result"
701+
await redis.setex(prev_key, 600, current_url)
702+
703+
if redis:
704+
await close_redis()
705+
631706
return stac_types.ItemCollection(
632707
type="FeatureCollection",
633708
features=items,
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""Utilities for connecting to and managing Redis connections."""
2+
3+
import logging
import os
from typing import List, Optional

from pydantic_settings import BaseSettings
from redis import asyncio as aioredis

from stac_fastapi.core.utilities import get_bool_env
10+
11+
redis_pool = None
12+
13+
14+
class RedisSettings(BaseSettings):
    """Configuration settings for connecting to a Redis Sentinel deployment.

    All defaults are read from environment variables at import time.
    """

    # Comma-separated sentinel host names. Empty entries are filtered out:
    # the original `"".split(",")` yielded [""] when the variable was unset.
    sentinel_hosts: List[str] = [
        host
        for host in os.getenv("REDIS_SENTINEL_HOSTS", "").split(",")
        if host.strip()
    ]
    # Comma-separated sentinel ports. Filtering empty entries avoids the
    # import-time ValueError from int("") when REDIS_SENTINEL_PORTS is unset.
    sentinel_ports: List[int] = [
        int(port)
        for port in os.getenv("REDIS_SENTINEL_PORTS", "").split(",")
        if port.strip()
    ]
    # Name of the sentinel-monitored master to connect to.
    sentinel_master_name: str = os.getenv("REDIS_SENTINEL_MASTER_NAME", "")
    # Logical Redis database index.
    redis_db: int = int(os.getenv("REDIS_DB", "0"))

    max_connections: int = int(os.getenv("REDIS_MAX_CONNECTIONS", "5"))
    retry_on_timeout: bool = get_bool_env("REDIS_RETRY_TIMEOUT", True)
    decode_responses: bool = get_bool_env("REDIS_DECODE_RESPONSES", True)
    client_name: str = os.getenv("REDIS_CLIENT_NAME", "stac-fastapi-app")
    health_check_interval: int = int(os.getenv("REDIS_HEALTH_CHECK_INTERVAL", "30"))
29+
30+
31+
redis_settings = RedisSettings()
32+
33+
34+
async def connect_redis(settings: Optional[RedisSettings] = None) -> aioredis.Redis:
    """Return a shared Redis master connection, creating it on first use.

    Connects through Redis Sentinel and caches the resulting master client in
    the module-level ``redis_pool`` so subsequent calls reuse it.

    Args:
        settings: Optional custom settings; defaults to the module-level
            ``redis_settings``.

    Returns:
        The cached master client, or ``None`` if the connection failed.
    """
    global redis_pool
    # Honor caller-supplied settings: the original assignment unconditionally
    # discarded the `settings` argument.
    settings = settings or redis_settings

    if redis_pool is None:
        try:
            sentinel = aioredis.Sentinel(
                list(zip(settings.sentinel_hosts, settings.sentinel_ports)),
                decode_responses=settings.decode_responses,
                retry_on_timeout=settings.retry_on_timeout,
                client_name=settings.client_name,
            )
            redis_pool = sentinel.master_for(
                settings.sentinel_master_name,
                db=settings.redis_db,
                decode_responses=settings.decode_responses,
                retry_on_timeout=settings.retry_on_timeout,
                client_name=settings.client_name,
            )
        except Exception as e:
            # Log instead of print so failures surface in application logs;
            # callers are expected to handle a None return.
            logging.getLogger(__name__).error(
                "Redis Sentinel connection error: %s", e
            )
    return redis_pool
63+
64+
65+
async def close_redis() -> None:
    """Release the module-level Redis connection, if one was created."""
    global redis_pool
    if redis_pool is None:
        return
    await redis_pool.close()
    redis_pool = None

0 commit comments

Comments
 (0)