Skip to content

Commit 345a0af

Browse files
committed
extend raise on/ off
1 parent ae1071e commit 345a0af

File tree

3 files changed

+143
-47
lines changed

3 files changed

+143
-47
lines changed

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
"""Core client."""
22

33
import logging
4-
import os
54
from collections import deque
65
from datetime import datetime as datetime_type
76
from datetime import timezone
@@ -713,7 +712,6 @@ async def create_item(
713712
collection_id,
714713
processed_items,
715714
refresh=kwargs.get("refresh", False),
716-
raise_on_error=os.getenv("RAISE_ON_BULK_ERROR", False),
717715
)
718716
if errors:
719717
logger.error(f"Bulk async operation encountered errors: {errors}")
@@ -722,7 +720,9 @@ async def create_item(
722720

723721
return f"Successfully added {success} Items. {attempted - success} errors occurred."
724722
else:
725-
item = await self.database.prep_create_item(item=item, base_url=base_url)
723+
item = await self.database.async_prep_create_item(
724+
item=item, base_url=base_url
725+
)
726726
await self.database.create_item(item, refresh=kwargs.get("refresh", False))
727727
return ItemSerializer.db_to_stac(item, base_url)
728728

@@ -885,7 +885,7 @@ def preprocess_item(
885885
The preprocessed item.
886886
"""
887887
exist_ok = method == BulkTransactionMethod.UPSERT
888-
return self.database.sync_prep_create_item(
888+
return self.database.bulk_sync_prep_create_item(
889889
item=item, base_url=base_url, exist_ok=exist_ok
890890
)
891891

@@ -921,7 +921,6 @@ def bulk_item_insert(
921921
collection_id,
922922
processed_items,
923923
refresh=kwargs.get("refresh", False),
924-
raise_on_error=os.getenv("RAISE_ON_BULK_ERROR", False),
925924
)
926925
if errors:
927926
logger.error(f"Bulk sync operation encountered errors: {errors}")

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
)
3232
from stac_fastapi.core.extensions import filter
3333
from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer
34-
from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon
34+
from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, get_bool_env
3535
from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings
3636
from stac_fastapi.elasticsearch.config import (
3737
ElasticsearchSettings as SyncElasticsearchSettings,
@@ -699,7 +699,37 @@ async def check_collection_exists(self, collection_id: str):
699699
if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
700700
raise NotFoundError(f"Collection {collection_id} does not exist")
701701

702-
async def prep_create_item(
702+
async def async_prep_create_item(
703+
self, item: Item, base_url: str, exist_ok: bool = False
704+
) -> Item:
705+
"""
706+
Preps an item for insertion into the database.
707+
708+
Args:
709+
item (Item): The item to be prepped for insertion.
710+
base_url (str): The base URL used to create the item's self URL.
711+
exist_ok (bool): Indicates whether the item can exist already.
712+
713+
Returns:
714+
Item: The prepped item.
715+
716+
Raises:
717+
ConflictError: If the item already exists in the database.
718+
719+
"""
720+
await self.check_collection_exists(collection_id=item["collection"])
721+
722+
if not exist_ok and await self.client.exists(
723+
index=index_alias_by_collection_id(item["collection"]),
724+
id=mk_item_id(item["id"], item["collection"]),
725+
):
726+
raise ConflictError(
727+
f"Item {item['id']} in collection {item['collection']} already exists"
728+
)
729+
730+
return self.item_serializer.stac_to_db(item, base_url)
731+
732+
async def bulk_async_prep_create_item(
703733
self, item: Item, base_url: str, exist_ok: bool = False
704734
) -> Item:
705735
"""
@@ -721,7 +751,8 @@ async def prep_create_item(
721751
722752
Raises:
723753
NotFoundError: If the collection that the item belongs to does not exist in the database.
724-
ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False.
754+
ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False,
755+
and `RAISE_ON_BULK_ERROR` is set to `true`.
725756
"""
726757
logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.")
727758

@@ -733,19 +764,22 @@ async def prep_create_item(
733764
index=index_alias_by_collection_id(item["collection"]),
734765
id=mk_item_id(item["id"], item["collection"]),
735766
):
736-
logger.warning(
767+
error_message = (
737768
f"Item {item['id']} in collection {item['collection']} already exists."
738769
)
739-
raise ConflictError(
740-
f"Item {item['id']} in collection {item['collection']} already exists"
741-
)
770+
if get_bool_env("RAISE_ON_BULK_ERROR", default=False):
771+
raise ConflictError(error_message)
772+
else:
773+
logger.warning(
774+
f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false."
775+
)
742776

743777
# Serialize the item into a database-compatible format
744778
prepped_item = self.item_serializer.stac_to_db(item, base_url)
745779
logger.debug(f"Item {item['id']} prepared successfully.")
746780
return prepped_item
747781

748-
def sync_prep_create_item(
782+
def bulk_sync_prep_create_item(
749783
self, item: Item, base_url: str, exist_ok: bool = False
750784
) -> Item:
751785
"""
@@ -767,7 +801,8 @@ def sync_prep_create_item(
767801
768802
Raises:
769803
NotFoundError: If the collection that the item belongs to does not exist in the database.
770-
ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False.
804+
ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False,
805+
and `RAISE_ON_BULK_ERROR` is set to `true`.
771806
"""
772807
logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.")
773808

@@ -780,12 +815,15 @@ def sync_prep_create_item(
780815
index=index_alias_by_collection_id(item["collection"]),
781816
id=mk_item_id(item["id"], item["collection"]),
782817
):
783-
logger.warning(
818+
error_message = (
784819
f"Item {item['id']} in collection {item['collection']} already exists."
785820
)
786-
raise ConflictError(
787-
f"Item {item['id']} in collection {item['collection']} already exists"
788-
)
821+
if get_bool_env("RAISE_ON_BULK_ERROR", default=False):
822+
raise ConflictError(error_message)
823+
else:
824+
logger.warning(
825+
f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false."
826+
)
789827

790828
# Serialize the item into a database-compatible format
791829
prepped_item = self.item_serializer.stac_to_db(item, base_url)
@@ -989,7 +1027,6 @@ async def bulk_async(
9891027
collection_id: str,
9901028
processed_items: List[Item],
9911029
refresh: bool = False,
992-
raise_on_error: bool = False,
9931030
) -> Tuple[int, List[Dict[str, Any]]]:
9941031
"""
9951032
Perform a bulk insert of items into the database asynchronously.
@@ -998,7 +1035,6 @@ async def bulk_async(
9981035
collection_id (str): The ID of the collection to which the items belong.
9991036
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
10001037
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
1001-
raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False).
10021038
10031039
Returns:
10041040
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
@@ -1011,6 +1047,7 @@ async def bulk_async(
10111047
The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True,
10121048
the index is refreshed after the bulk insert.
10131049
"""
1050+
raise_on_error = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
10141051
success, errors = await helpers.async_bulk(
10151052
self.client,
10161053
mk_actions(collection_id, processed_items),
@@ -1024,7 +1061,6 @@ def bulk_sync(
10241061
collection_id: str,
10251062
processed_items: List[Item],
10261063
refresh: bool = False,
1027-
raise_on_error: bool = False,
10281064
) -> Tuple[int, List[Dict[str, Any]]]:
10291065
"""
10301066
Perform a bulk insert of items into the database synchronously.
@@ -1033,7 +1069,6 @@ def bulk_sync(
10331069
collection_id (str): The ID of the collection to which the items belong.
10341070
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
10351071
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
1036-
raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False).
10371072
10381073
Returns:
10391074
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
@@ -1046,6 +1081,7 @@ def bulk_sync(
10461081
completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to
10471082
True, the index is refreshed after the bulk insert.
10481083
"""
1084+
raise_on_error = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
10491085
success, errors = helpers.bulk(
10501086
self.sync_client,
10511087
mk_actions(collection_id, processed_items),

stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py

Lines changed: 86 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
)
3232
from stac_fastapi.core.extensions import filter
3333
from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer
34-
from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon
34+
from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, get_bool_env
3535
from stac_fastapi.opensearch.config import (
3636
AsyncOpensearchSettings as AsyncSearchSettings,
3737
)
@@ -723,7 +723,7 @@ async def check_collection_exists(self, collection_id: str):
723723
if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
724724
raise NotFoundError(f"Collection {collection_id} does not exist")
725725

726-
async def prep_create_item(
726+
async def async_prep_create_item(
727727
self, item: Item, base_url: str, exist_ok: bool = False
728728
) -> Item:
729729
"""
@@ -753,42 +753,105 @@ async def prep_create_item(
753753

754754
return self.item_serializer.stac_to_db(item, base_url)
755755

756-
def sync_prep_create_item(
756+
async def bulk_async_prep_create_item(
757757
self, item: Item, base_url: str, exist_ok: bool = False
758758
) -> Item:
759759
"""
760760
Prepare an item for insertion into the database.
761761
762-
This method performs pre-insertion preparation on the given `item`,
763-
such as checking if the collection the item belongs to exists,
764-
and optionally verifying that an item with the same ID does not already exist in the database.
762+
This method performs pre-insertion preparation on the given `item`, such as:
763+
- Verifying that the collection the item belongs to exists.
764+
- Optionally checking if an item with the same ID already exists in the database.
765+
- Serializing the item into a database-compatible format.
765766
766767
Args:
767-
item (Item): The item to be inserted into the database.
768-
base_url (str): The base URL used for constructing URLs for the item.
769-
exist_ok (bool): Indicates whether the item can exist already.
768+
item (Item): The item to be prepared for insertion.
769+
base_url (str): The base URL used to construct the item's self URL.
770+
exist_ok (bool): Indicates whether the item can already exist in the database.
771+
If False, a `ConflictError` is raised if the item exists.
770772
771773
Returns:
772-
Item: The item after preparation is done.
774+
Item: The prepared item, serialized into a database-compatible format.
773775
774776
Raises:
775777
NotFoundError: If the collection that the item belongs to does not exist in the database.
776-
ConflictError: If an item with the same ID already exists in the collection.
778+
ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False,
779+
and `RAISE_ON_BULK_ERROR` is set to `true`.
777780
"""
778-
item_id = item["id"]
779-
collection_id = item["collection"]
780-
if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id):
781-
raise NotFoundError(f"Collection {collection_id} does not exist")
781+
logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.")
782782

783-
if not exist_ok and self.sync_client.exists(
784-
index=index_alias_by_collection_id(collection_id),
785-
id=mk_item_id(item_id, collection_id),
783+
# Check if the collection exists
784+
await self.check_collection_exists(collection_id=item["collection"])
785+
786+
# Check if the item already exists in the database
787+
if not exist_ok and await self.client.exists(
788+
index=index_alias_by_collection_id(item["collection"]),
789+
id=mk_item_id(item["id"], item["collection"]),
786790
):
787-
raise ConflictError(
788-
f"Item {item_id} in collection {collection_id} already exists"
791+
error_message = (
792+
f"Item {item['id']} in collection {item['collection']} already exists."
789793
)
794+
if get_bool_env("RAISE_ON_BULK_ERROR", default=False):
795+
raise ConflictError(error_message)
796+
else:
797+
logger.warning(
798+
f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false."
799+
)
800+
# Serialize the item into a database-compatible format
801+
prepped_item = self.item_serializer.stac_to_db(item, base_url)
802+
logger.debug(f"Item {item['id']} prepared successfully.")
803+
return prepped_item
804+
805+
def bulk_sync_prep_create_item(
806+
self, item: Item, base_url: str, exist_ok: bool = False
807+
) -> Item:
808+
"""
809+
Prepare an item for insertion into the database.
790810
791-
return self.item_serializer.stac_to_db(item, base_url)
811+
This method performs pre-insertion preparation on the given `item`, such as:
812+
- Verifying that the collection the item belongs to exists.
813+
- Optionally checking if an item with the same ID already exists in the database.
814+
- Serializing the item into a database-compatible format.
815+
816+
Args:
817+
item (Item): The item to be prepared for insertion.
818+
base_url (str): The base URL used to construct the item's self URL.
819+
exist_ok (bool): Indicates whether the item can already exist in the database.
820+
If False, a `ConflictError` is raised if the item exists.
821+
822+
Returns:
823+
Item: The prepared item, serialized into a database-compatible format.
824+
825+
Raises:
826+
NotFoundError: If the collection that the item belongs to does not exist in the database.
827+
ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False,
828+
and `RAISE_ON_BULK_ERROR` is set to `true`.
829+
"""
830+
logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.")
831+
832+
# Check if the collection exists
833+
if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=item["collection"]):
834+
raise NotFoundError(f"Collection {item['collection']} does not exist")
835+
836+
# Check if the item already exists in the database
837+
if not exist_ok and self.sync_client.exists(
838+
index=index_alias_by_collection_id(item["collection"]),
839+
id=mk_item_id(item["id"], item["collection"]),
840+
):
841+
error_message = (
842+
f"Item {item['id']} in collection {item['collection']} already exists."
843+
)
844+
if get_bool_env("RAISE_ON_BULK_ERROR", default=False):
845+
raise ConflictError(error_message)
846+
else:
847+
logger.warning(
848+
f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false."
849+
)
850+
851+
# Serialize the item into a database-compatible format
852+
prepped_item = self.item_serializer.stac_to_db(item, base_url)
853+
logger.debug(f"Item {item['id']} prepared successfully.")
854+
return prepped_item
792855

793856
async def create_item(self, item: Item, refresh: bool = False):
794857
"""Database logic for creating one item.
@@ -987,7 +1050,6 @@ async def bulk_async(
9871050
collection_id: str,
9881051
processed_items: List[Item],
9891052
refresh: bool = False,
990-
raise_on_error: bool = False,
9911053
) -> Tuple[int, List[Dict[str, Any]]]:
9921054
"""
9931055
Perform a bulk insert of items into the database asynchronously.
@@ -996,7 +1058,6 @@ async def bulk_async(
9961058
collection_id (str): The ID of the collection to which the items belong.
9971059
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
9981060
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
999-
raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False).
10001061
10011062
Returns:
10021063
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
@@ -1009,6 +1070,7 @@ async def bulk_async(
10091070
The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True,
10101071
the index is refreshed after the bulk insert.
10111072
"""
1073+
raise_on_error = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
10121074
success, errors = await helpers.async_bulk(
10131075
self.client,
10141076
mk_actions(collection_id, processed_items),
@@ -1022,7 +1084,6 @@ def bulk_sync(
10221084
collection_id: str,
10231085
processed_items: List[Item],
10241086
refresh: bool = False,
1025-
raise_on_error: bool = False,
10261087
) -> Tuple[int, List[Dict[str, Any]]]:
10271088
"""
10281089
Perform a bulk insert of items into the database synchronously.
@@ -1031,7 +1092,6 @@ def bulk_sync(
10311092
collection_id (str): The ID of the collection to which the items belong.
10321093
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
10331094
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
1034-
raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False).
10351095
10361096
Returns:
10371097
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
@@ -1044,6 +1104,7 @@ def bulk_sync(
10441104
completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to
10451105
True, the index is refreshed after the bulk insert.
10461106
"""
1107+
raise_on_error = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
10471108
success, errors = helpers.bulk(
10481109
self.sync_client,
10491110
mk_actions(collection_id, processed_items),

0 commit comments

Comments
 (0)