diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e867050b..f550c8cb 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -31,7 +31,8 @@ repos: ] additional_dependencies: [ "types-attrs", - "types-requests" + "types-requests", + "types-redis" ] - repo: https://github.com/PyCQA/pydocstyle rev: 6.1.1 diff --git a/CHANGELOG.md b/CHANGELOG.md index cbd11060..6aa41e94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Added Redis caching configuration for navigation pagination support, enabling proper `prev` and `next` links in paginated responses. [#466](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/466) + ### Changed ### Fixed diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..48349fc0 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.13-slim + +RUN apt-get update && apt-get install -y \ + build-essential \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY README.md . 
+COPY stac_fastapi/opensearch/setup.py stac_fastapi/opensearch/ +COPY stac_fastapi/core/setup.py stac_fastapi/core/ +COPY stac_fastapi/sfeos_helpers/setup.py stac_fastapi/sfeos_helpers/ + + +RUN pip install --no-cache-dir --upgrade pip setuptools wheel + +COPY stac_fastapi/ stac_fastapi/ + +RUN pip install --no-cache-dir ./stac_fastapi/core +RUN pip install --no-cache-dir ./stac_fastapi/sfeos_helpers +RUN pip install --no-cache-dir ./stac_fastapi/opensearch[server] + +EXPOSE 8080 + +CMD ["uvicorn", "stac_fastapi.opensearch.app:app", "--host", "0.0.0.0", "--port", "8080"] \ No newline at end of file diff --git a/README.md b/README.md index a8e2a297..c6b4b747 100644 --- a/README.md +++ b/README.md @@ -307,6 +307,29 @@ You can customize additional settings in your `.env` file: > [!NOTE] > The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, `ES_VERIFY_CERTS` and `ES_TIMEOUT` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch. +**Redis for Navigation:** +These Redis configuration variables enable proper navigation functionality in STAC FastAPI. The Redis cache stores navigation state for paginated results, allowing the system to maintain previous page links using tokens. The configuration supports either Redis Sentinel or Redis: + +| Variable | Description | Default | Required | +|------------------------------|--------------------------------------------------------------------------------------|--------------------------|---------------------------------------------------------------------------------------------| +| **Redis Sentinel** | | | | +| `REDIS_SENTINEL_HOSTS` | Comma-separated list of Redis Sentinel hostnames/IP addresses. | `""` | Conditional (required if using Sentinel) | +| `REDIS_SENTINEL_PORTS` | Comma-separated list of Redis Sentinel ports (must match order). 
| `"26379"` | Conditional (required if using Sentinel) | +| `REDIS_SENTINEL_MASTER_NAME` | Name of the Redis master node in Sentinel configuration. | `"master"` | Conditional (required if using Sentinel) | +| **Redis** | | | | +| `REDIS_HOST` | Redis server hostname or IP address for standalone Redis configuration. | `""` | Conditional (required for standalone Redis) | +| `REDIS_PORT` | Redis server port for standalone Redis configuration. | `6379` | Conditional (required for standalone Redis) | +| **Both** | | | | +| `REDIS_DB` | Redis database number to use for caching. | `0` | Optional | +| `REDIS_MAX_CONNECTIONS` | Maximum number of connections in the Redis connection pool. | `10` | Optional | +| `REDIS_RETRY_TIMEOUT` | Enable retry on timeout for Redis operations. | `true` | Optional | +| `REDIS_DECODE_RESPONSES` | Automatically decode Redis responses to strings. | `true` | Optional | +| `REDIS_CLIENT_NAME` | Client name identifier for Redis connections. | `"stac-fastapi-app"` | Optional | +| `REDIS_HEALTH_CHECK_INTERVAL` | Interval in seconds for Redis health checks. | `30` | Optional | + +> [!NOTE] +> Use either the Sentinel configuration (`REDIS_SENTINEL_HOSTS`, `REDIS_SENTINEL_PORTS`, `REDIS_SENTINEL_MASTER_NAME`) or the standalone Redis configuration (`REDIS_HOST`, `REDIS_PORT`), but not both. + ## Datetime-Based Index Management ### Overview @@ -693,3 +716,4 @@ The system uses a precise naming convention: - Ensures fair resource allocation among all clients - **Examples**: Implementation examples are available in the [examples/rate_limit](examples/rate_limit) directory. + diff --git a/dockerfiles/Dockerfile.dev.es b/dockerfiles/Dockerfile.dev.es index 1e1ffbe4..b6b92523 100644 --- a/dockerfiles/Dockerfile.dev.es +++ b/dockerfiles/Dockerfile.dev.es @@ -18,3 +18,4 @@ COPY . 
/app RUN pip install --no-cache-dir -e ./stac_fastapi/core RUN pip install --no-cache-dir -e ./stac_fastapi/sfeos_helpers RUN pip install --no-cache-dir -e ./stac_fastapi/elasticsearch[dev,server] +RUN pip install --no-cache-dir redis types-redis diff --git a/stac_fastapi/core/setup.py b/stac_fastapi/core/setup.py index 92442997..b055eecd 100644 --- a/stac_fastapi/core/setup.py +++ b/stac_fastapi/core/setup.py @@ -19,6 +19,7 @@ "pygeofilter~=0.3.1", "jsonschema~=4.0.0", "slowapi~=0.1.9", + "redis==6.4.0", ] setup( diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 143b4d5a..346e8672 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -24,6 +24,11 @@ from stac_fastapi.core.base_settings import ApiBaseSettings from stac_fastapi.core.datetime_utils import format_datetime_range from stac_fastapi.core.models.links import PagingLinks +from stac_fastapi.core.redis_utils import ( + connect_redis_sentinel, + get_prev_link, + save_self_link, +) from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer from stac_fastapi.core.session import Session from stac_fastapi.core.utilities import filter_fields @@ -333,6 +338,13 @@ async def all_collections( if q is not None: q_list = [q] if isinstance(q, str) else q + current_url = str(request.url) + redis = None + try: + redis = await connect_redis_sentinel() + except Exception: + redis = None + # Parse the query parameter if provided parsed_query = None if query is not None: @@ -426,6 +438,22 @@ async def all_collections( }, ] + if redis: + if next_token: + await save_self_link(redis, next_token, current_url) + + prev_link = await get_prev_link(redis, token) + if prev_link: + links.insert( + 0, + { + "rel": "previous", + "type": "application/json", + "method": "GET", + "href": prev_link, + }, + ) + if next_token: next_link = PagingLinks(next=next_token, request=request).link_next() 
links.append(next_link) @@ -744,6 +772,10 @@ async def post_search( HTTPException: If there is an error with the cql2_json filter. """ base_url = str(request.base_url) + try: + redis = await connect_redis_sentinel() + except Exception: + redis = None search = self.database.make_search() @@ -850,6 +882,60 @@ async def post_search( ] links = await PagingLinks(request=request, next=next_token).get_links() + collection_links = [] + if ( + items + and search_request.collections + and len(search_request.collections) == 1 + ): + collection_id = search_request.collections[0] + collection_links.extend( + [ + { + "rel": "collection", + "type": "application/json", + "href": urljoin(base_url, f"collections/{collection_id}"), + }, + { + "rel": "parent", + "type": "application/json", + "href": urljoin(base_url, f"collections/{collection_id}"), + }, + ] + ) + links.extend(collection_links) + + if redis: + self_link = str(request.url) + await save_self_link(redis, next_token, self_link) + + prev_link = await get_prev_link(redis, token_param) + if prev_link: + method = "GET" + body = None + for link in links: + if link.get("rel") == "next": + if "method" in link: + method = link["method"] + if "body" in link: + body = {**link["body"]} + body.pop("token", None) + break + else: + method = request.method + + prev_link_data = { + "rel": "previous", + "type": "application/json", + "method": method, + "href": prev_link, + } + + if body: + prev_link_data["body"] = body + + links.insert(0, prev_link_data) + return stac_types.ItemCollection( type="FeatureCollection", features=items, diff --git a/stac_fastapi/core/stac_fastapi/core/redis_utils.py b/stac_fastapi/core/stac_fastapi/core/redis_utils.py new file mode 100644 index 00000000..f26c4cba --- /dev/null +++ b/stac_fastapi/core/stac_fastapi/core/redis_utils.py @@ -0,0 +1,123 @@ +"""Utilities for connecting to and managing Redis connections.""" + +from typing import Optional + +from pydantic_settings import BaseSettings +from redis import 
import logging

from redis import asyncio as aioredis
from redis.asyncio.sentinel import Sentinel

logger = logging.getLogger(__name__)

# Process-wide cached client. connect_redis_sentinel() and connect_redis() both
# store into and return this same object, so whichever creates it first wins.
redis_pool: Optional[aioredis.Redis] = None

# Key prefix and expiry (in seconds) used for stored navigation links.
_NAV_KEY_PREFIX = "nav:self:"
_NAV_LINK_TTL_SECONDS = 1800


class RedisSentinelSettings(BaseSettings):
    """Configuration for connecting to Redis Sentinel."""

    # Comma-separated Sentinel hostnames; an empty value disables the connection.
    REDIS_SENTINEL_HOSTS: str = ""
    # Comma-separated Sentinel ports, paired with the hosts by position.
    REDIS_SENTINEL_PORTS: str = "26379"
    # Passed to Sentinel.master_for() as the service name.
    REDIS_SENTINEL_MASTER_NAME: str = "master"
    REDIS_DB: int = 0

    REDIS_MAX_CONNECTIONS: int = 10
    # Forwarded to the client as retry_on_timeout.
    REDIS_RETRY_TIMEOUT: bool = True
    REDIS_DECODE_RESPONSES: bool = True
    REDIS_CLIENT_NAME: str = "stac-fastapi-app"
    REDIS_HEALTH_CHECK_INTERVAL: int = 30


class RedisSettings(BaseSettings):
    """Configuration for connecting to a standalone Redis server."""

    # An empty host disables the connection.
    REDIS_HOST: str = ""
    REDIS_PORT: int = 6379
    REDIS_DB: int = 0

    REDIS_MAX_CONNECTIONS: int = 10
    # Forwarded to the connection pool as retry_on_timeout.
    REDIS_RETRY_TIMEOUT: bool = True
    REDIS_DECODE_RESPONSES: bool = True
    REDIS_CLIENT_NAME: str = "stac-fastapi-app"
    REDIS_HEALTH_CHECK_INTERVAL: int = 30


# Select the Redis or Redis Sentinel configuration
redis_settings: BaseSettings = RedisSentinelSettings()


def _parse_sentinel_addresses(hosts_csv: str, ports_csv: str) -> list:
    """Turn comma-separated host and port strings into ``(host, port)`` tuples.

    A single port is applied to every host. For any other length mismatch the
    unpaired entries are dropped and a warning is logged.

    Raises:
        ValueError: If a port entry is not an integer.
    """
    hosts = [h.strip() for h in hosts_csv.split(",") if h.strip()]
    ports = [int(p.strip()) for p in ports_csv.split(",") if p.strip()]

    if len(ports) == 1 and len(hosts) > 1:
        # One port for many hosts: without this, zip() would keep only the
        # first host and silently ignore the rest.
        ports = ports * len(hosts)
    elif len(hosts) != len(ports):
        logger.warning(
            "Redis Sentinel configuration has %d host(s) but %d port(s); "
            "unpaired entries are ignored.",
            len(hosts),
            len(ports),
        )

    return list(zip(hosts, ports))


async def connect_redis_sentinel(
    settings: Optional[RedisSentinelSettings] = None,
) -> Optional[aioredis.Redis]:
    """Return the shared Redis client obtained through Redis Sentinel.

    Args:
        settings: Sentinel settings to use. When omitted, the module-level
            ``redis_settings`` is used if it is a ``RedisSentinelSettings``,
            otherwise a fresh ``RedisSentinelSettings()`` is built.

    Returns:
        The cached client, or ``None`` when Sentinel is not configured (hosts,
        ports or master name empty), no usable address remains after parsing,
        or creating the client fails.

    Raises:
        ValueError: If ``REDIS_SENTINEL_PORTS`` contains a non-integer entry.
    """
    global redis_pool

    if settings is None:
        settings = (
            redis_settings
            if isinstance(redis_settings, RedisSentinelSettings)
            else RedisSentinelSettings()
        )

    if (
        not settings.REDIS_SENTINEL_HOSTS
        or not settings.REDIS_SENTINEL_PORTS
        or not settings.REDIS_SENTINEL_MASTER_NAME
    ):
        return None

    if redis_pool is not None:
        return redis_pool

    addresses = _parse_sentinel_addresses(
        settings.REDIS_SENTINEL_HOSTS, settings.REDIS_SENTINEL_PORTS
    )
    if not addresses:
        logger.warning("No usable Redis Sentinel address found in configuration.")
        return None

    try:
        sentinel = Sentinel(
            addresses,
            decode_responses=settings.REDIS_DECODE_RESPONSES,
        )
        redis_pool = sentinel.master_for(
            service_name=settings.REDIS_SENTINEL_MASTER_NAME,
            db=settings.REDIS_DB,
            decode_responses=settings.REDIS_DECODE_RESPONSES,
            retry_on_timeout=settings.REDIS_RETRY_TIMEOUT,
            client_name=settings.REDIS_CLIENT_NAME,
            max_connections=settings.REDIS_MAX_CONNECTIONS,
            health_check_interval=settings.REDIS_HEALTH_CHECK_INTERVAL,
        )
    except Exception:
        # Best effort: callers treat None as "no Redis available", but the
        # failure is logged instead of being discarded.
        logger.exception("Failed to create Redis Sentinel client.")
        return None

    return redis_pool


async def connect_redis(
    settings: Optional[RedisSettings] = None,
) -> Optional[aioredis.Redis]:
    """Return the shared client for a standalone Redis server.

    Args:
        settings: Standalone settings to use. When omitted, the module-level
            ``redis_settings`` is used if it is a ``RedisSettings``, otherwise
            a fresh ``RedisSettings()`` is built. The module default is a
            Sentinel configuration, which has no ``REDIS_HOST`` field.

    Returns:
        The cached client, or ``None`` when ``REDIS_HOST`` or ``REDIS_PORT``
        is empty.
    """
    global redis_pool

    if settings is None:
        settings = (
            redis_settings
            if isinstance(redis_settings, RedisSettings)
            else RedisSettings()
        )

    if not settings.REDIS_HOST or not settings.REDIS_PORT:
        return None

    if redis_pool is None:
        pool = aioredis.ConnectionPool(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB,
            max_connections=settings.REDIS_MAX_CONNECTIONS,
            decode_responses=settings.REDIS_DECODE_RESPONSES,
            retry_on_timeout=settings.REDIS_RETRY_TIMEOUT,
            health_check_interval=settings.REDIS_HEALTH_CHECK_INTERVAL,
        )
        redis_pool = aioredis.Redis(
            connection_pool=pool, client_name=settings.REDIS_CLIENT_NAME
        )
    return redis_pool


async def save_self_link(
    redis: aioredis.Redis, token: Optional[str], self_href: str
) -> None:
    """Store the current page URL so the next page can link back to it.

    Args:
        redis: Client used to write the entry.
        token: Token identifying the next page; nothing is stored when falsy.
        self_href: URL of the current page.
    """
    if token:
        await redis.setex(
            f"{_NAV_KEY_PREFIX}{token}", _NAV_LINK_TTL_SECONDS, self_href
        )


async def get_prev_link(redis: aioredis.Redis, token: Optional[str]) -> Optional[str]:
    """Fetch the previous-page URL stored for the given token.

    Args:
        redis: Client used to read the entry.
        token: Token of the page being served.

    Returns:
        Whatever ``redis.get`` returns for the token's key, or ``None`` when
        the token is falsy.
    """
    if not token:
        return None
    return await redis.get(f"{_NAV_KEY_PREFIX}{token}")
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from stac_fastapi.core.redis_utils import (
    RedisSentinelSettings,
    RedisSettings,
    connect_redis,
    connect_redis_sentinel,
    get_prev_link,
    save_self_link,
)


@pytest.mark.asyncio
async def test_connect_redis(monkeypatch):
    """connect_redis builds a pool and client from the given settings."""
    from stac_fastapi.core import redis_utils

    # monkeypatch restores the original module-level client on teardown, so the
    # mock created here cannot leak into later tests.
    monkeypatch.setattr(redis_utils, "redis_pool", None)

    test_settings = RedisSettings(
        REDIS_HOST="test-redis-host",
        REDIS_PORT=6380,
        REDIS_DB=5,
        REDIS_MAX_CONNECTIONS=20,
        REDIS_RETRY_TIMEOUT=False,
        REDIS_DECODE_RESPONSES=False,
        REDIS_CLIENT_NAME="custom-client",
        REDIS_HEALTH_CHECK_INTERVAL=50,
    )

    with patch(
        "stac_fastapi.core.redis_utils.aioredis.ConnectionPool"
    ) as mock_pool_class, patch(
        "stac_fastapi.core.redis_utils.aioredis.Redis"
    ) as mock_redis_class:

        mock_pool_instance = AsyncMock()
        mock_redis_instance = AsyncMock()
        mock_pool_class.return_value = mock_pool_instance
        mock_redis_class.return_value = mock_redis_instance

        result = await connect_redis(test_settings)

        mock_pool_class.assert_called_once_with(
            host="test-redis-host",
            port=6380,
            db=5,
            max_connections=20,
            decode_responses=False,
            retry_on_timeout=False,
            health_check_interval=50,
        )

        mock_redis_class.assert_called_once_with(
            connection_pool=mock_pool_instance, client_name="custom-client"
        )

        assert result == mock_redis_instance
        assert redis_utils.redis_pool is mock_redis_instance


@pytest.mark.asyncio
async def test_connect_redis_sentinel(monkeypatch):
    """connect_redis_sentinel asks Sentinel for the master and caches it."""
    from stac_fastapi.core import redis_utils

    # Start from an empty cache; the previous value is restored on teardown.
    monkeypatch.setattr(redis_utils, "redis_pool", None)

    master_mock = AsyncMock()

    sentinel_mock = MagicMock()
    sentinel_mock.master_for.return_value = master_mock

    with patch("stac_fastapi.core.redis_utils.Sentinel") as mock_sentinel_class:
        mock_sentinel_class.return_value = sentinel_mock

        settings = RedisSentinelSettings(
            REDIS_SENTINEL_HOSTS="test-redis-sentinel-host",
            REDIS_SENTINEL_PORTS="26379",
            REDIS_SENTINEL_MASTER_NAME="master",
            REDIS_DB=15,
            REDIS_MAX_CONNECTIONS=20,
            REDIS_RETRY_TIMEOUT=False,
            REDIS_DECODE_RESPONSES=False,
            REDIS_CLIENT_NAME="custom-client",
            REDIS_HEALTH_CHECK_INTERVAL=50,
        )

        redis = await connect_redis_sentinel(settings)

        mock_sentinel_class.assert_called_once_with(
            [("test-redis-sentinel-host", 26379)],
            decode_responses=False,
        )

        sentinel_mock.master_for.assert_called_once_with(
            service_name="master",
            db=15,
            decode_responses=False,
            retry_on_timeout=False,
            client_name="custom-client",
            max_connections=20,
            health_check_interval=50,
        )

        assert redis is master_mock
        assert redis_utils.redis_pool is master_mock


@pytest.mark.asyncio
async def test_save_and_get_prev_link():
    """save_self_link and get_prev_link use the same key for a token."""
    mock_redis = AsyncMock()

    # Saving writes the URL under the token's key with an 1800 second expiry.
    await save_self_link(mock_redis, "dummy_token", "http://mywebsite.com/page1")
    mock_redis.setex.assert_awaited_once_with(
        "nav:self:dummy_token", 1800, "http://mywebsite.com/page1"
    )

    # Reading returns whatever the client holds for that key.
    mock_redis.reset_mock()
    mock_redis.get.return_value = "http://mywebsite.com/page1"

    result = await get_prev_link(mock_redis, "dummy_token")
    assert result == "http://mywebsite.com/page1"
    mock_redis.get.assert_awaited_once_with("nav:self:dummy_token")

    # Without a token the client is not queried at all.
    mock_redis.reset_mock()
    result_none = await get_prev_link(mock_redis, None)
    assert result_none is None
    mock_redis.get.assert_not_called()

    # An unknown token yields None.
    mock_redis.reset_mock()
    mock_redis.get.return_value = None

    result_missing = await get_prev_link(mock_redis, "dummy_token_2")
    assert result_missing is None
    mock_redis.get.assert_awaited_once_with("nav:self:dummy_token_2")