diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index a966248b..864b52e3 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -65,7 +65,7 @@ jobs: strategy: matrix: - python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12", "3.13"] + python-version: [ "3.9", "3.10", "3.11", "3.12", "3.13"] backend: [ "elasticsearch7", "elasticsearch8", "opensearch"] name: Python ${{ matrix.python-version }} testing with ${{ matrix.backend }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 04b4d793..fc99099e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Fixed +- Fixed inheritance relating to `BaseDatabaseLogic` and `ApiBaseSettings` [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355) +- Fixed `delete_item` and `delete_collection` method return types [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355) +- Bulk operations now properly raise errors instead of failing silently [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355) +- Added `BulkInsertError` for detailed error reporting [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355) +- Fixed unsafe error suppression in OpenSearch bulk operations [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355) + +### Changed +- Bulk methods now return `(success_count, error_list)` tuples [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355) + + +## [v4.0.0] + ### Added - Added support for dynamically-generated queryables based on Elasticsearch/OpenSearch mappings, with extensible metadata augmentation [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351) - Included default queryables configuration for seamless integration. [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351) @@ -14,6 +27,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Changed - Refactored database logic to reduce duplication [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351) - Replaced `fastapi-slim` with `fastapi` dependency [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351) +- Changed minimum Python version to 3.9 [#354](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/354) +- Updated stac-fastapi api, types, and extensions libraries to 5.1.1 and made various associated changes [#354](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/354) ### Fixed - Improved performance of `mk_actions` and `filter-links` methods [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351) @@ -314,7 +329,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Use genexp in execute_search and get_all_collections to return results. - Added db_to_stac serializer to item_collection method in core.py.
-[Unreleased]: https://github.com/stac-utils/stac-fastapi-elasticsearch/tree/v3.2.5...main +[Unreleased]: https://github.com/stac-utils/stac-fastapi-elasticsearch/tree/v4.0.0...main +[v4.0.0]: https://github.com/stac-utils/stac-fastapi-elasticsearch/tree/v3.2.5...v4.0.0 [v3.2.5]: https://github.com/stac-utils/stac-fastapi-elasticsearch/tree/v3.2.4...v3.2.5 [v3.2.4]: https://github.com/stac-utils/stac-fastapi-elasticsearch/tree/v3.2.3...v3.2.4 [v3.2.3]: https://github.com/stac-utils/stac-fastapi-elasticsearch/tree/v3.2.2...v3.2.3 diff --git a/Makefile b/Makefile index 9a3f23ce..e965d785 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ OS_APP_PORT ?= 8082 OS_HOST ?= docker.for.mac.localhost OS_PORT ?= 9202 -run_es = docker-compose \ +run_es = docker compose \ run \ -p ${EXTERNAL_APP_PORT}:${ES_APP_PORT} \ -e PY_IGNORE_IMPORTMISMATCH=1 \ -e APP_PORT=${ES_APP_PORT} \ app-elasticsearch -run_os = docker-compose \ +run_os = docker compose \ run \ -p ${EXTERNAL_APP_PORT}:${OS_APP_PORT} \ -e PY_IGNORE_IMPORTMISMATCH=1 \ @@ -45,7 +45,7 @@ run-deploy-locally: .PHONY: image-dev image-dev: - docker-compose build + docker compose build .PHONY: docker-run-es docker-run-es: image-dev @@ -66,28 +66,28 @@ docker-shell-os: .PHONY: test-elasticsearch test-elasticsearch: -$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest' - docker-compose down + docker compose down .PHONY: test-opensearch test-opensearch: -$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest' - docker-compose down + docker compose down .PHONY: test test: -$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest' - docker-compose down + docker compose down -$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest' - docker-compose down + docker compose down .PHONY: run-database-es run-database-es: - docker-compose run --rm elasticsearch + docker compose run --rm elasticsearch .PHONY: run-database-os run-database-os: - docker-compose run --rm opensearch + docker compose run --rm opensearch .PHONY: pybase-install pybase-install: @@ -107,10 +107,10 @@ install-os: pybase-install .PHONY: docs-image docs-image: - docker-compose -f docker-compose.docs.yml \ + docker compose -f docker-compose.docs.yml \ build .PHONY: docs docs: docs-image - docker-compose -f docker-compose.docs.yml \ + docker compose -f docker-compose.docs.yml \ run docs \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index da4633b9..8ec0701b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3.9' - services: app-elasticsearch: container_name: stac-fastapi-es diff --git a/stac_fastapi/core/setup.py b/stac_fastapi/core/setup.py index aedbe231..43b3e911 100644 --- a/stac_fastapi/core/setup.py +++ b/stac_fastapi/core/setup.py @@ -9,10 +9,10 @@ "fastapi", "attrs>=23.2.0", "pydantic", - "stac_pydantic>=3", - "stac-fastapi.types==3.0.0", - "stac-fastapi.api==3.0.0", - "stac-fastapi.extensions==3.0.0", + "stac_pydantic==3.1.*", + "stac-fastapi.api==5.1.1", + "stac-fastapi.extensions==5.1.1", + "stac-fastapi.types==5.1.1", "orjson", "overrides", "geojson-pydantic", diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 11bd34b4..d773d5f7 100644 ---
a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -277,7 +277,7 @@ async def item_collection( self, collection_id: str, bbox: Optional[BBox] = None, - datetime: Optional[DateTimeType] = None, + datetime: Optional[str] = None, limit: Optional[int] = 10, token: Optional[str] = None, **kwargs, @@ -287,7 +287,7 @@ Args: collection_id (str): The identifier of the collection to read items from. bbox (Optional[BBox]): The bounding box to filter items by. - datetime (Optional[DateTimeType]): The datetime range to filter items by. + datetime (Optional[str]): The datetime range to filter items by. limit (int): The maximum number of items to return. The default value is 10. token (str): A token used for pagination. request (Request): The incoming request. @@ -426,23 +426,20 @@ def _return_date( return result - def _format_datetime_range(self, date_tuple: DateTimeType) -> str: + def _format_datetime_range(self, date_str: str) -> str: """ - Convert a tuple of datetime objects or None into a formatted string for API requests. + Convert a datetime range into a formatted string. Args: - date_tuple (tuple): A tuple containing two elements, each can be a datetime object or None. + date_str (str): A string containing two datetime values separated by a '/'. Returns: str: A string formatted as 'YYYY-MM-DDTHH:MM:SS.sssZ/YYYY-MM-DDTHH:MM:SS.sssZ', with '..' used if any element is None. """ - - def format_datetime(dt): - """Format a single datetime object to the ISO8601 extended format with 'Z'.""" - return dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" if dt else ".." - - start, end = date_tuple - return f"{format_datetime(start)}/{format_datetime(end)}" + start, end = date_str.split("/") + start = start.replace("+01:00", "Z") if start else ".." + end = end.replace("+01:00", "Z") if end else ".." + return f"{start}/{end}" async def get_search( self, @@ -450,7 +447,7 @@ collections: Optional[List[str]] = None, ids: Optional[List[str]] = None, bbox: Optional[BBox] = None, - datetime: Optional[DateTimeType] = None, + datetime: Optional[str] = None, limit: Optional[int] = 10, query: Optional[str] = None, token: Optional[str] = None, @@ -458,7 +455,7 @@ sortby: Optional[str] = None, q: Optional[List[str]] = None, intersects: Optional[str] = None, - filter: Optional[str] = None, + filter_expr: Optional[str] = None, filter_lang: Optional[str] = None, **kwargs, ) -> stac_types.ItemCollection: @@ -468,7 +465,7 @@ Args: collections (Optional[List[str]]): List of collection IDs to search in. ids (Optional[List[str]]): List of item IDs to search for. bbox (Optional[BBox]): Bounding box to search in. - datetime (Optional[DateTimeType]): Filter items based on the datetime field. + datetime (Optional[str]): Filter items based on the datetime field. limit (Optional[int]): Maximum number of results to return. query (Optional[str]): Query string to filter the results. token (Optional[str]): Access token to use when searching the catalog.
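The rewritten `_format_datetime_range` above only rewrites the literal "+01:00" offset, so any other UTC offset passes through unnormalized. An offset-agnostic variant could parse each side and re-emit it in UTC. A minimal sketch, not part of this diff, assuming the input is an RFC 3339 interval string (two values separated by "/") as validated upstream:

from datetime import datetime, timezone

def format_datetime_range(date_str: str) -> str:
    """Normalize an ISO 8601 interval to 'start/end' with UTC 'Z' suffixes."""

    def to_utc(value: str) -> str:
        if not value or value == "..":
            return ".."  # open-ended side of the interval
        # Python 3.9's fromisoformat() rejects a trailing 'Z', so swap it first.
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)  # treat naive input as UTC
        utc = parsed.astimezone(timezone.utc)
        return utc.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

    start, end = date_str.split("/")
    return f"{to_utc(start)}/{to_utc(end)}"

The 'Z' substitution before parsing matters because fromisoformat() only accepts a 'Z' suffix from Python 3.11 onward, while this project's new minimum is 3.9.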
@@ -495,7 +492,7 @@ async def get_search( } if datetime: - base_args["datetime"] = self._format_datetime_range(datetime) + base_args["datetime"] = self._format_datetime_range(date_str=datetime) if intersects: base_args["intersects"] = orjson.loads(unquote_plus(intersects)) @@ -506,12 +503,12 @@ async def get_search( for sort in sortby ] - if filter: - base_args["filter-lang"] = "cql2-json" + if filter_expr: + base_args["filter_lang"] = "cql2-json" base_args["filter"] = orjson.loads( - unquote_plus(filter) + unquote_plus(filter_expr) if filter_lang == "cql2-json" - else to_cql2(parse_cql2_text(filter)) + else to_cql2(parse_cql2_text(filter_expr)) ) if fields: @@ -593,8 +590,8 @@ async def post_search( ) # only cql2_json is supported here - if hasattr(search_request, "filter"): - cql2_filter = getattr(search_request, "filter", None) + if hasattr(search_request, "filter_expr"): + cql2_filter = getattr(search_request, "filter_expr", None) try: search = self.database.apply_cql2_filter(search, cql2_filter) except Exception as e: @@ -734,9 +731,7 @@ async def update_item( return ItemSerializer.db_to_stac(item, base_url) @overrides - async def delete_item( - self, item_id: str, collection_id: str, **kwargs - ) -> Optional[stac_types.Item]: + async def delete_item(self, item_id: str, collection_id: str, **kwargs) -> None: """Delete an item from a collection. Args: @@ -744,7 +739,7 @@ async def delete_item( collection_id (str): The identifier of the collection that contains the item. Returns: - Optional[stac_types.Item]: The deleted item, or `None` if the item was successfully deleted. + None: Returns 204 No Content on successful deletion """ await self.database.delete_item(item_id=item_id, collection_id=collection_id) return None @@ -814,23 +809,20 @@ async def update_collection( ) @overrides - async def delete_collection( - self, collection_id: str, **kwargs - ) -> Optional[stac_types.Collection]: + async def delete_collection(self, collection_id: str, **kwargs) -> None: """ Delete a collection. This method deletes an existing collection in the database. Args: - collection_id (str): The identifier of the collection that contains the item. - kwargs: Additional keyword arguments. + collection_id (str): The identifier of the collection to delete Returns: - None. + None: Returns 204 No Content on successful deletion Raises: - NotFoundError: If the collection doesn't exist. + NotFoundError: If the collection doesn't exist """ await self.database.delete_collection(collection_id=collection_id) return None @@ -875,35 +867,36 @@ def preprocess_item( def bulk_item_insert( self, items: Items, chunk_size: Optional[int] = None, **kwargs ) -> str: - """Perform a bulk insertion of items into the database using Elasticsearch. + """Perform bulk insertion of items. Args: - items: The items to insert. - chunk_size: The size of each chunk for bulk processing. - **kwargs: Additional keyword arguments, such as `request` and `refresh`. + items: The items to insert + chunk_size: Chunk size for bulk processing + **kwargs: Additional keyword arguments Returns: - A string indicating the number of items successfully added. 
+ str: Message indicating number of items successfully added + + Raises: + BulkInsertError: If any items fail insertion """ request = kwargs.get("request") - if request: - base_url = str(request.base_url) - else: - base_url = "" + base_url = str(request.base_url) if request else "" processed_items = [ self.preprocess_item(item, base_url, items.method) for item in items.items.values() ] - # not a great way to get the collection_id-- should be part of the method signature - collection_id = processed_items[0]["collection"] + collection_id = processed_items[0]["collection"] if processed_items else "" - self.database.bulk_sync( - collection_id, processed_items, refresh=kwargs.get("refresh", False) + success_count, errors = self.database.bulk_sync( + collection_id, + processed_items, + refresh=kwargs.get("refresh", False), + raise_errors=True, ) - - return f"Successfully added {len(processed_items)} Items." + return f"Successfully added {success_count} Items." _DEFAULT_QUERYABLES: Dict[str, Dict[str, Any]] = { diff --git a/stac_fastapi/core/stac_fastapi/core/exceptions.py b/stac_fastapi/core/stac_fastapi/core/exceptions.py new file mode 100644 index 00000000..ed52fce6 --- /dev/null +++ b/stac_fastapi/core/stac_fastapi/core/exceptions.py @@ -0,0 +1,39 @@ +"""Core exceptions module for STAC FastAPI application. + +This module contains custom exception classes to handle specific error conditions in a structured way. +""" + + +class BulkInsertError(Exception): + """Exception raised for bulk insert operation failures. + + Attributes: + success_count (int): Number of successfully inserted items + errors (List[str]): Formatted error messages for failed operations + failure_count (int): Derived count of failed operations + + Notes: + Raised by bulk_async/bulk_sync methods when raise_errors=True + and any operations fail during bulk insertion. + """ + + def __init__(self, message, success_count, errors): + """Initialize BulkInsertError instance with operation details. + + Args: + message (str): Human-readable error description + success_count (int): Number of successfully processed items + errors (List[str]): List of formatted error messages from the bulk operation + """ + super().__init__(message) + self.success_count = success_count + self.errors = errors + self.failure_count = len(errors) + + def __str__(self) -> str: + """Return enhanced string representation with operation metrics.
+ + Returns: + str: Formatted string containing base message with success/failure counts + """ + return f"{super().__str__()} (Success: {self.success_count}, Failures: {self.failure_count})" diff --git a/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py b/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py index 2cf880c9..43bd543c 100644 --- a/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py +++ b/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py @@ -338,7 +338,7 @@ async def aggregate( datetime: Optional[DateTimeType] = None, intersects: Optional[str] = None, filter_lang: Optional[str] = None, - filter: Optional[str] = None, + filter_expr: Optional[str] = None, aggregations: Optional[str] = None, ids: Optional[List[str]] = None, bbox: Optional[BBox] = None, @@ -380,8 +380,8 @@ async def aggregate( if datetime: base_args["datetime"] = self._format_datetime_range(datetime) - if filter: - base_args["filter"] = self.get_filter(filter, filter_lang) + if filter_expr: + base_args["filter"] = self.get_filter(filter_expr, filter_lang) aggregate_request = EsAggregationExtensionPostRequest(**base_args) else: # Workaround for optional path param in POST requests @@ -389,9 +389,9 @@ async def aggregate( collection_id = path.split("/")[2] filter_lang = "cql2-json" - if aggregate_request.filter: - aggregate_request.filter = self.get_filter( - aggregate_request.filter, filter_lang + if aggregate_request.filter_expr: + aggregate_request.filter_expr = self.get_filter( + aggregate_request.filter_expr, filter_lang ) if collection_id: @@ -465,10 +465,10 @@ async def aggregate( detail=f"Aggregation {agg_name} not supported at catalog level", ) - if aggregate_request.filter: + if aggregate_request.filter_expr: try: search = self.database.apply_cql2_filter( - search, aggregate_request.filter + search, aggregate_request.filter_expr ) except Exception as e: raise HTTPException( diff --git a/stac_fastapi/core/stac_fastapi/core/version.py b/stac_fastapi/core/stac_fastapi/core/version.py index ca97d75a..6356730f 100644 --- a/stac_fastapi/core/stac_fastapi/core/version.py +++ b/stac_fastapi/core/stac_fastapi/core/version.py @@ -1,2 +1,2 @@ """library version.""" -__version__ = "3.2.5" +__version__ = "4.0.0" diff --git a/stac_fastapi/elasticsearch/setup.py b/stac_fastapi/elasticsearch/setup.py index 7fb82dc7..a1feef31 100644 --- a/stac_fastapi/elasticsearch/setup.py +++ b/stac_fastapi/elasticsearch/setup.py @@ -6,7 +6,7 @@ desc = f.read() install_requires = [ - "stac-fastapi.core==3.2.5", + "stac-fastapi.core==4.0.0", "elasticsearch[async]==8.11.0", "elasticsearch-dsl==8.11.0", "uvicorn", diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py index 0b1bcb5e..d14295f4 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py @@ -7,6 +7,7 @@ import certifi from elasticsearch import AsyncElasticsearch, Elasticsearch # type: ignore +from stac_fastapi.core.base_settings import ApiBaseSettings from stac_fastapi.types.config import ApiSettings @@ -69,7 +70,7 @@ def _es_config() -> Dict[str, Any]: _forbidden_fields: Set[str] = {"type"} -class ElasticsearchSettings(ApiSettings): +class ElasticsearchSettings(ApiSettings, ApiBaseSettings): """API settings.""" # Fields which are defined by STAC but not included in the database model @@ -82,7 +83,7 @@ def create_client(self): return Elasticsearch(**_es_config()) 
-class AsyncElasticsearchSettings(ApiSettings): +class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings): """API settings.""" # Fields which are defined by STAC but not included in the database model diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index c46b208d..96db23fd 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -12,6 +12,7 @@ from starlette.requests import Request from elasticsearch import exceptions, helpers # type: ignore +from stac_fastapi.core.base_database_logic import BaseDatabaseLogic from stac_fastapi.core.database_logic import ( COLLECTIONS_INDEX, DEFAULT_SORT, @@ -27,6 +28,7 @@ mk_actions, mk_item_id, ) +from stac_fastapi.core.exceptions import BulkInsertError from stac_fastapi.core.extensions import filter from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon @@ -124,7 +126,7 @@ async def delete_item_index(collection_id: str): @attr.s -class DatabaseLogic: +class DatabaseLogic(BaseDatabaseLogic): """Database logic.""" client = AsyncElasticsearchSettings().create_client @@ -866,56 +868,118 @@ async def delete_collection(self, collection_id: str, refresh: bool = False): await delete_item_index(collection_id) async def bulk_async( - self, collection_id: str, processed_items: List[Item], refresh: bool = False - ) -> None: + self, + collection_id: str, + processed_items: List[Item], + refresh: bool = False, + raise_errors: bool = True, + ) -> Tuple[int, List[Dict]]: """Perform a bulk insert of items into the database asynchronously. Args: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to which the items belong. - processed_items (List[Item]): A list of `Item` objects to be inserted into the database. - refresh (bool): Whether to refresh the index after the bulk insert (default: False). + processed_items (List[Item]): List of `Item` objects to be inserted. + refresh (bool): Whether to refresh the index after the bulk insert. + Default: False + raise_errors (bool): Whether to raise exceptions on bulk errors. + Default: True + + Returns: + Tuple[int, List[Dict]]: Number of successful inserts and list of errors + + Raises: + BulkInsertError: If raise_errors=True and any operations failed Notes: - This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The - insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. The - `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the - index is refreshed after the bulk insert. The function does not return any value. + Performs bulk insert using Elasticsearch's async helpers. When raise_errors=True, + any failed operations will raise a BulkInsertError containing success count + and detailed error messages. 
""" - await helpers.async_bulk( + result = await helpers.async_bulk( self.client, mk_actions(collection_id, processed_items), refresh=refresh, raise_on_error=False, + stats_only=False, ) + return self._handle_bulk_result(result, raise_errors) def bulk_sync( - self, collection_id: str, processed_items: List[Item], refresh: bool = False - ) -> None: + self, + collection_id: str, + processed_items: List[Item], + refresh: bool = False, + raise_errors: bool = True, + ) -> Tuple[int, List[Dict]]: """Perform a bulk insert of items into the database synchronously. Args: - self: The instance of the object calling this function. - collection_id (str): The ID of the collection to which the items belong. - processed_items (List[Item]): A list of `Item` objects to be inserted into the database. - refresh (bool): Whether to refresh the index after the bulk insert (default: False). + collection_id (str): The ID of the collection for the items. + processed_items (List[Item]): List of `Item` objects to insert. + refresh (bool): Refresh index after insert. Default: False + raise_errors (bool): Raise exceptions on errors. Default: True + + Returns: + Tuple[int, List[Dict]]: Success count and error details + + Raises: + BulkInsertError: If raise_errors=True and any operations failed Notes: - This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The - insert is performed synchronously and blocking, meaning that the function does not return until the insert has - completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to - True, the index is refreshed after the bulk insert. The function does not return any value. + Synchronous version of bulk insert. Blocks until operation completes. + Follows same error handling rules as async version. """ - helpers.bulk( + result = helpers.bulk( self.sync_client, mk_actions(collection_id, processed_items), refresh=refresh, raise_on_error=False, + stats_only=False, ) + return self._handle_bulk_result(result, raise_errors) + + def _handle_bulk_result( + self, result: Tuple[int, List[Dict]], raise_errors: bool + ) -> Tuple[int, List[Dict]]: + """Process bulk operation results and handle errors. + + Args: + result (Tuple[int, List[Dict]]): Bulk operation result containing + success count and error details + raise_errors (bool): Whether to raise exceptions on errors + + Returns: + Tuple[int, List[Dict]]: Processed success count and error list + + Raises: + BulkInsertError: If raise_errors=True and errors are present + """ + success_count, error_list = result + + if raise_errors and error_list: + error_messages = [ + f"Item {e['index']['_id']}: {e['index']['error']['reason']}" + for e in error_list + ] + raise BulkInsertError( + f"Bulk operation failed with {len(error_list)} errors", + success_count=success_count, + errors=error_messages, + ) + + return success_count, error_list # DANGER async def delete_items(self) -> None: - """Danger. this is only for tests.""" + """Delete all items from the database (TESTING ONLY). + + Raises: + RuntimeError: If used outside test environment + + Notes: + This method is strictly for testing purposes and will + remove ALL items from ALL collections. Use with extreme caution. + """ await self.client.delete_by_query( index=ITEM_INDICES, body={"query": {"match_all": {}}}, @@ -924,7 +988,15 @@ async def delete_items(self) -> None: # DANGER async def delete_collections(self) -> None: - """Danger. 
this is only for tests.""" + """Delete all collections from the database (TESTING ONLY). + + Notes: + This method is strictly for testing purposes and will + remove ALL collections. Use with extreme caution. + """ await self.client.delete_by_query( index=COLLECTIONS_INDEX, body={"query": {"match_all": {}}}, diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/version.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/version.py index ca97d75a..6356730f 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/version.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/version.py @@ -1,2 +1,2 @@ """library version.""" -__version__ = "3.2.5" +__version__ = "4.0.0" diff --git a/stac_fastapi/opensearch/setup.py b/stac_fastapi/opensearch/setup.py index 0befa10e..89c4d088 100644 --- a/stac_fastapi/opensearch/setup.py +++ b/stac_fastapi/opensearch/setup.py @@ -6,7 +6,7 @@ desc = f.read() install_requires = [ - "stac-fastapi.core==3.2.5", + "stac-fastapi.core==4.0.0", "opensearch-py==2.4.2", "opensearch-py[async]==2.4.2", "uvicorn", diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py index 01551d94..6de2ab91 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py @@ -6,6 +6,7 @@ import certifi from opensearchpy import AsyncOpenSearch, OpenSearch +from stac_fastapi.core.base_settings import ApiBaseSettings from stac_fastapi.types.config import ApiSettings @@ -67,7 +68,7 @@ def _es_config() -> Dict[str, Any]: _forbidden_fields: Set[str] = {"type"} -class OpensearchSettings(ApiSettings): +class OpensearchSettings(ApiSettings, ApiBaseSettings): """API settings.""" # Fields which are defined by STAC but not included in the database model @@ -80,7 +81,7 @@ def create_client(self): return OpenSearch(**_es_config()) -class AsyncOpensearchSettings(ApiSettings): +class AsyncOpensearchSettings(ApiSettings, ApiBaseSettings): """API settings.""" # Fields which are defined by STAC but not included in the database model diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 7bb7ac33..47ad576e 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -15,6 +15,7 @@ from starlette.requests import Request from stac_fastapi.core import serializers +from stac_fastapi.core.base_database_logic import BaseDatabaseLogic from stac_fastapi.core.database_logic import ( COLLECTIONS_INDEX, DEFAULT_SORT, @@ -30,6 +31,7 @@ mk_actions, mk_item_id, ) +from stac_fastapi.core.exceptions import BulkInsertError from stac_fastapi.core.extensions import filter from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon from stac_fastapi.opensearch.config import ( @@ -144,8 +146,8 @@ async def delete_item_index(collection_id: str): await client.close() @attr.s -class DatabaseLogic: +class DatabaseLogic(BaseDatabaseLogic): """Database logic.""" client = AsyncSearchSettings().create_client @@ -898,56 +899,118 @@ async def delete_collection(self, collection_id: str, refresh: bool = False): await delete_item_index(collection_id) async def bulk_async( - self, collection_id: str, processed_items: List[Item], refresh: bool = False - ) -> None: + self, + collection_id: str, + processed_items:
List[Item], + refresh: bool = False, + raise_errors: bool = True, + ) -> Tuple[int, List[Dict]]: """Perform a bulk insert of items into the database asynchronously. Args: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to which the items belong. - processed_items (List[Item]): A list of `Item` objects to be inserted into the database. - refresh (bool): Whether to refresh the index after the bulk insert (default: False). + processed_items (List[Item]): List of `Item` objects to be inserted. + refresh (bool): Whether to refresh the index after the bulk insert. + Default: False + raise_errors (bool): Whether to raise exceptions on bulk errors. + Default: True + + Returns: + Tuple[int, List[Dict]]: Number of successful inserts and list of errors + + Raises: + BulkInsertError: If raise_errors=True and any operations failed Notes: - This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The - insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. The - `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the - index is refreshed after the bulk insert. The function does not return any value. + Performs bulk insert using OpenSearch's async helpers. When raise_errors=True, + any failed operations will raise a BulkInsertError containing success count + and detailed error messages. """ - await helpers.async_bulk( + result = await helpers.async_bulk( self.client, mk_actions(collection_id, processed_items), refresh=refresh, raise_on_error=False, + stats_only=False, ) + return self._handle_bulk_result(result, raise_errors) def bulk_sync( - self, collection_id: str, processed_items: List[Item], refresh: bool = False - ) -> None: + self, + collection_id: str, + processed_items: List[Item], + refresh: bool = False, + raise_errors: bool = True, + ) -> Tuple[int, List[Dict]]: """Perform a bulk insert of items into the database synchronously. Args: - self: The instance of the object calling this function. - collection_id (str): The ID of the collection to which the items belong. - processed_items (List[Item]): A list of `Item` objects to be inserted into the database. - refresh (bool): Whether to refresh the index after the bulk insert (default: False). + collection_id (str): The ID of the collection for the items. + processed_items (List[Item]): List of `Item` objects to insert. + refresh (bool): Refresh index after insert. Default: False + raise_errors (bool): Raise exceptions on errors. Default: True + + Returns: + Tuple[int, List[Dict]]: Success count and error details + + Raises: + BulkInsertError: If raise_errors=True and any operations failed Notes: - This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The - insert is performed synchronously and blocking, meaning that the function does not return until the insert has - completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to - True, the index is refreshed after the bulk insert. The function does not return any value. + Synchronous version of bulk insert. Blocks until operation completes. + Follows same error handling rules as async version. 
""" - helpers.bulk( + result = helpers.bulk( self.sync_client, mk_actions(collection_id, processed_items), refresh=refresh, raise_on_error=False, + stats_only=False, ) + return self._handle_bulk_result(result, raise_errors) + + def _handle_bulk_result( + self, result: Tuple[int, List[Dict]], raise_errors: bool + ) -> Tuple[int, List[Dict]]: + """Process bulk operation results and handle errors. + + Args: + result (Tuple[int, List[Dict]]): Bulk operation result containing + success count and error details + raise_errors (bool): Whether to raise exceptions on errors + + Returns: + Tuple[int, List[Dict]]: Processed success count and error list + + Raises: + BulkInsertError: If raise_errors=True and errors are present + """ + success_count, error_list = result + + if raise_errors and error_list: + error_messages = [ + f"Item {e['index']['_id']}: {e['index']['error']['reason']}" + for e in error_list + ] + raise BulkInsertError( + f"Bulk operation completed with {len(error_list)} errors", + success_count=success_count, + errors=error_messages, + ) + + return success_count, error_list # DANGER async def delete_items(self) -> None: - """Danger. this is only for tests.""" + """Delete all items from the database (TESTING ONLY). + + Raises: + RuntimeError: If used outside test environment + + Notes: + This method is strictly for testing purposes and will + remove ALL items from ALL collections. Use with extreme caution. + """ await self.client.delete_by_query( index=ITEM_INDICES, body={"query": {"match_all": {}}}, @@ -956,7 +1019,15 @@ async def delete_items(self) -> None: # DANGER async def delete_collections(self) -> None: - """Danger. this is only for tests.""" + """Delete all collections from the database (TESTING ONLY). + + Raises: + RuntimeError: If used outside test environment + + Notes: + This method is strictly for testing purposes and will + remove ALL collections. Use with extreme caution. + """ await self.client.delete_by_query( index=COLLECTIONS_INDEX, body={"query": {"match_all": {}}}, diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/version.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/version.py index ca97d75a..6356730f 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/version.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/version.py @@ -1,2 +1,2 @@ """library version.""" -__version__ = "3.2.5" +__version__ = "4.0.0"