Merged
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -12,18 +12,20 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Added logging to bulk insertion methods to provide detailed feedback on errors encountered during operations. [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364)
- Introduced the `RAISE_ON_BULK_ERROR` environment variable to control whether bulk insertion methods raise exceptions on errors (`true`) or log warnings and continue processing (`false`). [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364)
- Added code coverage reporting to the test suite using pytest-cov. [#87](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/87)
- Introduced the `DATABASE_REFRESH` environment variable to control whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)

### Changed

- Updated dynamic mapping for items to map long values to double versus float. [#326](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/326)
- Extended Datetime Search to search on start_datetime and end_datetime as well as datetime fields. [#182](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/182)
- Changed item update operation to use Elasticsearch index API instead of delete and create for better efficiency and atomicity. [#75](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/75)
- Bulk insertion via `BulkTransactionsClient` now strictly validates all STAC Items using the Pydantic model before insertion. Any invalid item will immediately raise a `ValidationError`, ensuring consistent validation with single-item inserts and preventing invalid STAC Items from being stored. This validation is enforced regardless of the `RAISE_ON_BULK_ERROR` setting. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368)

- Refactored CRUD methods in `TransactionsClient` to use the `_resolve_refresh` helper method for consistent and reusable handling of the `refresh` parameter. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)

### Fixed

- Refactored `create_item` and `update_item` methods to share unified logic, ensuring consistent conflict detection, validation, and database operations. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368)
- Fixed an issue where some routes were not passing the `refresh` parameter from `kwargs` to the database logic, ensuring consistent behavior across all CRUD operations. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)

## [v4.0.0] - 2025-04-23

7 changes: 4 additions & 3 deletions README.md
@@ -111,10 +111,11 @@ You can customize additional settings in your `.env` file:
| `RELOAD` | Enable auto-reload for development. | `true` | Optional |
| `STAC_FASTAPI_RATE_LIMIT` | API rate limit per client. | `200/minute` | Optional |
| `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional |
| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional | |
| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional |
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional |
| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. | `false` | Optional |

> [!NOTE]
> The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.
28 changes: 15 additions & 13 deletions stac_fastapi/core/stac_fastapi/core/core.py
@@ -712,10 +712,11 @@ async def create_item(
for feature in features
]
attempted = len(processed_items)

success, errors = await self.database.bulk_async(
collection_id,
processed_items,
refresh=kwargs.get("refresh", False),
collection_id=collection_id,
processed_items=processed_items,
**kwargs,
)
if errors:
logger.error(
@@ -729,10 +730,7 @@ async def create_item(

# Handle single item
await self.database.create_item(
item_dict,
refresh=kwargs.get("refresh", False),
base_url=base_url,
exist_ok=False,
item_dict, base_url=base_url, exist_ok=False, **kwargs
)
return ItemSerializer.db_to_stac(item_dict, base_url)

@@ -757,11 +755,12 @@ async def update_item(
"""
item = item.model_dump(mode="json")
base_url = str(kwargs["request"].base_url)

now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
item["properties"]["updated"] = now

await self.database.create_item(
item, refresh=kwargs.get("refresh", False), base_url=base_url, exist_ok=True
item, base_url=base_url, exist_ok=True, **kwargs
)

return ItemSerializer.db_to_stac(item, base_url)
@@ -777,7 +776,9 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs) -> None:
Returns:
None: Returns 204 No Content on successful deletion
"""
await self.database.delete_item(item_id=item_id, collection_id=collection_id)
await self.database.delete_item(
item_id=item_id, collection_id=collection_id, **kwargs
)
return None

@overrides
@@ -798,8 +799,9 @@ async def create_collection(
"""
collection = collection.model_dump(mode="json")
request = kwargs["request"]

collection = self.database.collection_serializer.stac_to_db(collection, request)
await self.database.create_collection(collection=collection)
await self.database.create_collection(collection=collection, **kwargs)
return CollectionSerializer.db_to_stac(
collection,
request,
@@ -835,7 +837,7 @@ async def update_collection(

collection = self.database.collection_serializer.stac_to_db(collection, request)
await self.database.update_collection(
collection_id=collection_id, collection=collection
collection_id=collection_id, collection=collection, **kwargs
)

return CollectionSerializer.db_to_stac(
@@ -860,7 +862,7 @@ async def delete_collection(self, collection_id: str, **kwargs) -> None:
Raises:
NotFoundError: If the collection doesn't exist
"""
await self.database.delete_collection(collection_id=collection_id)
await self.database.delete_collection(collection_id=collection_id, **kwargs)
return None


@@ -937,7 +939,7 @@ def bulk_item_insert(
success, errors = self.database.bulk_sync(
collection_id,
processed_items,
refresh=kwargs.get("refresh", False),
**kwargs,
)
if errors:
logger.error(f"Bulk sync operation encountered errors: {errors}")
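The hunks above stop reading `refresh` in the client and forward `**kwargs` to the database layer instead. The following is a minimal sketch of how a receiving method could then choose the value, assuming an explicit `refresh` keyword should win over a configured default. `FakeDatabase`, `default_refresh`, and `_pick_refresh` are hypothetical names used for illustration only; this is not the project's database code.

```python
from typing import Any, Dict, List, Union

Refresh = Union[bool, str]


class FakeDatabase:
    """Hypothetical stand-in for the database layer (not the project's class)."""

    def __init__(self, default_refresh: Refresh = False) -> None:
        # Stands in for a settings value such as `database_refresh`.
        self.default_refresh = default_refresh

    def _pick_refresh(self, kwargs: Dict[str, Any]) -> Refresh:
        # An explicit per-call value wins; otherwise fall back to the default.
        return kwargs.get("refresh", self.default_refresh)

    def bulk_sync(
        self, collection_id: str, processed_items: List[dict], **kwargs: Any
    ) -> Refresh:
        # Extra forwarded keys (for example `request`) land in kwargs
        # and are ignored by this sketch.
        return self._pick_refresh(kwargs)


db = FakeDatabase(default_refresh="wait_for")
forwarded = {"request": object()}  # the kind of extra key a route might pass along

results = {
    "default": db.bulk_sync("c1", [], **forwarded),
    "override": db.bulk_sync("c1", [], refresh=True, **forwarded),
}
```

One consequence of forwarding `**kwargs` wholesale: every receiving method has to accept arbitrary keyword arguments, and a key that is also passed explicitly (such as `base_url`) would raise a `TypeError` for a duplicated keyword argument.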
27 changes: 27 additions & 0 deletions stac_fastapi/core/stac_fastapi/core/utilities.py
@@ -39,6 +39,33 @@ def get_bool_env(name: str, default: bool = False) -> bool:
return default


def resolve_refresh(refresh: str) -> str:
"""
Resolve the `refresh` parameter from kwargs or the environment variable.

Args:
refresh (str): The `refresh` parameter value.

Returns:
str: The resolved value of the `refresh` parameter, which can be "true", "false", or "wait_for".

Raises:
ValueError: If the `refresh` value is not one of "true", "false", or "wait_for".
"""
logger = logging.getLogger(__name__)

# Normalize and validate the `refresh` value
refresh = refresh.lower()
if refresh not in {"true", "false", "wait_for"}:
raise ValueError(
"Invalid value for `refresh`. Must be 'true', 'false', or 'wait_for'."
)

# Log the resolved value
logger.info(f"`refresh` parameter resolved to: {refresh}")
return refresh
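
For readability, here is the helper above restated with its indentation restored, followed by a few sample calls. It is a sketch based only on the hunk as shown; the module-level `import logging` is an assumption, since the diff does not show the imports of `utilities.py`.

```python
import logging


def resolve_refresh(refresh: str) -> str:
    """Normalize and validate a `refresh` value ("true", "false", or "wait_for")."""
    logger = logging.getLogger(__name__)

    # Normalize case, then reject anything outside the three accepted values.
    refresh = refresh.lower()
    if refresh not in {"true", "false", "wait_for"}:
        raise ValueError(
            "Invalid value for `refresh`. Must be 'true', 'false', or 'wait_for'."
        )

    logger.info(f"`refresh` parameter resolved to: {refresh}")
    return refresh


# Sample calls: matching is case-insensitive.
normalized = [resolve_refresh(v) for v in ("TRUE", "False", "wait_for")]

# Any other string is rejected with a ValueError.
try:
    resolve_refresh("maybe")
    rejected = False
except ValueError:
    rejected = True
```

Note that the function takes and returns strings, while the `database_refresh` property added in `config.py` returns a `bool` or `"wait_for"`. A caller passing that `bool` straight in would fail on `.lower()`, so some conversion would be needed between the two.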


def bbox2polygon(b0: float, b1: float, b2: float, b3: float) -> List[List[List[float]]]:
"""Transform a bounding box represented by its four coordinates `b0`, `b1`, `b2`, and `b3` into a polygon.

38 changes: 37 additions & 1 deletion stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py
@@ -3,7 +3,7 @@
import logging
import os
import ssl
from typing import Any, Dict, Set
from typing import Any, Dict, Set, Union

import certifi
from elasticsearch._async.client import AsyncElasticsearch
@@ -88,6 +88,24 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings):
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)

@property
def database_refresh(self) -> Union[bool, str]:
"""
Get the value of the DATABASE_REFRESH environment variable.

Returns:
Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for".
"""
value = os.getenv("DATABASE_REFRESH", "false").lower()
if value in {"true", "false"}:
return value == "true"
elif value == "wait_for":
return "wait_for"
else:
raise ValueError(
"Invalid value for DATABASE_REFRESH. Must be 'true', 'false', or 'wait_for'."
)

@property
def create_client(self):
"""Create es client."""
@@ -109,6 +127,24 @@ class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings):
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)

@property
def database_refresh(self) -> Union[bool, str]:
"""
Get the value of the DATABASE_REFRESH environment variable.

Returns:
Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for".
"""
value = os.getenv("DATABASE_REFRESH", "false").lower()
if value in {"true", "false"}:
return value == "true"
elif value == "wait_for":
return "wait_for"
else:
raise ValueError(
"Invalid value for DATABASE_REFRESH. Must be 'true', 'false', or 'wait_for'."
)
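
The `database_refresh` property is added verbatim to both settings classes. One possible way to keep a single copy is a small mixin, sketched below. `DatabaseRefreshMixin`, `SyncSettings`, and `AsyncSettings` are hypothetical names; the real classes also inherit from `ApiSettings` and `ApiBaseSettings`, which this sketch leaves out so it can run on its own.

```python
import os
from typing import Union


class DatabaseRefreshMixin:
    """Hypothetical mixin holding the shared DATABASE_REFRESH parsing."""

    @property
    def database_refresh(self) -> Union[bool, str]:
        # Same logic as the property in the diff: True, False, or "wait_for".
        value = os.getenv("DATABASE_REFRESH", "false").lower()
        if value in {"true", "false"}:
            return value == "true"
        if value == "wait_for":
            return "wait_for"
        raise ValueError(
            "Invalid value for DATABASE_REFRESH. Must be 'true', 'false', or 'wait_for'."
        )


class SyncSettings(DatabaseRefreshMixin):
    """Stand-in for the synchronous settings class."""


class AsyncSettings(DatabaseRefreshMixin):
    """Stand-in for the asynchronous settings class."""


def read_with(value: str) -> Union[bool, str]:
    # Set the variable, then read it back through the property.
    os.environ["DATABASE_REFRESH"] = value
    return AsyncSettings().database_refresh


parsed = [read_with(v) for v in ("true", "FALSE", "wait_for")]
```

Because this is a property, the environment variable is read on every access, unlike the `get_bool_env(...)` class attributes above it, which are evaluated once when the class body runs.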

@property
def create_client(self):
"""Create async elasticsearch client."""