From 7351f20dfdc95bafc9cfc5705f749fa68f902540 Mon Sep 17 00:00:00 2001 From: rhysrevans3 Date: Fri, 21 Mar 2025 13:14:11 +0000 Subject: [PATCH 1/6] Use list of objects for assets. --- stac_fastapi/core/stac_fastapi/core/serializers.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/stac_fastapi/core/stac_fastapi/core/serializers.py b/stac_fastapi/core/stac_fastapi/core/serializers.py index 9b0d36d4..22546703 100644 --- a/stac_fastapi/core/stac_fastapi/core/serializers.py +++ b/stac_fastapi/core/stac_fastapi/core/serializers.py @@ -1,4 +1,5 @@ """Serializers.""" + import abc from copy import deepcopy from typing import Any, List, Optional @@ -65,6 +66,10 @@ def stac_to_db(cls, stac_data: stac_types.Item, base_url: str) -> stac_types.Ite item_links = resolve_links(stac_data.get("links", []), base_url) stac_data["links"] = item_links + stac_data["assets"] = [ + {"es_key": k, **v} for k, v in stac_data.get("assets", {}).items() + ] + now = now_to_rfc3339_str() if "created" not in stac_data["properties"]: stac_data["properties"]["created"] = now @@ -102,7 +107,7 @@ def db_to_stac(cls, item: dict, base_url: str) -> stac_types.Item: bbox=item.get("bbox", []), properties=item.get("properties", {}), links=item_links, - assets=item.get("assets", {}), + assets={a.pop("es_key"): a for a in item.get("assets", [])}, ) @@ -127,6 +132,9 @@ def stac_to_db( collection["links"] = resolve_links( collection.get("links", []), str(request.base_url) ) + collection["assets"] = [ + {"es_key": k, **v} for k, v in collection.get("assets", {}).items() + ] return collection @classmethod @@ -173,5 +181,9 @@ def db_to_stac( collection_links += resolve_links(original_links, str(request.base_url)) collection["links"] = collection_links + collection["assets"] = { + a.pop("es_key"): a for a in collection.get("assets", []) + } + # Return the stac_types.Collection object return stac_types.Collection(**collection) From a2648a390f285f1e924a39a71007bdb8a3d18b02 Mon Sep 17 00:00:00 2001 From: rhysrevans3 Date: Fri, 21 Mar 2025 14:45:39 +0000 Subject: [PATCH 2/6] Fix update item test failure. --- stac_fastapi/core/stac_fastapi/core/core.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 56afcbc8..dd9c9897 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -377,7 +377,7 @@ async def get_item( @staticmethod def _return_date( - interval: Optional[Union[DateTimeType, str]] + interval: Optional[Union[DateTimeType, str]], ) -> Dict[str, Optional[str]]: """ Convert a date interval. @@ -724,15 +724,14 @@ async def update_item( """ item = item.model_dump(mode="json") - base_url = str(kwargs["request"].base_url) now = datetime_type.now(timezone.utc).isoformat().replace("+00:00", "Z") item["properties"]["updated"] = now await self.database.check_collection_exists(collection_id) await self.delete_item(item_id=item_id, collection_id=collection_id) - await self.create_item(collection_id=collection_id, item=Item(**item), **kwargs) - - return ItemSerializer.db_to_stac(item, base_url) + return await self.create_item( + collection_id=collection_id, item=Item(**item), **kwargs + ) @overrides async def delete_item( From 2d69d9105f0ee16cc571a0c4551cc4cf4c3ac201 Mon Sep 17 00:00:00 2001 From: rhysrevans3 Date: Wed, 3 Sep 2025 10:33:42 +0100 Subject: [PATCH 3/6] Enable asset indexing. 
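Assets were previously excluded from the item mapping ("enabled": False), so nothing under "assets" was searchable. With the serializer change in the first patch, assets are stored as a list of objects that keep the original asset key in "es_key", so the field can now be indexed without every distinct asset name becoming its own mapping entry.

A minimal sketch of the round trip the serializers now perform (the sample asset values below are illustrative only):

    # STAC -> database (ItemSerializer.stac_to_db): the dict keyed by asset
    # name becomes a list of objects, with the key preserved in "es_key".
    stac_assets = {
        "thumbnail": {"href": "https://example.com/thumb.png", "type": "image/png"}
    }
    db_assets = [{"es_key": k, **v} for k, v in stac_assets.items()]
    # -> [{"es_key": "thumbnail", "href": "https://...", "type": "image/png"}]

    # database -> STAC (ItemSerializer.db_to_stac): pop "es_key" back out to
    # rebuild the original mapping of asset name to asset object.
    roundtrip = {a.pop("es_key"): a for a in db_assets}
    assert roundtrip == stac_assets
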
--- .../elasticsearch/database_logic.py | 92 +++++-------------- 1 file changed, 23 insertions(+), 69 deletions(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 0f272218..ed372dc0 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -113,7 +113,7 @@ "id": {"type": "keyword"}, "collection": {"type": "keyword"}, "geometry": {"type": "geo_shape"}, - "assets": {"type": "object", "enabled": False}, + "assets": {"type": "object"}, "links": {"type": "object", "enabled": False}, "properties": { "type": "object", @@ -324,9 +324,7 @@ class DatabaseLogic: sync_client = SyncElasticsearchSettings().create_client item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer) - collection_serializer: Type[CollectionSerializer] = attr.ib( - default=CollectionSerializer - ) + collection_serializer: Type[CollectionSerializer] = attr.ib(default=CollectionSerializer) extensions: List[str] = attr.ib(default=attr.Factory(list)) @@ -360,15 +358,9 @@ class DatabaseLogic: "size": 10000, } }, - "sun_elevation_frequency": { - "histogram": {"field": "properties.view:sun_elevation", "interval": 5} - }, - "sun_azimuth_frequency": { - "histogram": {"field": "properties.view:sun_azimuth", "interval": 5} - }, - "off_nadir_frequency": { - "histogram": {"field": "properties.view:off_nadir", "interval": 5} - }, + "sun_elevation_frequency": {"histogram": {"field": "properties.view:sun_elevation", "interval": 5}}, + "sun_azimuth_frequency": {"histogram": {"field": "properties.view:sun_azimuth", "interval": 5}}, + "off_nadir_frequency": {"histogram": {"field": "properties.view:off_nadir", "interval": 5}}, "centroid_geohash_grid_frequency": { "geohash_grid": { "field": "properties.proj:centroid", @@ -465,9 +457,7 @@ async def get_one_item(self, collection_id: str, item_id: str) -> Dict: id=mk_item_id(item_id, collection_id), ) except exceptions.NotFoundError: - raise NotFoundError( - f"Item {item_id} does not exist in Collection {collection_id}" - ) + raise NotFoundError(f"Item {item_id} does not exist in Collection {collection_id}") return item["_source"] @staticmethod @@ -497,16 +487,10 @@ def apply_datetime_filter(search: Search, datetime_search): Search: The filtered search object. 
""" if "eq" in datetime_search: - search = search.filter( - "term", **{"properties__datetime": datetime_search["eq"]} - ) + search = search.filter("term", **{"properties__datetime": datetime_search["eq"]}) else: - search = search.filter( - "range", properties__datetime={"lte": datetime_search["lte"]} - ) - search = search.filter( - "range", properties__datetime={"gte": datetime_search["gte"]} - ) + search = search.filter("range", properties__datetime={"lte": datetime_search["lte"]}) + search = search.filter("range", properties__datetime={"gte": datetime_search["gte"]}) return search @staticmethod @@ -600,9 +584,7 @@ def apply_free_text_filter(search: Search, free_text_queries: Optional[List[str] """Database logic to perform query for search endpoint.""" if free_text_queries is not None: free_text_query_string = '" OR properties.\\*:"'.join(free_text_queries) - search = search.query( - "query_string", query=f'properties.\\*:"{free_text_query_string}"' - ) + search = search.query("query_string", query=f'properties.\\*:"{free_text_query_string}"') return search @@ -715,11 +697,7 @@ async def execute_search( if hits and (sort_array := hits[limit - 1].get("sort")): next_token = urlsafe_b64encode(json.dumps(sort_array).encode()).decode() - matched = ( - es_response["hits"]["total"]["value"] - if es_response["hits"]["total"]["relation"] == "eq" - else None - ) + matched = es_response["hits"]["total"]["value"] if es_response["hits"]["total"]["relation"] == "eq" else None if count_task.done(): try: matched = count_task.result().get("count") @@ -799,9 +777,7 @@ async def check_collection_exists(self, collection_id: str): if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id): raise NotFoundError(f"Collection {collection_id} does not exist") - async def prep_create_item( - self, item: Item, base_url: str, exist_ok: bool = False - ) -> Item: + async def prep_create_item(self, item: Item, base_url: str, exist_ok: bool = False) -> Item: """ Preps an item for insertion into the database. @@ -823,15 +799,11 @@ async def prep_create_item( index=index_alias_by_collection_id(item["collection"]), id=mk_item_id(item["id"], item["collection"]), ): - raise ConflictError( - f"Item {item['id']} in collection {item['collection']} already exists" - ) + raise ConflictError(f"Item {item['id']} in collection {item['collection']} already exists") return self.item_serializer.stac_to_db(item, base_url) - def sync_prep_create_item( - self, item: Item, base_url: str, exist_ok: bool = False - ) -> Item: + def sync_prep_create_item(self, item: Item, base_url: str, exist_ok: bool = False) -> Item: """ Prepare an item for insertion into the database. 
@@ -860,9 +832,7 @@ def sync_prep_create_item( index=index_alias_by_collection_id(collection_id), id=mk_item_id(item_id, collection_id), ): - raise ConflictError( - f"Item {item_id} in collection {collection_id} already exists" - ) + raise ConflictError(f"Item {item_id} in collection {collection_id} already exists") return self.item_serializer.stac_to_db(item, base_url) @@ -890,13 +860,9 @@ async def create_item(self, item: Item, refresh: bool = False): ) if (meta := es_resp.get("meta")) and meta.get("status") == 409: - raise ConflictError( - f"Item {item_id} in collection {collection_id} already exists" - ) + raise ConflictError(f"Item {item_id} in collection {collection_id} already exists") - async def delete_item( - self, item_id: str, collection_id: str, refresh: bool = False - ): + async def delete_item(self, item_id: str, collection_id: str, refresh: bool = False): """Delete a single item from the database. Args: @@ -914,9 +880,7 @@ async def delete_item( refresh=refresh, ) except exceptions.NotFoundError: - raise NotFoundError( - f"Item {item_id} in collection {collection_id} not found" - ) + raise NotFoundError(f"Item {item_id} in collection {collection_id} not found") async def create_collection(self, collection: Collection, refresh: bool = False): """Create a single collection in the database. @@ -963,17 +927,13 @@ async def find_collection(self, collection_id: str) -> Collection: collection as a `Collection` object. If the collection is not found, a `NotFoundError` is raised. """ try: - collection = await self.client.get( - index=COLLECTIONS_INDEX, id=collection_id - ) + collection = await self.client.get(index=COLLECTIONS_INDEX, id=collection_id) except exceptions.NotFoundError: raise NotFoundError(f"Collection {collection_id} not found") return collection["_source"] - async def update_collection( - self, collection_id: str, collection: Collection, refresh: bool = False - ): + async def update_collection(self, collection_id: str, collection: Collection, refresh: bool = False): """Update a collection from the database. Args: @@ -1035,14 +995,10 @@ async def delete_collection(self, collection_id: str, refresh: bool = False): function also calls `delete_item_index` to delete the index for the items in the collection. """ await self.find_collection(collection_id=collection_id) - await self.client.delete( - index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh - ) + await self.client.delete(index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh) await delete_item_index(collection_id) - async def bulk_async( - self, collection_id: str, processed_items: List[Item], refresh: bool = False - ) -> None: + async def bulk_async(self, collection_id: str, processed_items: List[Item], refresh: bool = False) -> None: """Perform a bulk insert of items into the database asynchronously. Args: @@ -1064,9 +1020,7 @@ async def bulk_async( raise_on_error=False, ) - def bulk_sync( - self, collection_id: str, processed_items: List[Item], refresh: bool = False - ) -> None: + def bulk_sync(self, collection_id: str, processed_items: List[Item], refresh: bool = False) -> None: """Perform a bulk insert of items into the database synchronously. Args: From c7d9d985e8e38346314939396157e1f787385cb2 Mon Sep 17 00:00:00 2001 From: rhysrevans3 Date: Wed, 3 Sep 2025 10:38:15 +0100 Subject: [PATCH 4/6] pre-commit. 
--- stac_fastapi/core/stac_fastapi/core/core.py | 160 +++++++++++++++----- 1 file changed, 121 insertions(+), 39 deletions(-) diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 2959232c..8e7da91b 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -71,12 +71,16 @@ class CoreClient(AsyncBaseCoreClient): """ database: BaseDatabaseLogic = attr.ib() - base_conformance_classes: List[str] = attr.ib(factory=lambda: BASE_CONFORMANCE_CLASSES) + base_conformance_classes: List[str] = attr.ib( + factory=lambda: BASE_CONFORMANCE_CLASSES + ) extensions: List[ApiExtension] = attr.ib(default=attr.Factory(list)) session: Session = attr.ib(default=attr.Factory(Session.create_from_env)) item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer) - collection_serializer: Type[CollectionSerializer] = attr.ib(default=CollectionSerializer) + collection_serializer: Type[CollectionSerializer] = attr.ib( + default=CollectionSerializer + ) post_request_model = attr.ib(default=BaseSearchPostRequest) stac_version: str = attr.ib(default=STAC_VERSION) landing_page_id: str = attr.ib(default="stac-fastapi") @@ -200,7 +204,9 @@ async def landing_page(self, **kwargs) -> stac_types.LandingPage: "rel": "service-desc", "type": "application/vnd.oai.openapi+json;version=3.0", "title": "OpenAPI service description", - "href": urljoin(str(request.base_url), request.app.openapi_url.lstrip("/")), + "href": urljoin( + str(request.base_url), request.app.openapi_url.lstrip("/") + ), } ) @@ -210,7 +216,9 @@ async def landing_page(self, **kwargs) -> stac_types.LandingPage: "rel": "service-doc", "type": "text/html", "title": "OpenAPI service documentation", - "href": urljoin(str(request.base_url), request.app.docs_url.lstrip("/")), + "href": urljoin( + str(request.base_url), request.app.docs_url.lstrip("/") + ), } ) @@ -230,7 +238,9 @@ async def all_collections(self, **kwargs) -> stac_types.Collections: limit = int(request.query_params.get("limit", os.getenv("STAC_ITEM_LIMIT", 10))) token = request.query_params.get("token") - collections, next_token = await self.database.get_all_collections(token=token, limit=limit, request=request) + collections, next_token = await self.database.get_all_collections( + token=token, limit=limit, request=request + ) links = [ {"rel": Relations.root.value, "type": MimeTypes.json, "href": base_url}, @@ -248,7 +258,9 @@ async def all_collections(self, **kwargs) -> stac_types.Collections: return stac_types.Collections(collections=collections, links=links) - async def get_collection(self, collection_id: str, **kwargs) -> stac_types.Collection: + async def get_collection( + self, collection_id: str, **kwargs + ) -> stac_types.Collection: """Get a collection from the database by its id. 
Args: @@ -301,16 +313,22 @@ async def item_collection( base_url = str(request.base_url) - collection = await self.get_collection(collection_id=collection_id, request=request) + collection = await self.get_collection( + collection_id=collection_id, request=request + ) collection_id = collection.get("id") if collection_id is None: raise HTTPException(status_code=404, detail="Collection not found") search = self.database.make_search() - search = self.database.apply_collections_filter(search=search, collection_ids=[collection_id]) + search = self.database.apply_collections_filter( + search=search, collection_ids=[collection_id] + ) try: - search, datetime_search = self.database.apply_datetime_filter(search=search, datetime=datetime) + search, datetime_search = self.database.apply_datetime_filter( + search=search, datetime=datetime + ) except (ValueError, TypeError) as e: # Handle invalid interval formats if return_date fails msg = f"Invalid interval format: {datetime}, error: {e}" @@ -334,7 +352,9 @@ async def item_collection( datetime_search=datetime_search, ) - items = [self.item_serializer.db_to_stac(item, base_url=base_url) for item in items] + items = [ + self.item_serializer.db_to_stac(item, base_url=base_url) for item in items + ] links = await PagingLinks(request=request, next=next_token).get_links() @@ -346,7 +366,9 @@ async def item_collection( numMatched=maybe_count, ) - async def get_item(self, item_id: str, collection_id: str, **kwargs) -> stac_types.Item: + async def get_item( + self, item_id: str, collection_id: str, **kwargs + ) -> stac_types.Item: """Get an item from the database based on its id and collection id. Args: @@ -361,7 +383,9 @@ async def get_item(self, item_id: str, collection_id: str, **kwargs) -> stac_typ NotFoundError: If the item does not exist in the specified collection. """ base_url = str(kwargs["request"].base_url) - item = await self.database.get_one_item(item_id=item_id, collection_id=collection_id) + item = await self.database.get_one_item( + item_id=item_id, collection_id=collection_id + ) return self.item_serializer.db_to_stac(item, base_url) async def get_search( @@ -423,13 +447,16 @@ async def get_search( if sortby: base_args["sortby"] = [ - {"field": sort[1:], "direction": "desc" if sort[0] == "-" else "asc"} for sort in sortby + {"field": sort[1:], "direction": "desc" if sort[0] == "-" else "asc"} + for sort in sortby ] if filter_expr: base_args["filter_lang"] = "cql2-json" base_args["filter"] = orjson.loads( - unquote_plus(filter_expr) if filter_lang == "cql2-json" else to_cql2(parse_cql2_text(filter_expr)) + unquote_plus(filter_expr) + if filter_lang == "cql2-json" + else to_cql2(parse_cql2_text(filter_expr)) ) if fields: @@ -445,12 +472,16 @@ async def get_search( try: search_request = self.post_request_model(**base_args) except ValidationError as e: - raise HTTPException(status_code=400, detail=f"Invalid parameters provided: {e}") + raise HTTPException( + status_code=400, detail=f"Invalid parameters provided: {e}" + ) resp = await self.post_search(search_request=search_request, request=request) return resp - async def post_search(self, search_request: BaseSearchPostRequest, request: Request) -> stac_types.ItemCollection: + async def post_search( + self, search_request: BaseSearchPostRequest, request: Request + ) -> stac_types.ItemCollection: """ Perform a POST search on the catalog. 
@@ -469,10 +500,14 @@ async def post_search(self, search_request: BaseSearchPostRequest, request: Requ search = self.database.make_search() if search_request.ids: - search = self.database.apply_ids_filter(search=search, item_ids=search_request.ids) + search = self.database.apply_ids_filter( + search=search, item_ids=search_request.ids + ) if search_request.collections: - search = self.database.apply_collections_filter(search=search, collection_ids=search_request.collections) + search = self.database.apply_collections_filter( + search=search, collection_ids=search_request.collections + ) try: search, datetime_search = self.database.apply_datetime_filter( @@ -492,7 +527,9 @@ async def post_search(self, search_request: BaseSearchPostRequest, request: Requ search = self.database.apply_bbox_filter(search=search, bbox=bbox) if search_request.intersects: - search = self.database.apply_intersects_filter(search=search, intersects=search_request.intersects) + search = self.database.apply_intersects_filter( + search=search, intersects=search_request.intersects + ) if search_request.query: for field_name, expr in search_request.query.items(): @@ -500,7 +537,9 @@ async def post_search(self, search_request: BaseSearchPostRequest, request: Requ for op, value in expr.items(): # Convert enum to string operator = op.value if isinstance(op, Enum) else op - search = self.database.apply_stacql_filter(search=search, op=operator, field=field, value=value) + search = self.database.apply_stacql_filter( + search=search, op=operator, field=field, value=value + ) # only cql2_json is supported here if hasattr(search_request, "filter_expr"): @@ -508,14 +547,18 @@ async def post_search(self, search_request: BaseSearchPostRequest, request: Requ try: search = await self.database.apply_cql2_filter(search, cql2_filter) except Exception as e: - raise HTTPException(status_code=400, detail=f"Error with cql2_json filter: {e}") + raise HTTPException( + status_code=400, detail=f"Error with cql2_json filter: {e}" + ) if hasattr(search_request, "q"): free_text_queries = getattr(search_request, "q", None) try: search = self.database.apply_free_text_filter(search, free_text_queries) except Exception as e: - raise HTTPException(status_code=400, detail=f"Error with free text query: {e}") + raise HTTPException( + status_code=400, detail=f"Error with free text query: {e}" + ) sort = None if search_request.sortby: @@ -534,7 +577,11 @@ async def post_search(self, search_request: BaseSearchPostRequest, request: Requ datetime_search=datetime_search, ) - fields = getattr(search_request, "fields", None) if self.extension_is_enabled("FieldsExtension") else None + fields = ( + getattr(search_request, "fields", None) + if self.extension_is_enabled("FieldsExtension") + else None + ) include: Set[str] = fields.include if fields and fields.include else set() exclude: Set[str] = fields.exclude if fields and fields.exclude else set() @@ -593,10 +640,15 @@ async def create_item( # Handle FeatureCollection (bulk insert) if item_dict["type"] == "FeatureCollection": - bulk_client = BulkTransactionsClient(database=self.database, settings=self.settings) + bulk_client = BulkTransactionsClient( + database=self.database, settings=self.settings + ) features = item_dict["features"] processed_items = [ - bulk_client.preprocess_item(feature, base_url, BulkTransactionMethod.INSERT) for feature in features + bulk_client.preprocess_item( + feature, base_url, BulkTransactionMethod.INSERT + ) + for feature in features ] attempted = len(processed_items) @@ -610,15 
+662,21 @@ async def create_item( f"Bulk async operation encountered errors for collection {collection_id}: {errors} (attempted {attempted})" ) else: - logger.info(f"Bulk async operation succeeded with {success} actions for collection {collection_id}.") + logger.info( + f"Bulk async operation succeeded with {success} actions for collection {collection_id}." + ) return f"Successfully added {success} Items. {attempted - success} errors occurred." # Handle single item - await self.database.create_item(item_dict, base_url=base_url, exist_ok=False, **kwargs) + await self.database.create_item( + item_dict, base_url=base_url, exist_ok=False, **kwargs + ) return ItemSerializer.db_to_stac(item_dict, base_url) @overrides - async def update_item(self, collection_id: str, item_id: str, item: Item, **kwargs) -> stac_types.Item: + async def update_item( + self, collection_id: str, item_id: str, item: Item, **kwargs + ) -> stac_types.Item: """Update an item in the collection. Args: @@ -640,7 +698,9 @@ async def update_item(self, collection_id: str, item_id: str, item: Item, **kwar now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") item["properties"]["updated"] = now - await self.database.create_item(item, base_url=base_url, exist_ok=True, **kwargs) + await self.database.create_item( + item, base_url=base_url, exist_ok=True, **kwargs + ) return ItemSerializer.db_to_stac(item, base_url) @@ -697,7 +757,9 @@ async def patch_item( if item: return ItemSerializer.db_to_stac(item, base_url=base_url) - raise NotImplementedError(f"Content-Type: {content_type} and body: {patch} combination not implemented") + raise NotImplementedError( + f"Content-Type: {content_type} and body: {patch} combination not implemented" + ) @overrides async def delete_item(self, item_id: str, collection_id: str, **kwargs) -> None: @@ -710,11 +772,15 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs) -> None: Returns: None: Returns 204 No Content on successful deletion """ - await self.database.delete_item(item_id=item_id, collection_id=collection_id, **kwargs) + await self.database.delete_item( + item_id=item_id, collection_id=collection_id, **kwargs + ) return None @overrides - async def create_collection(self, collection: Collection, **kwargs) -> stac_types.Collection: + async def create_collection( + self, collection: Collection, **kwargs + ) -> stac_types.Collection: """Create a new collection in the database. Args: @@ -739,7 +805,9 @@ async def create_collection(self, collection: Collection, **kwargs) -> stac_type ) @overrides - async def update_collection(self, collection_id: str, collection: Collection, **kwargs) -> stac_types.Collection: + async def update_collection( + self, collection_id: str, collection: Collection, **kwargs + ) -> stac_types.Collection: """ Update a collection. 
@@ -764,7 +832,9 @@ async def update_collection(self, collection_id: str, collection: Collection, ** request = kwargs["request"] collection = self.database.collection_serializer.stac_to_db(collection, request) - await self.database.update_collection(collection_id=collection_id, collection=collection, **kwargs) + await self.database.update_collection( + collection_id=collection_id, collection=collection, **kwargs + ) return CollectionSerializer.db_to_stac( collection, @@ -821,7 +891,9 @@ async def patch_collection( extensions=[type(ext).__name__ for ext in self.database.extensions], ) - raise NotImplementedError(f"Content-Type: {content_type} and body: {patch} combination not implemented") + raise NotImplementedError( + f"Content-Type: {content_type} and body: {patch} combination not implemented" + ) @overrides async def delete_collection(self, collection_id: str, **kwargs) -> None: @@ -860,7 +932,9 @@ def __attrs_post_init__(self): """Create es engine.""" self.client = self.settings.create_client - def preprocess_item(self, item: stac_types.Item, base_url, method: BulkTransactionMethod) -> stac_types.Item: + def preprocess_item( + self, item: stac_types.Item, base_url, method: BulkTransactionMethod + ) -> stac_types.Item: """Preprocess an item to match the data model. Args: @@ -872,10 +946,14 @@ def preprocess_item(self, item: stac_types.Item, base_url, method: BulkTransacti The preprocessed item. """ exist_ok = method == BulkTransactionMethod.UPSERT - return self.database.bulk_sync_prep_create_item(item=item, base_url=base_url, exist_ok=exist_ok) + return self.database.bulk_sync_prep_create_item( + item=item, base_url=base_url, exist_ok=exist_ok + ) @overrides - def bulk_item_insert(self, items: Items, chunk_size: Optional[int] = None, **kwargs) -> str: + def bulk_item_insert( + self, items: Items, chunk_size: Optional[int] = None, **kwargs + ) -> str: """Perform a bulk insertion of items into the database using Elasticsearch. Args: @@ -896,7 +974,11 @@ def bulk_item_insert(self, items: Items, chunk_size: Optional[int] = None, **kwa for item in items.items.values(): try: validated = Item(**item) if not isinstance(item, Item) else item - processed_items.append(self.preprocess_item(validated.model_dump(mode="json"), base_url, items.method)) + processed_items.append( + self.preprocess_item( + validated.model_dump(mode="json"), base_url, items.method + ) + ) except ValidationError: # Immediately raise on the first invalid item (strict mode) raise From c74f919b15e4e6c12789e83d8b3e65c64e872a12 Mon Sep 17 00:00:00 2001 From: rhysrevans3 Date: Wed, 3 Sep 2025 10:42:35 +0100 Subject: [PATCH 5/6] Enable asset indexing. 
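This applies the same one-line mapping change to the shared mappings in sfeos_helpers: "assets" drops "enabled": False and becomes an ordinary indexed object field.

A rough sketch of what the change means for stored items (asset values are illustrative, and the exact sub-field mappings depend on the index's dynamic mapping):

    # Before: a dict keyed by arbitrary asset names, excluded from indexing.
    old_assets = {
        "B04": {"href": "s3://bucket/B04.tif", "type": "image/tiff"}
    }

    # After: a list of objects with the asset name held in "es_key", so new
    # mapping fields come from asset properties (assets.es_key, assets.href,
    # assets.type, ...) rather than from every distinct asset name.
    new_assets = [
        {"es_key": "B04", "href": "s3://bucket/B04.tif", "type": "image/tiff"}
    ]
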
--- .../sfeos_helpers/stac_fastapi/sfeos_helpers/mappings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/mappings.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/mappings.py index 476d656a..935aa84b 100644 --- a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/mappings.py +++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/mappings.py @@ -134,7 +134,7 @@ class Geometry(Protocol): # noqa "id": {"type": "keyword"}, "collection": {"type": "keyword"}, "geometry": {"type": "geo_shape"}, - "assets": {"type": "object", "enabled": False}, + "assets": {"type": "object"}, "links": {"type": "object", "enabled": False}, "properties": { "type": "object", From 57a9981f2b6bf7d0129536a89accb7605d3e2651 Mon Sep 17 00:00:00 2001 From: rhysrevans3 Date: Wed, 3 Sep 2025 10:47:19 +0100 Subject: [PATCH 6/6] CHANGELOG update. --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 130f5df6..21d9fd13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Changed +- Changed assets serialization to prevent mapping explosion while allowing asset information to be indexed. [#341](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/341) ### Fixed ## [v6.2.1] - 2025-09-02