Skip to content

Commit 3a75b68

Browse files
committed
Opensearch update body not script.
1 parent a2d6f48 commit 3a75b68

File tree

2 files changed

+176
-149
lines changed

2 files changed

+176
-149
lines changed

stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py

Lines changed: 26 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,7 @@ async def create_item_index(collection_id: str):
273273
}
274274

275275
try:
276-
await client.indices.create(
277-
index=f"{index_by_collection_id(collection_id)}-000001", body=search_body
278-
)
276+
await client.indices.create(index=f"{index_by_collection_id(collection_id)}-000001", body=search_body)
279277
except TransportError as e:
280278
if e.status_code == 400:
281279
pass # Ignore 400 status codes
@@ -356,12 +354,8 @@ class DatabaseLogic:
356354
client = AsyncSearchSettings().create_client
357355
sync_client = SyncSearchSettings().create_client
358356

359-
item_serializer: Type[serializers.ItemSerializer] = attr.ib(
360-
default=serializers.ItemSerializer
361-
)
362-
collection_serializer: Type[serializers.CollectionSerializer] = attr.ib(
363-
default=serializers.CollectionSerializer
364-
)
357+
item_serializer: Type[serializers.ItemSerializer] = attr.ib(default=serializers.ItemSerializer)
358+
collection_serializer: Type[serializers.CollectionSerializer] = attr.ib(default=serializers.CollectionSerializer)
365359

366360
extensions: List[str] = attr.ib(default=attr.Factory(list))
367361

@@ -395,15 +389,9 @@ class DatabaseLogic:
395389
"size": 10000,
396390
}
397391
},
398-
"sun_elevation_frequency": {
399-
"histogram": {"field": "properties.view:sun_elevation", "interval": 5}
400-
},
401-
"sun_azimuth_frequency": {
402-
"histogram": {"field": "properties.view:sun_azimuth", "interval": 5}
403-
},
404-
"off_nadir_frequency": {
405-
"histogram": {"field": "properties.view:off_nadir", "interval": 5}
406-
},
392+
"sun_elevation_frequency": {"histogram": {"field": "properties.view:sun_elevation", "interval": 5}},
393+
"sun_azimuth_frequency": {"histogram": {"field": "properties.view:sun_azimuth", "interval": 5}},
394+
"off_nadir_frequency": {"histogram": {"field": "properties.view:off_nadir", "interval": 5}},
407395
"centroid_geohash_grid_frequency": {
408396
"geohash_grid": {
409397
"field": "properties.proj:centroid",
@@ -506,9 +494,7 @@ async def get_one_item(self, collection_id: str, item_id: str) -> Dict:
506494
id=mk_item_id(item_id, collection_id),
507495
)
508496
except exceptions.NotFoundError:
509-
raise NotFoundError(
510-
f"Item {item_id} does not exist in Collection {collection_id}"
511-
)
497+
raise NotFoundError(f"Item {item_id} does not exist in Collection {collection_id}")
512498
return item["_source"]
513499

514500
@staticmethod
@@ -531,9 +517,7 @@ def apply_free_text_filter(search: Search, free_text_queries: Optional[List[str]
531517
"""Database logic to perform query for search endpoint."""
532518
if free_text_queries is not None:
533519
free_text_query_string = '" OR properties.\\*:"'.join(free_text_queries)
534-
search = search.query(
535-
"query_string", query=f'properties.\\*:"{free_text_query_string}"'
536-
)
520+
search = search.query("query_string", query=f'properties.\\*:"{free_text_query_string}"')
537521

538522
return search
539523

@@ -549,16 +533,10 @@ def apply_datetime_filter(search: Search, datetime_search):
549533
Search: The filtered search object.
550534
"""
551535
if "eq" in datetime_search:
552-
search = search.filter(
553-
"term", **{"properties__datetime": datetime_search["eq"]}
554-
)
536+
search = search.filter("term", **{"properties__datetime": datetime_search["eq"]})
555537
else:
556-
search = search.filter(
557-
"range", properties__datetime={"lte": datetime_search["lte"]}
558-
)
559-
search = search.filter(
560-
"range", properties__datetime={"gte": datetime_search["gte"]}
561-
)
538+
search = search.filter("range", properties__datetime={"lte": datetime_search["lte"]})
539+
search = search.filter("range", properties__datetime={"gte": datetime_search["gte"]})
562540
return search
563541

564542
@staticmethod
@@ -761,11 +739,7 @@ async def execute_search(
761739
if hits and (sort_array := hits[limit - 1].get("sort")):
762740
next_token = urlsafe_b64encode(json.dumps(sort_array).encode()).decode()
763741

764-
matched = (
765-
es_response["hits"]["total"]["value"]
766-
if es_response["hits"]["total"]["relation"] == "eq"
767-
else None
768-
)
742+
matched = es_response["hits"]["total"]["value"] if es_response["hits"]["total"]["relation"] == "eq" else None
769743
if count_task.done():
770744
try:
771745
matched = count_task.result().get("count")
@@ -843,9 +817,7 @@ async def check_collection_exists(self, collection_id: str):
843817
if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
844818
raise NotFoundError(f"Collection {collection_id} does not exist")
845819

846-
async def prep_create_item(
847-
self, item: Item, base_url: str, exist_ok: bool = False
848-
) -> Item:
820+
async def prep_create_item(self, item: Item, base_url: str, exist_ok: bool = False) -> Item:
849821
"""
850822
Preps an item for insertion into the database.
851823
@@ -867,15 +839,11 @@ async def prep_create_item(
867839
index=index_alias_by_collection_id(item["collection"]),
868840
id=mk_item_id(item["id"], item["collection"]),
869841
):
870-
raise ConflictError(
871-
f"Item {item['id']} in collection {item['collection']} already exists"
872-
)
842+
raise ConflictError(f"Item {item['id']} in collection {item['collection']} already exists")
873843

874844
return self.item_serializer.stac_to_db(item, base_url)
875845

876-
def sync_prep_create_item(
877-
self, item: Item, base_url: str, exist_ok: bool = False
878-
) -> Item:
846+
def sync_prep_create_item(self, item: Item, base_url: str, exist_ok: bool = False) -> Item:
879847
"""
880848
Prepare an item for insertion into the database.
881849
@@ -904,9 +872,7 @@ def sync_prep_create_item(
904872
index=index_alias_by_collection_id(collection_id),
905873
id=mk_item_id(item_id, collection_id),
906874
):
907-
raise ConflictError(
908-
f"Item {item_id} in collection {collection_id} already exists"
909-
)
875+
raise ConflictError(f"Item {item_id} in collection {collection_id} already exists")
910876

911877
return self.item_serializer.stac_to_db(item, base_url)
912878

@@ -934,9 +900,7 @@ async def create_item(self, item: Item, refresh: bool = False):
934900
)
935901

936902
if (meta := es_resp.get("meta")) and meta.get("status") == 409:
937-
raise ConflictError(
938-
f"Item {item_id} in collection {collection_id} already exists"
939-
)
903+
raise ConflictError(f"Item {item_id} in collection {collection_id} already exists")
940904

941905
async def merge_patch_item(
942906
self,
@@ -1059,9 +1023,7 @@ async def json_patch_item(
10591023

10601024
return item
10611025

1062-
async def delete_item(
1063-
self, item_id: str, collection_id: str, refresh: bool = False
1064-
):
1026+
async def delete_item(self, item_id: str, collection_id: str, refresh: bool = False):
10651027
"""Delete a single item from the database.
10661028
10671029
Args:
@@ -1079,9 +1041,7 @@ async def delete_item(
10791041
refresh=refresh,
10801042
)
10811043
except exceptions.NotFoundError:
1082-
raise NotFoundError(
1083-
f"Item {item_id} in collection {collection_id} not found"
1084-
)
1044+
raise NotFoundError(f"Item {item_id} in collection {collection_id} not found")
10851045

10861046
async def create_collection(self, collection: Collection, refresh: bool = False):
10871047
"""Create a single collection in the database.
@@ -1128,17 +1088,13 @@ async def find_collection(self, collection_id: str) -> Collection:
11281088
collection as a `Collection` object. If the collection is not found, a `NotFoundError` is raised.
11291089
"""
11301090
try:
1131-
collection = await self.client.get(
1132-
index=COLLECTIONS_INDEX, id=collection_id
1133-
)
1091+
collection = await self.client.get(index=COLLECTIONS_INDEX, id=collection_id)
11341092
except exceptions.NotFoundError:
11351093
raise NotFoundError(f"Collection {collection_id} not found")
11361094

11371095
return collection["_source"]
11381096

1139-
async def update_collection(
1140-
self, collection_id: str, collection: Collection, refresh: bool = False
1141-
):
1097+
async def update_collection(self, collection_id: str, collection: Collection, refresh: bool = False):
11421098
"""Update a collection from the database.
11431099
11441100
Args:
@@ -1249,7 +1205,7 @@ async def json_patch_collection(
12491205
await self.client.update(
12501206
index=COLLECTIONS_INDEX,
12511207
id=collection_id,
1252-
script=script,
1208+
body={"script": script},
12531209
refresh=refresh,
12541210
)
12551211

@@ -1258,9 +1214,7 @@ async def json_patch_collection(
12581214
if new_collection_id:
12591215
collection["id"] = new_collection_id
12601216
collection["links"] = resolve_links([], base_url)
1261-
await self.update_collection(
1262-
collection_id=collection_id, collection=collection, refresh=False
1263-
)
1217+
await self.update_collection(collection_id=collection_id, collection=collection, refresh=False)
12641218

12651219
return collection
12661220

@@ -1281,14 +1235,10 @@ async def delete_collection(self, collection_id: str, refresh: bool = False):
12811235
function also calls `delete_item_index` to delete the index for the items in the collection.
12821236
"""
12831237
await self.find_collection(collection_id=collection_id)
1284-
await self.client.delete(
1285-
index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh
1286-
)
1238+
await self.client.delete(index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh)
12871239
await delete_item_index(collection_id)
12881240

1289-
async def bulk_async(
1290-
self, collection_id: str, processed_items: List[Item], refresh: bool = False
1291-
) -> None:
1241+
async def bulk_async(self, collection_id: str, processed_items: List[Item], refresh: bool = False) -> None:
12921242
"""Perform a bulk insert of items into the database asynchronously.
12931243
12941244
Args:
@@ -1310,9 +1260,7 @@ async def bulk_async(
13101260
raise_on_error=False,
13111261
)
13121262

1313-
def bulk_sync(
1314-
self, collection_id: str, processed_items: List[Item], refresh: bool = False
1315-
) -> None:
1263+
def bulk_sync(self, collection_id: str, processed_items: List[Item], refresh: bool = False) -> None:
13161264
"""Perform a bulk insert of items into the database synchronously.
13171265
13181266
Args:

0 commit comments

Comments
 (0)