Skip to content

Commit b6b0721

Browse files
committed
Add changes to opensearch.
1 parent d4bd07a commit b6b0721

File tree

2 files changed

+50
-96
lines changed

2 files changed

+50
-96
lines changed

stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py

Lines changed: 48 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,7 @@ async def create_item_index(collection_id: str):
273273
}
274274

275275
try:
276-
await client.indices.create(
277-
index=f"{index_by_collection_id(collection_id)}-000001", body=search_body
278-
)
276+
await client.indices.create(index=f"{index_by_collection_id(collection_id)}-000001", body=search_body)
279277
except TransportError as e:
280278
if e.status_code == 400:
281279
pass # Ignore 400 status codes
@@ -356,12 +354,8 @@ class DatabaseLogic:
356354
client = AsyncSearchSettings().create_client
357355
sync_client = SyncSearchSettings().create_client
358356

359-
item_serializer: Type[serializers.ItemSerializer] = attr.ib(
360-
default=serializers.ItemSerializer
361-
)
362-
collection_serializer: Type[serializers.CollectionSerializer] = attr.ib(
363-
default=serializers.CollectionSerializer
364-
)
357+
item_serializer: Type[serializers.ItemSerializer] = attr.ib(default=serializers.ItemSerializer)
358+
collection_serializer: Type[serializers.CollectionSerializer] = attr.ib(default=serializers.CollectionSerializer)
365359

366360
extensions: List[str] = attr.ib(default=attr.Factory(list))
367361

@@ -395,15 +389,9 @@ class DatabaseLogic:
395389
"size": 10000,
396390
}
397391
},
398-
"sun_elevation_frequency": {
399-
"histogram": {"field": "properties.view:sun_elevation", "interval": 5}
400-
},
401-
"sun_azimuth_frequency": {
402-
"histogram": {"field": "properties.view:sun_azimuth", "interval": 5}
403-
},
404-
"off_nadir_frequency": {
405-
"histogram": {"field": "properties.view:off_nadir", "interval": 5}
406-
},
392+
"sun_elevation_frequency": {"histogram": {"field": "properties.view:sun_elevation", "interval": 5}},
393+
"sun_azimuth_frequency": {"histogram": {"field": "properties.view:sun_azimuth", "interval": 5}},
394+
"off_nadir_frequency": {"histogram": {"field": "properties.view:off_nadir", "interval": 5}},
407395
"centroid_geohash_grid_frequency": {
408396
"geohash_grid": {
409397
"field": "properties.proj:centroid",
@@ -506,9 +494,7 @@ async def get_one_item(self, collection_id: str, item_id: str) -> Dict:
506494
id=mk_item_id(item_id, collection_id),
507495
)
508496
except exceptions.NotFoundError:
509-
raise NotFoundError(
510-
f"Item {item_id} does not exist in Collection {collection_id}"
511-
)
497+
raise NotFoundError(f"Item {item_id} does not exist in Collection {collection_id}")
512498
return item["_source"]
513499

514500
@staticmethod
@@ -531,9 +517,7 @@ def apply_free_text_filter(search: Search, free_text_queries: Optional[List[str]
531517
"""Database logic to perform query for search endpoint."""
532518
if free_text_queries is not None:
533519
free_text_query_string = '" OR properties.\\*:"'.join(free_text_queries)
534-
search = search.query(
535-
"query_string", query=f'properties.\\*:"{free_text_query_string}"'
536-
)
520+
search = search.query("query_string", query=f'properties.\\*:"{free_text_query_string}"')
537521

538522
return search
539523

@@ -549,16 +533,10 @@ def apply_datetime_filter(search: Search, datetime_search):
549533
Search: The filtered search object.
550534
"""
551535
if "eq" in datetime_search:
552-
search = search.filter(
553-
"term", **{"properties__datetime": datetime_search["eq"]}
554-
)
536+
search = search.filter("term", **{"properties__datetime": datetime_search["eq"]})
555537
else:
556-
search = search.filter(
557-
"range", properties__datetime={"lte": datetime_search["lte"]}
558-
)
559-
search = search.filter(
560-
"range", properties__datetime={"gte": datetime_search["gte"]}
561-
)
538+
search = search.filter("range", properties__datetime={"lte": datetime_search["lte"]})
539+
search = search.filter("range", properties__datetime={"gte": datetime_search["gte"]})
562540
return search
563541

564542
@staticmethod
@@ -761,11 +739,7 @@ async def execute_search(
761739
if hits and (sort_array := hits[limit - 1].get("sort")):
762740
next_token = urlsafe_b64encode(json.dumps(sort_array).encode()).decode()
763741

764-
matched = (
765-
es_response["hits"]["total"]["value"]
766-
if es_response["hits"]["total"]["relation"] == "eq"
767-
else None
768-
)
742+
matched = es_response["hits"]["total"]["value"] if es_response["hits"]["total"]["relation"] == "eq" else None
769743
if count_task.done():
770744
try:
771745
matched = count_task.result().get("count")
@@ -843,9 +817,7 @@ async def check_collection_exists(self, collection_id: str):
843817
if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
844818
raise NotFoundError(f"Collection {collection_id} does not exist")
845819

846-
async def prep_create_item(
847-
self, item: Item, base_url: str, exist_ok: bool = False
848-
) -> Item:
820+
async def prep_create_item(self, item: Item, base_url: str, exist_ok: bool = False) -> Item:
849821
"""
850822
Preps an item for insertion into the database.
851823
@@ -867,15 +839,11 @@ async def prep_create_item(
867839
index=index_alias_by_collection_id(item["collection"]),
868840
id=mk_item_id(item["id"], item["collection"]),
869841
):
870-
raise ConflictError(
871-
f"Item {item['id']} in collection {item['collection']} already exists"
872-
)
842+
raise ConflictError(f"Item {item['id']} in collection {item['collection']} already exists")
873843

874844
return self.item_serializer.stac_to_db(item, base_url)
875845

876-
def sync_prep_create_item(
877-
self, item: Item, base_url: str, exist_ok: bool = False
878-
) -> Item:
846+
def sync_prep_create_item(self, item: Item, base_url: str, exist_ok: bool = False) -> Item:
879847
"""
880848
Prepare an item for insertion into the database.
881849
@@ -904,9 +872,7 @@ def sync_prep_create_item(
904872
index=index_alias_by_collection_id(collection_id),
905873
id=mk_item_id(item_id, collection_id),
906874
):
907-
raise ConflictError(
908-
f"Item {item_id} in collection {collection_id} already exists"
909-
)
875+
raise ConflictError(f"Item {item_id} in collection {collection_id} already exists")
910876

911877
return self.item_serializer.stac_to_db(item, base_url)
912878

@@ -934,9 +900,7 @@ async def create_item(self, item: Item, refresh: bool = False):
934900
)
935901

936902
if (meta := es_resp.get("meta")) and meta.get("status") == 409:
937-
raise ConflictError(
938-
f"Item {item_id} in collection {collection_id} already exists"
939-
)
903+
raise ConflictError(f"Item {item_id} in collection {collection_id} already exists")
940904

941905
async def merge_patch_item(
942906
self,
@@ -993,59 +957,63 @@ async def json_patch_item(
993957
script_operations = []
994958

995959
for operation in operations:
996-
if operation["op"] in ["add", "replace"]:
997-
if (
998-
operation["path"] == "collection"
999-
and collection_id != operation["value"]
1000-
):
1001-
await self.check_collection_exists(collection_id=operation["value"])
1002-
new_collection_id = operation["value"]
960+
if operation.path in ["collection", "id"] and operation.op in [
961+
"add",
962+
"replace",
963+
]:
964+
965+
if operation.path == "collection" and collection_id != operation.value:
966+
await self.check_collection_exists(collection_id=operation.value)
967+
new_collection_id = operation.value
1003968

1004-
if operation["path"] == "id" and item_id != operation["value"]:
1005-
new_item_id = operation["value"]
969+
if operation.path == "id" and item_id != operation.value:
970+
new_item_id = operation.value
1006971

1007972
else:
1008973
script_operations.append(operation)
1009974

1010975
script = operations_to_script(script_operations)
1011976

1012-
if not new_collection_id and not new_item_id:
977+
try:
1013978
await self.client.update(
1014979
index=index_by_collection_id(collection_id),
1015980
id=mk_item_id(item_id, collection_id),
1016981
script=script,
1017-
refresh=refresh,
982+
refresh=True,
1018983
)
1019984

985+
except exceptions.BadRequestError as exc:
986+
raise KeyError(exc.info["error"]["caused_by"]["to_string"]) from exc
987+
988+
item = await self.get_one_item(collection_id, item_id)
989+
1020990
if new_collection_id:
1021991
await self.client.reindex(
1022992
body={
1023-
"dest": {"index": f"{ITEMS_INDEX_PREFIX}{operation['value']}"},
993+
"dest": {"index": f"{ITEMS_INDEX_PREFIX}{new_collection_id}"},
1024994
"source": {
1025995
"index": f"{ITEMS_INDEX_PREFIX}{collection_id}",
1026996
"query": {"term": {"id": {"value": item_id}}},
1027997
},
1028998
"script": {
1029999
"lang": "painless",
10301000
"source": (
1031-
f"""ctx._id = ctx._id.replace('{collection_id}', '{operation["value"]}');"""
1032-
f"""ctx._source.collection = '{operation["value"]}';"""
1033-
+ script
1001+
f"""ctx._id = ctx._id.replace('{collection_id}', '{new_collection_id}');"""
1002+
f"""ctx._source.collection = '{new_collection_id}';"""
10341003
),
10351004
},
10361005
},
10371006
wait_for_completion=True,
1038-
refresh=False,
1007+
refresh=True,
10391008
)
1040-
1041-
item = await self.get_one_item(collection_id, item_id)
1009+
item["collection"] = new_collection_id
10421010

10431011
if new_item_id:
10441012
item["id"] = new_item_id
10451013
item = await self.prep_create_item(item=item, base_url=base_url)
10461014
await self.create_item(item=item, refresh=False)
10471015

1048-
if new_item_id or new_collection_id:
1016+
if new_collection_id or new_item_id:
10491017

10501018
await self.delete_item(
10511019
item_id=item_id,
@@ -1055,9 +1023,7 @@ async def json_patch_item(
10551023

10561024
return item
10571025

1058-
async def delete_item(
1059-
self, item_id: str, collection_id: str, refresh: bool = False
1060-
):
1026+
async def delete_item(self, item_id: str, collection_id: str, refresh: bool = False):
10611027
"""Delete a single item from the database.
10621028
10631029
Args:
@@ -1075,9 +1041,7 @@ async def delete_item(
10751041
refresh=refresh,
10761042
)
10771043
except exceptions.NotFoundError:
1078-
raise NotFoundError(
1079-
f"Item {item_id} in collection {collection_id} not found"
1080-
)
1044+
raise NotFoundError(f"Item {item_id} in collection {collection_id} not found")
10811045

10821046
async def create_collection(self, collection: Collection, refresh: bool = False):
10831047
"""Create a single collection in the database.
@@ -1124,17 +1088,13 @@ async def find_collection(self, collection_id: str) -> Collection:
11241088
collection as a `Collection` object. If the collection is not found, a `NotFoundError` is raised.
11251089
"""
11261090
try:
1127-
collection = await self.client.get(
1128-
index=COLLECTIONS_INDEX, id=collection_id
1129-
)
1091+
collection = await self.client.get(index=COLLECTIONS_INDEX, id=collection_id)
11301092
except exceptions.NotFoundError:
11311093
raise NotFoundError(f"Collection {collection_id} not found")
11321094

11331095
return collection["_source"]
11341096

1135-
async def update_collection(
1136-
self, collection_id: str, collection: Collection, refresh: bool = False
1137-
):
1097+
async def update_collection(self, collection_id: str, collection: Collection, refresh: bool = False):
11381098
"""Update a collection from the database.
11391099
11401100
Args:
@@ -1254,9 +1214,7 @@ async def json_patch_collection(
12541214
if new_collection_id:
12551215
collection["id"] = new_collection_id
12561216
collection["links"] = resolve_links([], base_url)
1257-
await self.update_collection(
1258-
collection_id=collection_id, collection=collection, refresh=False
1259-
)
1217+
await self.update_collection(collection_id=collection_id, collection=collection, refresh=False)
12601218

12611219
return collection
12621220

@@ -1277,14 +1235,10 @@ async def delete_collection(self, collection_id: str, refresh: bool = False):
12771235
function also calls `delete_item_index` to delete the index for the items in the collection.
12781236
"""
12791237
await self.find_collection(collection_id=collection_id)
1280-
await self.client.delete(
1281-
index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh
1282-
)
1238+
await self.client.delete(index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh)
12831239
await delete_item_index(collection_id)
12841240

1285-
async def bulk_async(
1286-
self, collection_id: str, processed_items: List[Item], refresh: bool = False
1287-
) -> None:
1241+
async def bulk_async(self, collection_id: str, processed_items: List[Item], refresh: bool = False) -> None:
12881242
"""Perform a bulk insert of items into the database asynchronously.
12891243
12901244
Args:
@@ -1306,9 +1260,7 @@ async def bulk_async(
13061260
raise_on_error=False,
13071261
)
13081262

1309-
def bulk_sync(
1310-
self, collection_id: str, processed_items: List[Item], refresh: bool = False
1311-
) -> None:
1263+
def bulk_sync(self, collection_id: str, processed_items: List[Item], refresh: bool = False) -> None:
13121264
"""Perform a bulk insert of items into the database synchronously.
13131265
13141266
Args:

stac_fastapi/tests/api/test_api.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
"POST /collections",
2929
"POST /collections/{collection_id}/items",
3030
"PUT /collections/{collection_id}",
31+
"PATCH /collections/{collection_id}",
3132
"PUT /collections/{collection_id}/items/{item_id}",
33+
"PATCH /collections/{collection_id}/items/{item_id}",
3234
"GET /aggregations",
3335
"GET /aggregate",
3436
"POST /aggregations",

0 commit comments

Comments
 (0)