3131)
3232from stac_fastapi .core .extensions import filter
3333from stac_fastapi .core .serializers import CollectionSerializer , ItemSerializer
34- from stac_fastapi .core .utilities import MAX_LIMIT , bbox2polygon
34+ from stac_fastapi .core .utilities import MAX_LIMIT , bbox2polygon , resolve_refresh
3535from stac_fastapi .opensearch .config import (
3636 AsyncOpensearchSettings as AsyncSearchSettings ,
3737)
@@ -864,15 +864,17 @@ def bulk_sync_prep_create_item(
864864 async def create_item (
865865 self ,
866866 item : Item ,
867- refresh : bool = False ,
868867 base_url : str = "" ,
869868 exist_ok : bool = False ,
869+ ** kwargs : Any ,
870870 ):
871871 """Database logic for creating one item.
872872
873873 Args:
874874 item (Item): The item to be created.
875- refresh (bool, optional): Refresh the index after performing the operation. Defaults to False.
875+ base_url (str, optional): The base URL for the item. Defaults to an empty string.
876+ exist_ok (bool, optional): Whether to allow the item to exist already. Defaults to False.
877+ **kwargs: Additional keyword arguments like refresh.
876878
877879 Raises:
878880 ConflictError: If the item already exists in the database.
@@ -883,6 +885,19 @@ async def create_item(
883885 # todo: check if collection exists, but cache
884886 item_id = item ["id" ]
885887 collection_id = item ["collection" ]
888+
889+ # Ensure kwargs is a dictionary
890+ kwargs = kwargs or {}
891+
892+ # Resolve the `refresh` parameter
893+ refresh = kwargs .get ("refresh" , self .async_settings .database_refresh )
894+ refresh = resolve_refresh (str (refresh ).lower ())
895+
896+ # Log the creation attempt
897+ logger .info (
898+ f"Creating item { item_id } in collection { collection_id } with refresh={ refresh } "
899+ )
900+
886901 item = await self .async_prep_create_item (
887902 item = item , base_url = base_url , exist_ok = exist_ok
888903 )
@@ -893,19 +908,29 @@ async def create_item(
893908 refresh = refresh ,
894909 )
895910
896- async def delete_item (
897- self , item_id : str , collection_id : str , refresh : bool = False
898- ):
911+ async def delete_item (self , item_id : str , collection_id : str , ** kwargs : Any ):
899912 """Delete a single item from the database.
900913
901914 Args:
902915 item_id (str): The id of the Item to be deleted.
903916 collection_id (str): The id of the Collection that the Item belongs to.
904- refresh (bool, optional): Whether to refresh the index after the deletion. Default is False .
917+ **kwargs: Additional keyword arguments like refresh .
905918
906919 Raises:
907920 NotFoundError: If the Item does not exist in the database.
908921 """
922+ # Ensure kwargs is a dictionary
923+ kwargs = kwargs or {}
924+
925+ # Resolve the `refresh` parameter
926+ refresh = kwargs .get ("refresh" , self .async_settings .database_refresh )
927+ refresh = resolve_refresh (str (refresh ).lower ())
928+
929+ # Log the deletion attempt
930+ logger .info (
931+ f"Deleting item { item_id } from collection { collection_id } with refresh={ refresh } "
932+ )
933+
909934 try :
910935 await self .client .delete (
911936 index = index_alias_by_collection_id (collection_id ),
@@ -935,12 +960,12 @@ async def get_items_mapping(self, collection_id: str) -> Dict[str, Any]:
935960 except exceptions .NotFoundError :
936961 raise NotFoundError (f"Mapping for index { index_name } not found" )
937962
938- async def create_collection (self , collection : Collection , refresh : bool = False ):
963+ async def create_collection (self , collection : Collection , ** kwargs : Any ):
939964 """Create a single collection in the database.
940965
941966 Args:
942967 collection (Collection): The Collection object to be created.
943- refresh (bool, optional): Whether to refresh the index after the creation. Default is False .
968+ **kwargs: Additional keyword arguments like refresh .
944969
945970 Raises:
946971 ConflictError: If a Collection with the same id already exists in the database.
@@ -950,6 +975,16 @@ async def create_collection(self, collection: Collection, refresh: bool = False)
950975 """
951976 collection_id = collection ["id" ]
952977
978+ # Ensure kwargs is a dictionary
979+ kwargs = kwargs or {}
980+
981+ # Resolve the `refresh` parameter
982+ refresh = kwargs .get ("refresh" , self .async_settings .database_refresh )
983+ refresh = resolve_refresh (str (refresh ).lower ())
984+
985+ # Log the creation attempt
986+ logger .info (f"Creating collection { collection_id } with refresh={ refresh } " )
987+
953988 if await self .client .exists (index = COLLECTIONS_INDEX , id = collection_id ):
954989 raise ConflictError (f"Collection { collection_id } already exists" )
955990
@@ -989,14 +1024,14 @@ async def find_collection(self, collection_id: str) -> Collection:
9891024 return collection ["_source" ]
9901025
9911026 async def update_collection (
992- self , collection_id : str , collection : Collection , refresh : bool = False
1027+ self , collection_id : str , collection : Collection , ** kwargs : Any
9931028 ):
9941029 """Update a collection from the database.
9951030
9961031 Args:
997- self: The instance of the object calling this function.
9981032 collection_id (str): The ID of the collection to be updated.
9991033 collection (Collection): The Collection object to be used for the update.
1034+ **kwargs: Additional keyword arguments like refresh.
10001035
10011036 Raises:
10021037 NotFoundError: If the collection with the given `collection_id` is not
@@ -1007,9 +1042,23 @@ async def update_collection(
10071042 `collection_id` and with the collection specified in the `Collection` object.
10081043 If the collection is not found, a `NotFoundError` is raised.
10091044 """
1045+ # Ensure kwargs is a dictionary
1046+ kwargs = kwargs or {}
1047+
1048+ # Resolve the `refresh` parameter
1049+ refresh = kwargs .get ("refresh" , self .async_settings .database_refresh )
1050+ refresh = resolve_refresh (str (refresh ).lower ())
1051+
1052+ # Log the update attempt
1053+ logger .info (f"Updating collection { collection_id } with refresh={ refresh } " )
1054+
10101055 await self .find_collection (collection_id = collection_id )
10111056
10121057 if collection_id != collection ["id" ]:
1058+ logger .info (
1059+ f"Collection ID change detected: { collection_id } -> { collection ['id' ]} "
1060+ )
1061+
10131062 await self .create_collection (collection , refresh = refresh )
10141063
10151064 await self .client .reindex (
@@ -1025,7 +1074,7 @@ async def update_collection(
10251074 refresh = refresh ,
10261075 )
10271076
1028- await self .delete_collection (collection_id )
1077+ await self .delete_collection (collection_id = collection_id , ** kwargs )
10291078
10301079 else :
10311080 await self .client .index (
@@ -1035,23 +1084,34 @@ async def update_collection(
10351084 refresh = refresh ,
10361085 )
10371086
1038- async def delete_collection (self , collection_id : str , refresh : bool = False ):
1087+ async def delete_collection (self , collection_id : str , ** kwargs : Any ):
10391088 """Delete a collection from the database.
10401089
10411090 Parameters:
10421091 self: The instance of the object calling this function.
10431092 collection_id (str): The ID of the collection to be deleted.
1044- refresh (bool): Whether to refresh the index after the deletion (default: False) .
1093+ **kwargs: Additional keyword arguments like refresh .
10451094
10461095 Raises:
10471096 NotFoundError: If the collection with the given `collection_id` is not found in the database.
10481097
10491098 Notes:
10501099 This function first verifies that the collection with the specified `collection_id` exists in the database, and then
1051- deletes the collection. If `refresh` is set to True, the index is refreshed after the deletion. Additionally, this
1052- function also calls `delete_item_index` to delete the index for the items in the collection.
1100+ deletes the collection. If `refresh` is set to "true", "false", or "wait_for", the index is refreshed accordingly after
1101+ the deletion. Additionally, this function also calls `delete_item_index` to delete the index for the items in the collection.
10531102 """
1103+ # Ensure kwargs is a dictionary
1104+ kwargs = kwargs or {}
1105+
10541106 await self .find_collection (collection_id = collection_id )
1107+
1108+ # Resolve the `refresh` parameter
1109+ refresh = kwargs .get ("refresh" , self .async_settings .database_refresh )
1110+ refresh = resolve_refresh (str (refresh ).lower ())
1111+
1112+ # Log the deletion attempt
1113+ logger .info (f"Deleting collection { collection_id } with refresh={ refresh } " )
1114+
10551115 await self .client .delete (
10561116 index = COLLECTIONS_INDEX , id = collection_id , refresh = refresh
10571117 )
@@ -1061,15 +1121,17 @@ async def bulk_async(
10611121 self ,
10621122 collection_id : str ,
10631123 processed_items : List [Item ],
1064- refresh : bool = False ,
1124+ ** kwargs : Any ,
10651125 ) -> Tuple [int , List [Dict [str , Any ]]]:
10661126 """
10671127 Perform a bulk insert of items into the database asynchronously.
10681128
10691129 Args:
10701130 collection_id (str): The ID of the collection to which the items belong.
10711131 processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
1072- refresh (bool): Whether to refresh the index after the bulk insert (default: False).
1132+ **kwargs (Any): Additional keyword arguments, including:
1133+ - refresh (str, optional): Whether to refresh the index after the bulk insert.
1134+ Can be "true", "false", or "wait_for". Defaults to the value of `self.sync_settings.database_refresh`.
10731135
10741136 Returns:
10751137 Tuple[int, List[Dict[str, Any]]]: A tuple containing:
@@ -1078,32 +1140,58 @@ async def bulk_async(
10781140
10791141 Notes:
10801142 This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`.
1081- The insert is performed asynchronously, and the event loop is used to run the operation in a separate executor.
1082- The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True,
1083- the index is refreshed after the bulk insert.
1143+ The insert is performed synchronously and blocking, meaning that the function does not return until the insert has
1144+ completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. The `refresh`
1145+ parameter determines whether the index is refreshed after the bulk insert:
1146+ - "true": Forces an immediate refresh of the index.
1147+ - "false": Does not refresh the index immediately (default behavior).
1148+ - "wait_for": Waits for the next refresh cycle to make the changes visible.
10841149 """
1150+ # Ensure kwargs is a dictionary
1151+ kwargs = kwargs or {}
1152+
1153+ # Resolve the `refresh` parameter
1154+ refresh = kwargs .get ("refresh" , self .async_settings .database_refresh )
1155+ refresh = resolve_refresh (str (refresh ).lower ())
1156+
1157+ # Log the bulk insert attempt
1158+ logger .info (
1159+ f"Performing bulk insert for collection { collection_id } with refresh={ refresh } "
1160+ )
1161+
1162+ # Handle empty processed_items
1163+ if not processed_items :
1164+ logger .warning (f"No items to insert for collection { collection_id } " )
1165+ return 0 , []
1166+
10851167 raise_on_error = self .async_settings .raise_on_bulk_error
10861168 success , errors = await helpers .async_bulk (
10871169 self .client ,
10881170 mk_actions (collection_id , processed_items ),
10891171 refresh = refresh ,
10901172 raise_on_error = raise_on_error ,
10911173 )
1174+ # Log the result
1175+ logger .info (
1176+ f"Bulk insert completed for collection { collection_id } : { success } successes, { len (errors )} errors"
1177+ )
10921178 return success , errors
10931179
10941180 def bulk_sync (
10951181 self ,
10961182 collection_id : str ,
10971183 processed_items : List [Item ],
1098- refresh : bool = False ,
1184+ ** kwargs : Any ,
10991185 ) -> Tuple [int , List [Dict [str , Any ]]]:
11001186 """
11011187 Perform a bulk insert of items into the database synchronously.
11021188
11031189 Args:
11041190 collection_id (str): The ID of the collection to which the items belong.
11051191 processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
1106- refresh (bool): Whether to refresh the index after the bulk insert (default: False).
1192+ **kwargs (Any): Additional keyword arguments, including:
1193+ - refresh (str, optional): Whether to refresh the index after the bulk insert.
1194+ Can be "true", "false", or "wait_for". Defaults to the value of `self.sync_settings.database_refresh`.
11071195
11081196 Returns:
11091197 Tuple[int, List[Dict[str, Any]]]: A tuple containing:
@@ -1113,9 +1201,29 @@ def bulk_sync(
11131201 Notes:
11141202 This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`.
11151203 The insert is performed synchronously and blocking, meaning that the function does not return until the insert has
1116- completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to
1117- True, the index is refreshed after the bulk insert.
1204+ completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. The `refresh`
1205+ parameter determines whether the index is refreshed after the bulk insert:
1206+ - "true": Forces an immediate refresh of the index.
1207+ - "false": Does not refresh the index immediately (default behavior).
1208+ - "wait_for": Waits for the next refresh cycle to make the changes visible.
11181209 """
1210+ # Ensure kwargs is a dictionary
1211+ kwargs = kwargs or {}
1212+
1213+ # Resolve the `refresh` parameter
1214+ refresh = kwargs .get ("refresh" , self .sync_settings .database_refresh )
1215+ refresh = resolve_refresh (str (refresh ).lower ())
1216+
1217+ # Log the bulk insert attempt
1218+ logger .info (
1219+ f"Performing bulk insert for collection { collection_id } with refresh={ refresh } "
1220+ )
1221+
1222+ # Handle empty processed_items
1223+ if not processed_items :
1224+ logger .warning (f"No items to insert for collection { collection_id } " )
1225+ return 0 , []
1226+
11191227 raise_on_error = self .sync_settings .raise_on_bulk_error
11201228 success , errors = helpers .bulk (
11211229 self .sync_client ,
0 commit comments