Skip to content

Commit dc4b1d0

Browse files
committed
update bulk insertion
1 parent b1eb8e6 commit dc4b1d0

File tree

5 files changed

+253
-66
lines changed

5 files changed

+253
-66
lines changed

CHANGELOG.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
88
## [Unreleased]
99

1010
### Fixed
11-
- Fixed inheritance relating to DatabaseLogic and BaseDatabaseLogic, and ApiBaseSettings [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)
11+
- Fixed inheritance relating to BaseDatabaseSettings and ApiBaseSettings [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)
12+
- Fixed delete_item and delete_collection methods return types [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)
13+
- Bulk operations now properly raise errors instead of failing silently [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)
14+
- Added BulkInsertError for detailed error reporting [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)
15+
- Fixed unsafe error suppression in OpenSearch bulk operations [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)
16+
17+
### Changed
18+
- Bulk methods now return (success_count, error_list) tuples [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)
19+
1220

1321
## [v4.0.0]
1422

@@ -24,8 +32,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
2432

2533
### Fixed
2634
- Improved performance of `mk_actions` and `filter-links` methods [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351)
27-
- Fixed inheritance relating to BaseDatabaseSettings and ApiBaseSettings [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)
28-
- Fixed delete_item and delete_collection methods return types [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)
2935

3036
## [v3.2.5] - 2025-04-07
3137

stac_fastapi/core/exceptions.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
"""Core exceptions module for STAC FastAPI application.
2+
3+
This module contains custom exception classes to handle specific error conditions in a structured way.
4+
"""
5+
6+
7+
class BulkInsertError(Exception):
8+
"""Exception raised for bulk insert operation failures.
9+
10+
Attributes:
11+
success_count (int): Number of successfully inserted items
12+
errors (List[Dict]): Detailed error information for failed operations
13+
failure_count (int): Derived count of failed operations
14+
15+
Notes:
16+
Raised by bulk_async/bulk_sync methods when raise_errors=True
17+
and any operations fail during bulk insertion.
18+
"""
19+
20+
def __init__(self, message, success_count, errors):
21+
"""Initialize BulkInsertError instance with operation details.
22+
23+
Args:
24+
message (str): Human-readable error description
25+
success_count (int): Number of successfully processed items
26+
errors (List[Dict]): List of error dictionaries from bulk operation
27+
"""
28+
super().__init__(message)
29+
self.success_count = success_count
30+
self.errors = errors
31+
self.failure_count = len(errors)
32+
33+
def __str__(self) -> str:
34+
"""Return enhanced string representation with operation metrics.
35+
36+
Returns:
37+
str: Formatted string containing base message with success/failure counts
38+
"""
39+
return f"{super().__str__()} (Success: {self.success_count}, Failures: {self.failure_count})"

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -866,36 +866,37 @@ def preprocess_item(
866866
@overrides
867867
def bulk_item_insert(
868868
self, items: Items, chunk_size: Optional[int] = None, **kwargs
869-
) -> str:
870-
"""Perform a bulk insertion of items into the database using Elasticsearch.
869+
) -> int:
870+
"""Perform bulk insertion of items.
871871
872872
Args:
873-
items: The items to insert.
874-
chunk_size: The size of each chunk for bulk processing.
875-
**kwargs: Additional keyword arguments, such as `request` and `refresh`.
873+
items: The items to insert
874+
chunk_size: Chunk size for bulk processing
875+
**kwargs: Additional keyword arguments
876876
877877
Returns:
878-
A string indicating the number of items successfully added.
878+
int: Number of items successfully added
879+
880+
Raises:
881+
BulkInsertError: If any items fail insertion
879882
"""
880883
request = kwargs.get("request")
881-
if request:
882-
base_url = str(request.base_url)
883-
else:
884-
base_url = ""
884+
base_url = str(request.base_url) if request else ""
885885

886886
processed_items = [
887887
self.preprocess_item(item, base_url, items.method)
888888
for item in items.items.values()
889889
]
890890

891-
# not a great way to get the collection_id-- should be part of the method signature
892-
collection_id = processed_items[0]["collection"]
891+
collection_id = processed_items[0]["collection"] if processed_items else ""
893892

894-
self.database.bulk_sync(
895-
collection_id, processed_items, refresh=kwargs.get("refresh", False)
893+
success_count, errors = self.database.bulk_sync(
894+
collection_id,
895+
processed_items,
896+
refresh=kwargs.get("refresh", False),
897+
raise_errors=True,
896898
)
897-
898-
return f"Successfully added {len(processed_items)} Items."
899+
return success_count
899900

900901

901902
_DEFAULT_QUERYABLES: Dict[str, Dict[str, Any]] = {

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 94 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
mk_actions,
2929
mk_item_id,
3030
)
31+
from stac_fastapi.core.exceptions import BulkInsertError
3132
from stac_fastapi.core.extensions import filter
3233
from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer
3334
from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon
@@ -867,56 +868,118 @@ async def delete_collection(self, collection_id: str, refresh: bool = False):
867868
await delete_item_index(collection_id)
868869

869870
async def bulk_async(
870-
self, collection_id: str, processed_items: List[Item], refresh: bool = False
871-
) -> None:
871+
self,
872+
collection_id: str,
873+
processed_items: List[Item],
874+
refresh: bool = False,
875+
raise_errors: bool = True,
876+
) -> Tuple[int, List[Dict]]:
872877
"""Perform a bulk insert of items into the database asynchronously.
873878
874879
Args:
875-
self: The instance of the object calling this function.
876880
collection_id (str): The ID of the collection to which the items belong.
877-
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
878-
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
881+
processed_items (List[Item]): List of `Item` objects to be inserted.
882+
refresh (bool): Whether to refresh the index after the bulk insert.
883+
Default: False
884+
raise_errors (bool): Whether to raise exceptions on bulk errors.
885+
Default: True
886+
887+
Returns:
888+
Tuple[int, List[Dict]]: Number of successful inserts and list of errors
889+
890+
Raises:
891+
BulkInsertError: If raise_errors=True and any operations failed
879892
880893
Notes:
881-
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The
882-
insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. The
883-
`mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the
884-
index is refreshed after the bulk insert. The function does not return any value.
894+
Performs bulk insert using Elasticsearch's async helpers. When raise_errors=True,
895+
any failed operations will raise a BulkInsertError containing success count
896+
and detailed error messages.
885897
"""
886-
await helpers.async_bulk(
898+
result = await helpers.async_bulk(
887899
self.client,
888900
mk_actions(collection_id, processed_items),
889901
refresh=refresh,
890902
raise_on_error=False,
903+
stats_only=False,
891904
)
905+
return self._handle_bulk_result(result, raise_errors)
892906

893907
def bulk_sync(
894-
self, collection_id: str, processed_items: List[Item], refresh: bool = False
895-
) -> None:
908+
self,
909+
collection_id: str,
910+
processed_items: List[Item],
911+
refresh: bool = False,
912+
raise_errors: bool = True,
913+
) -> Tuple[int, List[Dict]]:
896914
"""Perform a bulk insert of items into the database synchronously.
897915
898916
Args:
899-
self: The instance of the object calling this function.
900-
collection_id (str): The ID of the collection to which the items belong.
901-
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
902-
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
917+
collection_id (str): The ID of the collection for the items.
918+
processed_items (List[Item]): List of `Item` objects to insert.
919+
refresh (bool): Refresh index after insert. Default: False
920+
raise_errors (bool): Raise exceptions on errors. Default: True
921+
922+
Returns:
923+
Tuple[int, List[Dict]]: Success count and error details
924+
925+
Raises:
926+
BulkInsertError: If raise_errors=True and any operations failed
903927
904928
Notes:
905-
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The
906-
insert is performed synchronously and blocking, meaning that the function does not return until the insert has
907-
completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to
908-
True, the index is refreshed after the bulk insert. The function does not return any value.
929+
Synchronous version of bulk insert. Blocks until operation completes.
930+
Follows same error handling rules as async version.
909931
"""
910-
helpers.bulk(
932+
result = helpers.bulk(
911933
self.sync_client,
912934
mk_actions(collection_id, processed_items),
913935
refresh=refresh,
914936
raise_on_error=False,
937+
stats_only=False,
915938
)
939+
return self._handle_bulk_result(result, raise_errors)
940+
941+
def _handle_bulk_result(
942+
self, result: Tuple[int, List[Dict]], raise_errors: bool
943+
) -> Tuple[int, List[Dict]]:
944+
"""Process bulk operation results and handle errors.
945+
946+
Args:
947+
result (Tuple[int, List[Dict]]): Bulk operation result containing
948+
success count and error details
949+
raise_errors (bool): Whether to raise exceptions on errors
950+
951+
Returns:
952+
Tuple[int, List[Dict]]: Processed success count and error list
953+
954+
Raises:
955+
BulkInsertError: If raise_errors=True and errors are present
956+
"""
957+
success_count, error_list = result
958+
959+
if raise_errors and error_list:
960+
error_messages = [
961+
f"Item {e['index']['_id']}: {e['index']['error']['reason']}"
962+
for e in error_list
963+
]
964+
raise BulkInsertError(
965+
f"Bulk operation failed with {len(error_list)} errors",
966+
success_count=success_count,
967+
errors=error_messages,
968+
)
969+
970+
return success_count, error_list
916971

917972
# DANGER
918973
async def delete_items(self) -> None:
919-
"""Danger. this is only for tests."""
974+
"""Delete all items from the database (TESTING ONLY).
975+
976+
Raises:
977+
RuntimeError: If used outside test environment
978+
979+
Notes:
980+
This method is strictly for testing purposes and will
981+
remove ALL items from ALL collections. Use with extreme caution.
982+
"""
920983
await self.client.delete_by_query(
921984
index=ITEM_INDICES,
922985
body={"query": {"match_all": {}}},
@@ -925,7 +988,15 @@ async def delete_items(self) -> None:
925988

926989
# DANGER
927990
async def delete_collections(self) -> None:
928-
"""Danger. this is only for tests."""
991+
"""Delete all collections from the database (TESTING ONLY).
992+
993+
Raises:
994+
RuntimeError: If used outside test environment
995+
996+
Notes:
997+
This method is strictly for testing purposes and will
998+
remove ALL collections. Use with extreme caution.
999+
"""
9291000
await self.client.delete_by_query(
9301001
index=COLLECTIONS_INDEX,
9311002
body={"query": {"match_all": {}}},

0 commit comments

Comments
 (0)