Skip to content

Commit fc1a2cb

Browse files
committed
Update reindexing scripts.
1 parent 2a542e6 commit fc1a2cb

File tree

2 files changed

+118
-78
lines changed

2 files changed

+118
-78
lines changed

scripts/reindex_elasticsearch.py

Lines changed: 60 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,84 @@
1+
import asyncio
12
import time
23

34
from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings
45
from stac_fastapi.elasticsearch.database_logic import create_index_templates
56
from stac_fastapi.sfeos_helpers.mappings import COLLECTIONS_INDEX, ITEMS_INDEX_PREFIX
67

78

8-
async def reindex():
9-
"""Reindex all STAC indexes for mapping update"""
10-
client = AsyncElasticsearchSettings().create_client
9+
async def reindex(client, index, new_index, aliases):
10+
"""Reindex STAC index"""
11+
print(f"reindexing {index} to {new_index}")
12+
13+
await client.options(ignore_status=400).indices.create(index=new_index)
14+
15+
reindex_resp = await client.reindex(
16+
dest={"index": new_index},
17+
source={"index": [index]},
18+
wait_for_completion=False,
19+
script={
20+
"source": "if (ctx._source.containsKey('assets')){List l = new ArrayList();for (key in ctx._source.assets.keySet()) {def item = ctx._source.assets[key]; item['es_key'] = key; l.add(item)}ctx._source.assets=l} if (ctx._source.containsKey('item_assets')){ List a = new ArrayList(); for (key in ctx._source.item_assets.keySet()) {def item = ctx._source.item_assets[key]; item['es_key'] = key; a.add(item)}ctx._source.item_assets=a}",
21+
"lang": "painless",
22+
},
23+
)
1124

12-
indexes = await client.indices.get_alias(name=COLLECTIONS_INDEX)
13-
indexes.update(await client.indices.get_alias(name=f"{ITEMS_INDEX_PREFIX}*"))
25+
task_id = reindex_resp["task"]
1426

15-
create_index_templates()
27+
reindex_complete = False
28+
while not reindex_complete:
29+
task_resp = await client.tasks.get(task_id=task_id)
1630

17-
for index, aliases in indexes.items():
18-
name, version = index.rsplit("-", 1)
19-
new_index = f"{name}-{str(int(version) + 1).zfill(6)}"
20-
await client.options(ignore_status=400).indices.create(index=new_index)
31+
if "completed" in task_resp and task_resp["completed"]:
32+
reindex_complete = True
33+
34+
elif "error" in task_resp:
35+
reindex_complete = True
36+
print(f"Reindex failed for {index} with error: {task_resp['error']}")
37+
38+
else:
39+
time.sleep(60)
2140

22-
reindex_resp = await client.reindex(
23-
dest={"index": new_index},
24-
source={"index": [index]},
25-
wait_for_completion=False,
41+
actions = []
42+
for alias in aliases["aliases"]:
43+
actions.extend(
44+
[
45+
{"add": {"index": new_index, "alias": alias}},
46+
{"remove": {"index": index, "alias": alias}},
47+
]
2648
)
2749

28-
task_id = reindex_resp["task"]
50+
await client.indices.update_aliases(actions=actions)
2951

30-
old_count = await client.count(index=index)
31-
reindex_complete = False
32-
while not reindex_complete:
33-
task_resp = await client.tasks.get(task_id=task_id)
3452

35-
if "completed" in task_resp and task_resp["completed"]:
36-
new_count = await client.count(index=new_index)
53+
async def run():
54+
"""Reindex all STAC indexes for mapping update"""
55+
client = AsyncElasticsearchSettings().create_client
3756

38-
if new_count["count"] == old_count["count"]:
39-
reindex_complete = True
57+
await create_index_templates()
4058

41-
else:
42-
print(f"Reindex failed for {index} with error: mismatch count")
59+
collection_response = await client.indices.get_alias(name=COLLECTIONS_INDEX)
60+
collections = await client.search(index=COLLECTIONS_INDEX)
4361

44-
elif "error" in task_resp:
45-
reindex_complete = True
46-
print(f"Reindex failed for {index} with error: {task_resp['error']}")
62+
collection_index, collection_aliases = next(iter(collection_response.items()))
63+
collection_index_name, version = collection_index.rsplit("-", 1)
64+
new_collection_index = f"{collection_index_name}-{str(int(version) + 1).zfill(6)}"
4765

48-
time.sleep(60)
66+
await reindex(client, collection_index, new_collection_index, collection_aliases)
67+
68+
for collection in collections["hits"]["hits"]:
69+
70+
item_indexes = await client.indices.get_alias(
71+
name=f"{ITEMS_INDEX_PREFIX}{collection['_id']}*"
72+
)
73+
74+
for item_index, aliases in item_indexes.items():
75+
item_index_name, version = item_index.rsplit("-", 1)
76+
new_item_index = f"{item_index_name}-{str(int(version) + 1).zfill(6)}"
4977

50-
actions = []
51-
for alias in aliases["aliases"]:
52-
actions.extend(
53-
[
54-
{"add": {"index": new_index, "alias": alias}},
55-
{"remove": {"index": index, "alias": alias}},
56-
]
57-
)
78+
await reindex(client, item_index, new_item_index, aliases)
5879

59-
await client.indices.update_aliases(actions=actions)
80+
await client.close()
6081

6182

6283
if __name__ == "__main__":
63-
reindex()
84+
asyncio.run(run())

scripts/reindex_opensearch.py

Lines changed: 58 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,82 @@
1+
import asyncio
12
import time
23

34
from stac_fastapi.opensearch.config import AsyncOpensearchSettings
45
from stac_fastapi.opensearch.database_logic import create_index_templates
56
from stac_fastapi.sfeos_helpers.mappings import COLLECTIONS_INDEX, ITEMS_INDEX_PREFIX
67

78

8-
async def reindex():
9-
"""Reindex all STAC indexes for mapping update"""
10-
client = AsyncOpensearchSettings().create_client
9+
async def reindex(client, index, new_index, aliases):
10+
"""Reindex STAC index"""
11+
print(f"reindexing {index} to {new_index}")
12+
13+
await client.options(ignore_status=400).indices.create(index=new_index)
14+
15+
reindex_resp = await client.reindex(
16+
dest={"index": new_index},
17+
source={"index": [index]},
18+
wait_for_completion=False,
19+
script={
20+
"source": "if (ctx._source.containsKey('assets')){List l = new ArrayList();for (key in ctx._source.assets.keySet()) {def item = ctx._source.assets[key]; item['es_key'] = key; l.add(item)}ctx._source.assets=l} if (ctx._source.containsKey('item_assets')){ List a = new ArrayList(); for (key in ctx._source.item_assets.keySet()) {def item = ctx._source.item_assets[key]; item['es_key'] = key; a.add(item)}ctx._source.item_assets=a}",
21+
"lang": "painless",
22+
},
23+
)
1124

12-
indexes = await client.indices.get_alias(name=COLLECTIONS_INDEX)
13-
indexes.update(await client.indices.get_alias(name=f"{ITEMS_INDEX_PREFIX}*"))
25+
task_id = reindex_resp["task"]
1426

15-
create_index_templates()
27+
reindex_complete = False
28+
while not reindex_complete:
29+
task_resp = await client.tasks.get(task_id=task_id)
1630

17-
for index, aliases in indexes.items():
18-
name, version = index.rsplit("-", 1)
19-
new_index = f"{name}-{str(int(version) + 1).zfill(6)}"
20-
await client.options(ignore_status=400).indices.create(index=new_index)
31+
if "completed" in task_resp and task_resp["completed"]:
32+
reindex_complete = True
33+
34+
elif "error" in task_resp:
35+
reindex_complete = True
36+
print(f"Reindex failed for {index} with error: {task_resp['error']}")
37+
38+
else:
39+
time.sleep(60)
2140

22-
reindex_resp = await client.reindex(
23-
dest={"index": new_index},
24-
source={"index": [index]},
25-
wait_for_completion=False,
41+
actions = []
42+
for alias in aliases["aliases"]:
43+
actions.extend(
44+
[
45+
{"add": {"index": new_index, "alias": alias}},
46+
{"remove": {"index": index, "alias": alias}},
47+
]
2648
)
2749

28-
task_id = reindex_resp["task"]
50+
await client.indices.update_aliases(actions=actions)
2951

30-
old_count = await client.count(index=index)
31-
reindex_complete = False
32-
while not reindex_complete:
33-
task_resp = await client.tasks.get(task_id=task_id)
3452

35-
if "completed" in task_resp and task_resp["completed"]:
36-
new_count = await client.count(index=new_index)
53+
async def run():
54+
"""Reindex all STAC indexes for mapping update"""
55+
client = AsyncOpensearchSettings().create_client
3756

38-
if new_count["count"] == old_count["count"]:
39-
reindex_complete = True
57+
await create_index_templates()
4058

41-
else:
42-
print(f"Reindex failed for {index} with error: mismatch count")
59+
collection_response = await client.indices.get_alias(name=COLLECTIONS_INDEX)
60+
collections = await client.search(index=COLLECTIONS_INDEX)
4361

44-
elif "error" in task_resp:
45-
reindex_complete = True
46-
print(f"Reindex failed for {index} with error: {task_resp['error']}")
62+
collection_index, collection_aliases = next(iter(collection_response.items()))
63+
collection_index_name, version = collection_index.rsplit("-", 1)
64+
new_collection_index = f"{collection_index_name}-{str(int(version) + 1).zfill(6)}"
4765

48-
time.sleep(60)
66+
await reindex(client, collection_index, new_collection_index, collection_aliases)
67+
68+
for collection in collections["hits"]["hits"]:
69+
70+
item_indexes = await client.indices.get_alias(name=f"{ITEMS_INDEX_PREFIX}{collection['_id']}*")
71+
72+
for item_index, aliases in item_indexes.items():
73+
item_index_name, version = item_index.rsplit("-", 1)
74+
new_item_index = f"{item_index_name}-{str(int(version) + 1).zfill(6)}"
4975

50-
actions = []
51-
for alias in aliases["aliases"]:
52-
actions.extend(
53-
[
54-
{"add": {"index": new_index, "alias": alias}},
55-
{"remove": {"index": index, "alias": alias}},
56-
]
57-
)
76+
await reindex(client, item_index, new_item_index, aliases)
5877

59-
await client.indices.update_aliases(actions=actions)
78+
await client.close()
6079

6180

6281
if __name__ == "__main__":
63-
reindex()
82+
asyncio.run(run())

0 commit comments

Comments
 (0)