Skip to content

Commit 7a48e18

Browse files
authored
Merge branch 'main' into release-v4.1.0
2 parents 5ed542d + b8d6c38 commit 7a48e18

File tree

10 files changed

+551
-205
lines changed

10 files changed

+551
-205
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,20 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1717

1818
### Added
1919

20+
- Added logging to bulk insertion methods to provide detailed feedback on errors encountered during operations. [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364)
21+
- Introduced the `RAISE_ON_BULK_ERROR` environment variable to control whether bulk insertion methods raise exceptions on errors (`true`) or log warnings and continue processing (`false`). [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364)
2022
- Added code coverage reporting to the test suite using pytest-cov. [#87](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/87)
2123

2224
### Changed
2325

2426
- Updated dynamic mapping for items to map long values to double versus float. [#326](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/326)
2527
- Extended Datetime Search to search on start_datetime and end_datetime as well as datetime fields. [#182](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/182)
2628
- Changed item update operation to use Elasticsearch index API instead of delete and create for better efficiency and atomicity. [#75](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/75)
29+
- Bulk insertion via `BulkTransactionsClient` now strictly validates all STAC Items using the Pydantic model before insertion. Any invalid item will immediately raise a `ValidationError`, ensuring consistent validation with single-item inserts and preventing invalid STAC Items from being stored. This validation is enforced regardless of the `RAISE_ON_BULK_ERROR` setting. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368)
30+
31+
### Fixed
32+
33+
- Refactored `create_item` and `update_item` methods to share unified logic, ensuring consistent conflict detection, validation, and database operations. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368)
2734

2835
## [v4.0.0] - 2025-04-23
2936

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ You can customize additional settings in your `.env` file:
113113
| `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional |
114114
| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
115115
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
116-
| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional |
116+
| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional
117+
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional |
117118

118119
> [!NOTE]
119120
> The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 68 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -676,46 +676,65 @@ class TransactionsClient(AsyncBaseTransactionsClient):
676676
@overrides
677677
async def create_item(
678678
self, collection_id: str, item: Union[Item, ItemCollection], **kwargs
679-
) -> Optional[stac_types.Item]:
680-
"""Create an item in the collection.
679+
) -> Union[stac_types.Item, str]:
680+
"""
681+
Create an item or a feature collection of items in the specified collection.
681682
682683
Args:
683-
collection_id (str): The id of the collection to add the item to.
684-
item (stac_types.Item): The item to be added to the collection.
685-
kwargs: Additional keyword arguments.
684+
collection_id (str): The ID of the collection to add the item(s) to.
685+
item (Union[Item, ItemCollection]): A single item or a collection of items to be added.
686+
**kwargs: Additional keyword arguments, such as `request` and `refresh`.
686687
687688
Returns:
688-
stac_types.Item: The created item.
689+
Union[stac_types.Item, str]: The created item if a single item is added, or a summary string
690+
indicating the number of items successfully added and errors if a collection of items is added.
689691
690692
Raises:
691-
NotFound: If the specified collection is not found in the database.
692-
ConflictError: If the item in the specified collection already exists.
693-
693+
NotFoundError: If the specified collection is not found in the database.
694+
ConflictError: If an item with the same ID already exists in the collection.
694695
"""
695-
item = item.model_dump(mode="json")
696-
base_url = str(kwargs["request"].base_url)
696+
request = kwargs.get("request")
697+
base_url = str(request.base_url)
697698

698-
# If a feature collection is posted
699-
if item["type"] == "FeatureCollection":
699+
# Convert Pydantic model to dict for uniform processing
700+
item_dict = item.model_dump(mode="json")
701+
702+
# Handle FeatureCollection (bulk insert)
703+
if item_dict["type"] == "FeatureCollection":
700704
bulk_client = BulkTransactionsClient(
701705
database=self.database, settings=self.settings
702706
)
707+
features = item_dict["features"]
703708
processed_items = [
704709
bulk_client.preprocess_item(
705-
item, base_url, BulkTransactionMethod.INSERT
710+
feature, base_url, BulkTransactionMethod.INSERT
706711
)
707-
for item in item["features"]
712+
for feature in features
708713
]
709-
710-
await self.database.bulk_async(
711-
collection_id, processed_items, refresh=kwargs.get("refresh", False)
714+
attempted = len(processed_items)
715+
success, errors = await self.database.bulk_async(
716+
collection_id,
717+
processed_items,
718+
refresh=kwargs.get("refresh", False),
712719
)
720+
if errors:
721+
logger.error(
722+
f"Bulk async operation encountered errors for collection {collection_id}: {errors} (attempted {attempted})"
723+
)
724+
else:
725+
logger.info(
726+
f"Bulk async operation succeeded with {success} actions for collection {collection_id}."
727+
)
728+
return f"Successfully added {success} Items. {attempted - success} errors occurred."
713729

714-
return None
715-
else:
716-
item = await self.database.prep_create_item(item=item, base_url=base_url)
717-
await self.database.create_item(item, refresh=kwargs.get("refresh", False))
718-
return ItemSerializer.db_to_stac(item, base_url)
730+
# Handle single item
731+
await self.database.create_item(
732+
item_dict,
733+
refresh=kwargs.get("refresh", False),
734+
base_url=base_url,
735+
exist_ok=False,
736+
)
737+
return ItemSerializer.db_to_stac(item_dict, base_url)
719738

720739
@overrides
721740
async def update_item(
@@ -741,8 +760,9 @@ async def update_item(
741760
now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
742761
item["properties"]["updated"] = now
743762

744-
await self.database.check_collection_exists(collection_id)
745-
await self.database.create_item(item, refresh=kwargs.get("refresh", False))
763+
await self.database.create_item(
764+
item, refresh=kwargs.get("refresh", False), base_url=base_url, exist_ok=True
765+
)
746766

747767
return ItemSerializer.db_to_stac(item, base_url)
748768

@@ -875,7 +895,7 @@ def preprocess_item(
875895
The preprocessed item.
876896
"""
877897
exist_ok = method == BulkTransactionMethod.UPSERT
878-
return self.database.sync_prep_create_item(
898+
return self.database.bulk_sync_prep_create_item(
879899
item=item, base_url=base_url, exist_ok=exist_ok
880900
)
881901

@@ -899,19 +919,32 @@ def bulk_item_insert(
899919
else:
900920
base_url = ""
901921

902-
processed_items = [
903-
self.preprocess_item(item, base_url, items.method)
904-
for item in items.items.values()
905-
]
922+
processed_items = []
923+
for item in items.items.values():
924+
try:
925+
validated = Item(**item) if not isinstance(item, Item) else item
926+
processed_items.append(
927+
self.preprocess_item(
928+
validated.model_dump(mode="json"), base_url, items.method
929+
)
930+
)
931+
except ValidationError:
932+
# Immediately raise on the first invalid item (strict mode)
933+
raise
906934

907-
# not a great way to get the collection_id-- should be part of the method signature
908935
collection_id = processed_items[0]["collection"]
909-
910-
self.database.bulk_sync(
911-
collection_id, processed_items, refresh=kwargs.get("refresh", False)
936+
attempted = len(processed_items)
937+
success, errors = self.database.bulk_sync(
938+
collection_id,
939+
processed_items,
940+
refresh=kwargs.get("refresh", False),
912941
)
942+
if errors:
943+
logger.error(f"Bulk sync operation encountered errors: {errors}")
944+
else:
945+
logger.info(f"Bulk sync operation succeeded with {success} actions.")
913946

914-
return f"Successfully added {len(processed_items)} Items."
947+
return f"Successfully added/updated {success} Items. {attempted - success} errors occurred."
915948

916949

917950
_DEFAULT_QUERYABLES: Dict[str, Dict[str, Any]] = {

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings):
8686
indexed_fields: Set[str] = {"datetime"}
8787
enable_response_models: bool = False
8888
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
89+
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
8990

9091
@property
9192
def create_client(self):
@@ -106,6 +107,7 @@ class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings):
106107
indexed_fields: Set[str] = {"datetime"}
107108
enable_response_models: bool = False
108109
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
110+
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
109111

110112
@property
111113
def create_client(self):

0 commit comments

Comments
 (0)