Skip to content

Commit 874dc3d

Browse files
committed
Adding flag for asset indexing.
Adding scripts for reindexing existing indexes. Adding tests for serializers.
1 parent 57a9981 commit 874dc3d

File tree

5 files changed

+227
-12
lines changed

5 files changed

+227
-12
lines changed

scripts/reindex_elasticsearch.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import time
2+
3+
from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings
4+
from stac_fastapi.elasticsearch.database_logic import create_index_templates
5+
from stac_fastapi.sfeos_helpers.mappings import COLLECTIONS_INDEX, ITEMS_INDEX_PREFIX
6+
7+
8+
async def reindex():
9+
"""Reindex all STAC indexes for mapping update"""
10+
client = AsyncElasticsearchSettings().create_client
11+
12+
indexes = await client.indices.get_alias(name=COLLECTIONS_INDEX)
13+
indexes.update(await client.indices.get_alias(name=f"{ITEMS_INDEX_PREFIX}*"))
14+
15+
create_index_templates()
16+
17+
for index, aliases in indexes.items():
18+
name, version = index.rsplit("-", 1)
19+
new_index = f"{name}-{str(int(version) + 1).zfill(6)}"
20+
await client.options(ignore_status=400).indices.create(index=new_index)
21+
22+
reindex_resp = await client.reindex(
23+
dest={"index": new_index}, source={"index": [index]}, wait_for_completion=False
24+
)
25+
26+
task_id = reindex_resp["task"]
27+
28+
old_count = await client.count(index=index)
29+
reindex_complete = False
30+
while not reindex_complete:
31+
task_resp = await client.tasks.get(task_id=task_id)
32+
33+
if "completed" in task_resp and task_resp["completed"]:
34+
new_count = await client.count(index=new_index)
35+
36+
if new_count["count"] == old_count["count"]:
37+
reindex_complete = True
38+
39+
else:
40+
print(f"Reindex failed for {index} with error: mismatch count")
41+
42+
elif "error" in task_resp:
43+
reindex_complete = True
44+
print(f"Reindex failed for {index} with error: {task_resp['error']}")
45+
46+
time.sleep(60)
47+
48+
actions = []
49+
for alias in aliases["aliases"]:
50+
actions.extend(
51+
[
52+
{"add": {"index": new_index, "alias": alias}},
53+
{"remove": {"index": index, "alias": alias}},
54+
]
55+
)
56+
57+
await client.indices.update_aliases(actions=actions)
58+
59+
60+
if __name__ == "__main__":
61+
reindex()

scripts/reindex_opensearch.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import time
2+
3+
from stac_fastapi.opensearch.config import AsyncOpensearchSettings
4+
from stac_fastapi.opensearch.database_logic import create_index_templates
5+
from stac_fastapi.sfeos_helpers.mappings import COLLECTIONS_INDEX, ITEMS_INDEX_PREFIX
6+
7+
8+
async def reindex():
9+
"""Reindex all STAC indexes for mapping update"""
10+
client = AsyncOpensearchSettings().create_client
11+
12+
indexes = await client.indices.get_alias(name=COLLECTIONS_INDEX)
13+
indexes.update(await client.indices.get_alias(name=f"{ITEMS_INDEX_PREFIX}*"))
14+
15+
create_index_templates()
16+
17+
for index, aliases in indexes.items():
18+
name, version = index.rsplit("-", 1)
19+
new_index = f"{name}-{str(int(version) + 1).zfill(6)}"
20+
await client.options(ignore_status=400).indices.create(index=new_index)
21+
22+
reindex_resp = await client.reindex(
23+
dest={"index": new_index}, source={"index": [index]}, wait_for_completion=False
24+
)
25+
26+
task_id = reindex_resp["task"]
27+
28+
old_count = await client.count(index=index)
29+
reindex_complete = False
30+
while not reindex_complete:
31+
task_resp = await client.tasks.get(task_id=task_id)
32+
33+
if "completed" in task_resp and task_resp["completed"]:
34+
new_count = await client.count(index=new_index)
35+
36+
if new_count["count"] == old_count["count"]:
37+
reindex_complete = True
38+
39+
else:
40+
print(f"Reindex failed for {index} with error: mismatch count")
41+
42+
elif "error" in task_resp:
43+
reindex_complete = True
44+
print(f"Reindex failed for {index} with error: {task_resp['error']}")
45+
46+
time.sleep(60)
47+
48+
actions = []
49+
for alias in aliases["aliases"]:
50+
actions.extend(
51+
[
52+
{"add": {"index": new_index, "alias": alias}},
53+
{"remove": {"index": index, "alias": alias}},
54+
]
55+
)
56+
57+
await client.indices.update_aliases(actions=actions)
58+
59+
60+
if __name__ == "__main__":
61+
reindex()

stac_fastapi/core/stac_fastapi/core/serializers.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from stac_fastapi.core.datetime_utils import now_to_rfc3339_str
1111
from stac_fastapi.core.models.links import CollectionLinks
12+
from stac_fastapi.core.utilities import get_bool_env
1213
from stac_fastapi.types import stac as stac_types
1314
from stac_fastapi.types.links import ItemLinks, resolve_links
1415

@@ -66,9 +67,10 @@ def stac_to_db(cls, stac_data: stac_types.Item, base_url: str) -> stac_types.Ite
6667
item_links = resolve_links(stac_data.get("links", []), base_url)
6768
stac_data["links"] = item_links
6869

69-
stac_data["assets"] = [
70-
{"es_key": k, **v} for k, v in stac_data.get("assets", {}).items()
71-
]
70+
if get_bool_env("STAC_INDEX_ASSETS"):
71+
stac_data["assets"] = [
72+
{"es_key": k, **v} for k, v in stac_data.get("assets", {}).items()
73+
]
7274

7375
now = now_to_rfc3339_str()
7476
if "created" not in stac_data["properties"]:
@@ -97,6 +99,12 @@ def db_to_stac(cls, item: dict, base_url: str) -> stac_types.Item:
9799
if original_links:
98100
item_links += resolve_links(original_links, base_url)
99101

102+
if get_bool_env("STAC_INDEX_ASSETS"):
103+
assets = {a.pop("es_key"): a for a in item.get("assets", [])}
104+
105+
else:
106+
assets = item.get("assets", {})
107+
100108
return stac_types.Item(
101109
type="Feature",
102110
stac_version=item.get("stac_version", ""),
@@ -107,7 +115,7 @@ def db_to_stac(cls, item: dict, base_url: str) -> stac_types.Item:
107115
bbox=item.get("bbox", []),
108116
properties=item.get("properties", {}),
109117
links=item_links,
110-
assets={a.pop("es_key"): a for a in item.get("assets", [])},
118+
assets=assets,
111119
)
112120

113121

@@ -132,9 +140,12 @@ def stac_to_db(
132140
collection["links"] = resolve_links(
133141
collection.get("links", []), str(request.base_url)
134142
)
135-
collection["assets"] = [
136-
{"es_key": k, **v} for k, v in collection.get("assets", {}).items()
137-
]
143+
144+
if get_bool_env("STAC_INDEX_ASSETS"):
145+
collection["assets"] = [
146+
{"es_key": k, **v} for k, v in collection.get("assets", {}).items()
147+
]
148+
138149
return collection
139150

140151
@classmethod
@@ -181,9 +192,13 @@ def db_to_stac(
181192
collection_links += resolve_links(original_links, str(request.base_url))
182193
collection["links"] = collection_links
183194

184-
collection["assets"] = {
185-
a.pop("es_key"): a for a in collection.get("assets", [])
186-
}
195+
if get_bool_env("STAC_INDEX_ASSETS"):
196+
collection["assets"] = {
197+
a.pop("es_key"): a for a in collection.get("assets", [])
198+
}
199+
200+
else:
201+
collection["assets"] = collection.get("assets", {})
187202

188203
# Return the stac_types.Collection object
189204
return stac_types.Collection(**collection)

stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/mappings.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import os
2929
from typing import Any, Dict, Literal, Protocol
3030

31+
from stac_fastapi.core.utilities import get_bool_env
32+
3133

3234
# stac_pydantic classes extend _GeometryBase, which doesn't have a type field,
3335
# So create our own Protocol for typing
@@ -134,7 +136,7 @@ class Geometry(Protocol): # noqa
134136
"id": {"type": "keyword"},
135137
"collection": {"type": "keyword"},
136138
"geometry": {"type": "geo_shape"},
137-
"assets": {"type": "object"},
139+
"assets": {"type": "object", "enabled": get_bool_env("STAC_INDEX_ASSETS")},
138140
"links": {"type": "object", "enabled": False},
139141
"properties": {
140142
"type": "object",
@@ -162,7 +164,7 @@ class Geometry(Protocol): # noqa
162164
"extent.temporal.interval": {"type": "date"},
163165
"providers": {"type": "object", "enabled": False},
164166
"links": {"type": "object", "enabled": False},
165-
"item_assets": {"type": "object", "enabled": False},
167+
"item_assets": {"type": "object", "enabled": get_bool_env("STAC_INDEX_ASSETS")},
166168
},
167169
}
168170

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import pytest
2+
3+
from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer
4+
from stac_fastapi.types import stac as stac_types
5+
6+
from ..conftest import MockRequest
7+
8+
9+
@pytest.mark.asyncio
10+
async def test_item_serializer(monkeypatch, load_test_data):
11+
monkeypatch.setenv("STAC_INDEX_ASSETS", "false")
12+
13+
request = MockRequest
14+
15+
item_data = load_test_data("test_item.json")
16+
17+
item = stac_types.Item(**item_data)
18+
19+
serialized_item = ItemSerializer.stac_to_db(stac_data=item, base_url=str(request.base_url))
20+
21+
unserialized_item = ItemSerializer.db_to_stac(item=serialized_item, base_url=str(request.base_url))
22+
23+
assert unserialized_item == item
24+
25+
26+
@pytest.mark.asyncio
27+
async def test_item_serializer_with_asset_indexing(monkeypatch, load_test_data):
28+
monkeypatch.setenv("STAC_INDEX_ASSETS", "true")
29+
30+
request = MockRequest
31+
32+
item_data = load_test_data("test_item.json")
33+
34+
item = stac_types.Item(**item_data)
35+
36+
serialized_item = ItemSerializer.stac_to_db(stac_data=item, base_url=str(request.base_url))
37+
38+
unserialized_item = ItemSerializer.db_to_stac(item=serialized_item, base_url=str(request.base_url))
39+
40+
assert unserialized_item == item
41+
42+
43+
@pytest.mark.asyncio
44+
async def test_collection_serializer(monkeypatch, load_test_data):
45+
monkeypatch.setenv("STAC_INDEX_ASSETS", "false")
46+
47+
request = MockRequest
48+
collection_data = load_test_data("test_collection.json")
49+
50+
collection = stac_types.Collection(**collection_data)
51+
52+
serialized_collection = CollectionSerializer.stac_to_db(collection=collection, request=request)
53+
54+
unserialized_collection = CollectionSerializer.db_to_stac(
55+
collection=serialized_collection, request=request, extensions=serialized_collection["stac_extensions"]
56+
)
57+
58+
assert unserialized_collection == collection
59+
60+
61+
@pytest.mark.asyncio
62+
async def test_collection_serializer_with_asset_indexing(monkeypatch, load_test_data):
63+
monkeypatch.setenv("STAC_INDEX_ASSETS", "true")
64+
65+
request = MockRequest
66+
collection_data = load_test_data("test_collection.json")
67+
68+
collection = stac_types.Collection(**collection_data)
69+
70+
serialized_collection = CollectionSerializer.stac_to_db(collection=collection, request=request)
71+
72+
unserialized_collection = CollectionSerializer.db_to_stac(
73+
collection=serialized_collection, request=request, extensions=serialized_collection["stac_extensions"]
74+
)
75+
76+
assert unserialized_collection == collection

0 commit comments

Comments
 (0)