Skip to content

Commit 870e291

Browse files
committed
log, add response to bulk insert
1 parent 64d646f commit 870e291

File tree

3 files changed

+132
-73
lines changed

3 files changed

+132
-73
lines changed

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -676,21 +676,22 @@ class TransactionsClient(AsyncBaseTransactionsClient):
676676
@overrides
677677
async def create_item(
678678
self, collection_id: str, item: Union[Item, ItemCollection], **kwargs
679-
) -> Optional[stac_types.Item]:
680-
"""Create an item in the collection.
679+
) -> Union[stac_types.Item, str]:
680+
"""
681+
Create an item or a feature collection of items in the specified collection.
681682
682683
Args:
683-
collection_id (str): The id of the collection to add the item to.
684-
item (stac_types.Item): The item to be added to the collection.
685-
kwargs: Additional keyword arguments.
684+
collection_id (str): The ID of the collection to add the item(s) to.
685+
item (Union[Item, ItemCollection]): A single item or a collection of items to be added.
686+
**kwargs: Additional keyword arguments, such as `request` and `refresh`.
686687
687688
Returns:
688-
stac_types.Item: The created item.
689+
Union[stac_types.Item, str]: The created item if a single item is added, or a summary string
690+
indicating the number of items successfully added and errors if a collection of items is added.
689691
690692
Raises:
691-
NotFound: If the specified collection is not found in the database.
692-
ConflictError: If the item in the specified collection already exists.
693-
693+
NotFoundError: If the specified collection is not found in the database.
694+
ConflictError: If an item with the same ID already exists in the collection.
694695
"""
695696
item = item.model_dump(mode="json")
696697
base_url = str(kwargs["request"].base_url)
@@ -706,12 +707,16 @@ async def create_item(
706707
)
707708
for item in item["features"]
708709
]
709-
710-
await self.database.bulk_async(
710+
attempted = len(processed_items)
711+
success, errors = await self.database.bulk_async(
711712
collection_id, processed_items, refresh=kwargs.get("refresh", False)
712713
)
714+
if errors:
715+
logger.error(f"Bulk async operation encountered errors: {errors}")
716+
else:
717+
logger.info(f"Bulk async operation succeeded with {success} actions.")
713718

714-
return None
719+
return f"Successfully added {success} Items. {attempted - success} errors occurred."
715720
else:
716721
item = await self.database.prep_create_item(item=item, base_url=base_url)
717722
await self.database.create_item(item, refresh=kwargs.get("refresh", False))
@@ -907,12 +912,16 @@ def bulk_item_insert(
907912

908913
# not a great way to get the collection_id-- should be part of the method signature
909914
collection_id = processed_items[0]["collection"]
910-
911-
self.database.bulk_sync(
915+
attempted = len(processed_items)
916+
success, errors = self.database.bulk_sync(
912917
collection_id, processed_items, refresh=kwargs.get("refresh", False)
913918
)
919+
if errors:
920+
logger.error(f"Bulk sync operation encountered errors: {errors}")
921+
else:
922+
logger.info(f"Bulk sync operation succeeded with {success} actions.")
914923

915-
return f"Successfully added {len(processed_items)} Items."
924+
return f"Successfully added {success} Items. {attempted - success} errors occurred."
916925

917926

918927
_DEFAULT_QUERYABLES: Dict[str, Dict[str, Any]] = {

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 79 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -703,68 +703,94 @@ async def prep_create_item(
703703
self, item: Item, base_url: str, exist_ok: bool = False
704704
) -> Item:
705705
"""
706-
Preps an item for insertion into the database.
706+
Prepare an item for insertion into the database.
707+
708+
This method performs pre-insertion preparation on the given `item`, such as:
709+
- Verifying that the collection the item belongs to exists.
710+
- Optionally checking if an item with the same ID already exists in the database.
711+
- Serializing the item into a database-compatible format.
707712
708713
Args:
709-
item (Item): The item to be prepped for insertion.
710-
base_url (str): The base URL used to create the item's self URL.
711-
exist_ok (bool): Indicates whether the item can exist already.
714+
item (Item): The item to be prepared for insertion.
715+
base_url (str): The base URL used to construct the item's self URL.
716+
exist_ok (bool): Indicates whether the item can already exist in the database.
717+
If False, a `ConflictError` is raised if the item exists.
712718
713719
Returns:
714-
Item: The prepped item.
720+
Item: The prepared item, serialized into a database-compatible format.
715721
716722
Raises:
717-
ConflictError: If the item already exists in the database.
718-
723+
NotFoundError: If the collection that the item belongs to does not exist in the database.
724+
ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False.
719725
"""
726+
logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.")
727+
728+
# Check if the collection exists
720729
await self.check_collection_exists(collection_id=item["collection"])
721730

731+
# Check if the item already exists in the database
722732
if not exist_ok and await self.client.exists(
723733
index=index_alias_by_collection_id(item["collection"]),
724734
id=mk_item_id(item["id"], item["collection"]),
725735
):
736+
logger.warning(
737+
f"Item {item['id']} in collection {item['collection']} already exists."
738+
)
726739
raise ConflictError(
727740
f"Item {item['id']} in collection {item['collection']} already exists"
728741
)
729742

730-
return self.item_serializer.stac_to_db(item, base_url)
743+
# Serialize the item into a database-compatible format
744+
prepped_item = self.item_serializer.stac_to_db(item, base_url)
745+
logger.debug(f"Item {item['id']} prepared successfully.")
746+
return prepped_item
731747

732748
def sync_prep_create_item(
733749
self, item: Item, base_url: str, exist_ok: bool = False
734750
) -> Item:
735751
"""
736752
Prepare an item for insertion into the database.
737753
738-
This method performs pre-insertion preparation on the given `item`,
739-
such as checking if the collection the item belongs to exists,
740-
and optionally verifying that an item with the same ID does not already exist in the database.
754+
This method performs pre-insertion preparation on the given `item`, such as:
755+
- Verifying that the collection the item belongs to exists.
756+
- Optionally checking if an item with the same ID already exists in the database.
757+
- Serializing the item into a database-compatible format.
741758
742759
Args:
743-
item (Item): The item to be inserted into the database.
744-
base_url (str): The base URL used for constructing URLs for the item.
745-
exist_ok (bool): Indicates whether the item can exist already.
760+
item (Item): The item to be prepared for insertion.
761+
base_url (str): The base URL used to construct the item's self URL.
762+
exist_ok (bool): Indicates whether the item can already exist in the database.
763+
If False, a `ConflictError` is raised if the item exists.
746764
747765
Returns:
748-
Item: The item after preparation is done.
766+
Item: The prepared item, serialized into a database-compatible format.
749767
750768
Raises:
751769
NotFoundError: If the collection that the item belongs to does not exist in the database.
752-
ConflictError: If an item with the same ID already exists in the collection.
770+
ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False.
753771
"""
754-
item_id = item["id"]
755-
collection_id = item["collection"]
756-
if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id):
757-
raise NotFoundError(f"Collection {collection_id} does not exist")
772+
logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.")
758773

774+
# Check if the collection exists
775+
if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=item["collection"]):
776+
raise NotFoundError(f"Collection {item['collection']} does not exist")
777+
778+
# Check if the item already exists in the database
759779
if not exist_ok and self.sync_client.exists(
760-
index=index_alias_by_collection_id(collection_id),
761-
id=mk_item_id(item_id, collection_id),
780+
index=index_alias_by_collection_id(item["collection"]),
781+
id=mk_item_id(item["id"], item["collection"]),
762782
):
783+
logger.warning(
784+
f"Item {item['id']} in collection {item['collection']} already exists."
785+
)
763786
raise ConflictError(
764-
f"Item {item_id} in collection {collection_id} already exists"
787+
f"Item {item['id']} in collection {item['collection']} already exists"
765788
)
766789

767-
return self.item_serializer.stac_to_db(item, base_url)
790+
# Serialize the item into a database-compatible format
791+
prepped_item = self.item_serializer.stac_to_db(item, base_url)
792+
logger.debug(f"Item {item['id']} prepared successfully.")
793+
return prepped_item
768794

769795
async def create_item(self, item: Item, refresh: bool = False):
770796
"""Database logic for creating one item.
@@ -960,51 +986,63 @@ async def delete_collection(self, collection_id: str, refresh: bool = False):
960986

961987
async def bulk_async(
962988
self, collection_id: str, processed_items: List[Item], refresh: bool = False
963-
) -> None:
964-
"""Perform a bulk insert of items into the database asynchronously.
989+
) -> Tuple[int, List[Dict[str, Any]]]:
990+
"""
991+
Perform a bulk insert of items into the database asynchronously.
965992
966993
Args:
967-
self: The instance of the object calling this function.
968994
collection_id (str): The ID of the collection to which the items belong.
969995
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
970996
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
971997
998+
Returns:
999+
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
1000+
- The number of successfully processed actions (`success`).
1001+
- A list of errors encountered during the bulk operation (`errors`).
1002+
9721003
Notes:
973-
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The
974-
insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. The
975-
`mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the
976-
index is refreshed after the bulk insert. The function does not return any value.
1004+
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`.
1005+
The insert is performed asynchronously, and the event loop is used to run the operation in a separate executor.
1006+
The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True,
1007+
the index is refreshed after the bulk insert.
9771008
"""
978-
await helpers.async_bulk(
1009+
success, errors = await helpers.async_bulk(
9791010
self.client,
9801011
mk_actions(collection_id, processed_items),
9811012
refresh=refresh,
982-
raise_on_error=False,
1013+
raise_on_error=False, # Do not raise errors
9831014
)
1015+
return success, errors
9841016

9851017
def bulk_sync(
9861018
self, collection_id: str, processed_items: List[Item], refresh: bool = False
987-
) -> None:
988-
"""Perform a bulk insert of items into the database synchronously.
1019+
) -> Tuple[int, List[Dict[str, Any]]]:
1020+
"""
1021+
Perform a bulk insert of items into the database synchronously.
9891022
9901023
Args:
991-
self: The instance of the object calling this function.
9921024
collection_id (str): The ID of the collection to which the items belong.
9931025
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
9941026
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
9951027
1028+
Returns:
1029+
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
1030+
- The number of successfully processed actions (`success`).
1031+
- A list of errors encountered during the bulk operation (`errors`).
1032+
9961033
Notes:
997-
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The
998-
insert is performed synchronously and blocking, meaning that the function does not return until the insert has
1034+
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`.
1035+
The insert is performed synchronously and blocking, meaning that the function does not return until the insert has
9991036
completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to
1000-
True, the index is refreshed after the bulk insert. The function does not return any value.
1037+
True, the index is refreshed after the bulk insert.
10011038
"""
1002-
helpers.bulk(
1039+
success, errors = helpers.bulk(
10031040
self.sync_client,
10041041
mk_actions(collection_id, processed_items),
10051042
refresh=refresh,
1006-
raise_on_error=False,
1043+
raise_on_error=False, # Do not raise errors
10071044
)
1045+
return success, errors
10081046

10091047
# DANGER
10101048
async def delete_items(self) -> None:

stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -984,51 +984,63 @@ async def delete_collection(self, collection_id: str, refresh: bool = False):
984984

985985
async def bulk_async(
986986
self, collection_id: str, processed_items: List[Item], refresh: bool = False
987-
) -> None:
988-
"""Perform a bulk insert of items into the database asynchronously.
987+
) -> Tuple[int, List[Dict[str, Any]]]:
988+
"""
989+
Perform a bulk insert of items into the database asynchronously.
989990
990991
Args:
991-
self: The instance of the object calling this function.
992992
collection_id (str): The ID of the collection to which the items belong.
993993
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
994994
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
995995
996+
Returns:
997+
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
998+
- The number of successfully processed actions (`success`).
999+
- A list of errors encountered during the bulk operation (`errors`).
1000+
9961001
Notes:
997-
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The
998-
insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. The
999-
`mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the
1000-
index is refreshed after the bulk insert. The function does not return any value.
1002+
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`.
1003+
The insert is performed asynchronously, and the event loop is used to run the operation in a separate executor.
1004+
The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True,
1005+
the index is refreshed after the bulk insert.
10011006
"""
1002-
await helpers.async_bulk(
1007+
success, errors = await helpers.async_bulk(
10031008
self.client,
10041009
mk_actions(collection_id, processed_items),
10051010
refresh=refresh,
1006-
raise_on_error=False,
1011+
raise_on_error=False, # Do not raise errors
10071012
)
1013+
return success, errors
10081014

10091015
def bulk_sync(
10101016
self, collection_id: str, processed_items: List[Item], refresh: bool = False
1011-
) -> None:
1012-
"""Perform a bulk insert of items into the database synchronously.
1017+
) -> Tuple[int, List[Dict[str, Any]]]:
1018+
"""
1019+
Perform a bulk insert of items into the database synchronously.
10131020
10141021
Args:
1015-
self: The instance of the object calling this function.
10161022
collection_id (str): The ID of the collection to which the items belong.
10171023
processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
10181024
refresh (bool): Whether to refresh the index after the bulk insert (default: False).
10191025
1026+
Returns:
1027+
Tuple[int, List[Dict[str, Any]]]: A tuple containing:
1028+
- The number of successfully processed actions (`success`).
1029+
- A list of errors encountered during the bulk operation (`errors`).
1030+
10201031
Notes:
1021-
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The
1022-
insert is performed synchronously and blocking, meaning that the function does not return until the insert has
1032+
This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`.
1033+
The insert is performed synchronously and blocking, meaning that the function does not return until the insert has
10231034
completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to
1024-
True, the index is refreshed after the bulk insert. The function does not return any value.
1035+
True, the index is refreshed after the bulk insert.
10251036
"""
1026-
helpers.bulk(
1037+
success, errors = helpers.bulk(
10271038
self.sync_client,
10281039
mk_actions(collection_id, processed_items),
10291040
refresh=refresh,
1030-
raise_on_error=False,
1041+
raise_on_error=False, # Do not raise errors
10311042
)
1043+
return success, errors
10321044

10331045
# DANGER
10341046
async def delete_items(self) -> None:

0 commit comments

Comments
 (0)