Skip to content

Commit ae1071e

Browse files
committed
bulk raise on error env var
1 parent 870e291 commit ae1071e

File tree

4 files changed

+39
-11
lines changed

4 files changed

+39
-11
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ You can customize additional settings in your `.env` file:
113113
| `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional |
114114
| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
115115
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
116-
| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional |
116+
| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional |
117+
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. | `false` | Optional |
117118

118119
> [!NOTE]
119120
> The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Core client."""
22

33
import logging
4+
import os
45
from collections import deque
56
from datetime import datetime as datetime_type
67
from datetime import timezone
@@ -709,7 +710,10 @@ async def create_item(
709710
]
710711
attempted = len(processed_items)
711712
success, errors = await self.database.bulk_async(
712-
collection_id, processed_items, refresh=kwargs.get("refresh", False)
713+
collection_id,
714+
processed_items,
715+
refresh=kwargs.get("refresh", False),
716+
raise_on_error=os.getenv("RAISE_ON_BULK_ERROR", False),
713717
)
714718
if errors:
715719
logger.error(f"Bulk async operation encountered errors: {errors}")
@@ -914,7 +918,10 @@ def bulk_item_insert(
914918
collection_id = processed_items[0]["collection"]
915919
attempted = len(processed_items)
916920
success, errors = self.database.bulk_sync(
917-
collection_id, processed_items, refresh=kwargs.get("refresh", False)
921+
collection_id,
922+
processed_items,
923+
refresh=kwargs.get("refresh", False),
924+
raise_on_error=os.getenv("RAISE_ON_BULK_ERROR", False),
918925
)
919926
if errors:
920927
logger.error(f"Bulk sync operation encountered errors: {errors}")

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -985,7 +985,11 @@ async def delete_collection(self, collection_id: str, refresh: bool = False):
985985
await delete_item_index(collection_id)
986986

987987
async def bulk_async(
988-
self, collection_id: str, processed_items: List[Item], refresh: bool = False
988+
self,
989+
collection_id: str,
990+
processed_items: List[Item],
991+
refresh: bool = False,
992+
raise_on_error: bool = False,
989993
) -> Tuple[int, List[Dict[str, Any]]]:
990994
"""
991995
Perform a bulk insert of items into the database asynchronously.
@@ -994,6 +998,7 @@ async def bulk_async(
994998
collection_id (str): The ID of the collection to which the items belong.
995999
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
9961000
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
1001+
raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False).
9971002
9981003
Returns:
9991004
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
@@ -1010,12 +1015,16 @@ async def bulk_async(
10101015
self.client,
10111016
mk_actions(collection_id, processed_items),
10121017
refresh=refresh,
1013-
raise_on_error=False, # Do not raise errors
1018+
raise_on_error=raise_on_error,
10141019
)
10151020
return success, errors
10161021

10171022
def bulk_sync(
1018-
self, collection_id: str, processed_items: List[Item], refresh: bool = False
1023+
self,
1024+
collection_id: str,
1025+
processed_items: List[Item],
1026+
refresh: bool = False,
1027+
raise_on_error: bool = False,
10191028
) -> Tuple[int, List[Dict[str, Any]]]:
10201029
"""
10211030
Perform a bulk insert of items into the database synchronously.
@@ -1024,6 +1033,7 @@ def bulk_sync(
10241033
collection_id (str): The ID of the collection to which the items belong.
10251034
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
10261035
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
1036+
raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False).
10271037
10281038
Returns:
10291039
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
@@ -1040,7 +1050,7 @@ def bulk_sync(
10401050
self.sync_client,
10411051
mk_actions(collection_id, processed_items),
10421052
refresh=refresh,
1043-
raise_on_error=False, # Do not raise errors
1053+
raise_on_error=raise_on_error,
10441054
)
10451055
return success, errors
10461056

stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -983,7 +983,11 @@ async def delete_collection(self, collection_id: str, refresh: bool = False):
983983
await delete_item_index(collection_id)
984984

985985
async def bulk_async(
986-
self, collection_id: str, processed_items: List[Item], refresh: bool = False
986+
self,
987+
collection_id: str,
988+
processed_items: List[Item],
989+
refresh: bool = False,
990+
raise_on_error: bool = False,
987991
) -> Tuple[int, List[Dict[str, Any]]]:
988992
"""
989993
Perform a bulk insert of items into the database asynchronously.
@@ -992,6 +996,7 @@ async def bulk_async(
992996
collection_id (str): The ID of the collection to which the items belong.
993997
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
994998
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
999+
raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False).
9951000
9961001
Returns:
9971002
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
@@ -1008,12 +1013,16 @@ async def bulk_async(
10081013
self.client,
10091014
mk_actions(collection_id, processed_items),
10101015
refresh=refresh,
1011-
raise_on_error=False, # Do not raise errors
1016+
raise_on_error=raise_on_error,
10121017
)
10131018
return success, errors
10141019

10151020
def bulk_sync(
1016-
self, collection_id: str, processed_items: List[Item], refresh: bool = False
1021+
self,
1022+
collection_id: str,
1023+
processed_items: List[Item],
1024+
refresh: bool = False,
1025+
raise_on_error: bool = False,
10171026
) -> Tuple[int, List[Dict[str, Any]]]:
10181027
"""
10191028
Perform a bulk insert of items into the database synchronously.
@@ -1022,6 +1031,7 @@ def bulk_sync(
10221031
collection_id (str): The ID of the collection to which the items belong.
10231032
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
10241033
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
1034+
raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False).
10251035
10261036
Returns:
10271037
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
@@ -1038,7 +1048,7 @@ def bulk_sync(
10381048
self.sync_client,
10391049
mk_actions(collection_id, processed_items),
10401050
refresh=refresh,
1041-
raise_on_error=False, # Do not raise errors
1051+
raise_on_error=raise_on_error,
10421052
)
10431053
return success, errors
10441054

0 commit comments

Comments
 (0)