From 2aceabd19fbf045d07d836bc45b9465daf17a5a5 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Sun, 12 Oct 2025 15:28:29 +0800 Subject: [PATCH 1/5] reindex scratch --- scripts/reindex_elasticsearch.py | 84 -------------------- scripts/reindex_opensearch.py | 84 -------------------- sfeos_tools/sfeos_tools/cli.py | 108 +++++++++++++++++++++++++ sfeos_tools/sfeos_tools/reindex.py | 122 +++++++++++++++++++++++++++++ 4 files changed, 230 insertions(+), 168 deletions(-) delete mode 100644 scripts/reindex_elasticsearch.py delete mode 100644 scripts/reindex_opensearch.py create mode 100644 sfeos_tools/sfeos_tools/reindex.py diff --git a/scripts/reindex_elasticsearch.py b/scripts/reindex_elasticsearch.py deleted file mode 100644 index 634da7e4..00000000 --- a/scripts/reindex_elasticsearch.py +++ /dev/null @@ -1,84 +0,0 @@ -import asyncio -import time - -from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings -from stac_fastapi.elasticsearch.database_logic import create_index_templates -from stac_fastapi.sfeos_helpers.mappings import COLLECTIONS_INDEX, ITEMS_INDEX_PREFIX - - -async def reindex(client, index, new_index, aliases): - """Reindex STAC index""" - print(f"reindexing {index} to {new_index}") - - await client.options(ignore_status=400).indices.create(index=new_index) - - reindex_resp = await client.reindex( - dest={"index": new_index}, - source={"index": [index]}, - wait_for_completion=False, - script={ - "source": "if (ctx._source.containsKey('assets')){List l = new ArrayList();for (key in ctx._source.assets.keySet()) {def item = ctx._source.assets[key]; item['es_key'] = key; l.add(item)}ctx._source.assets=l} if (ctx._source.containsKey('item_assets')){ List a = new ArrayList(); for (key in ctx._source.item_assets.keySet()) {def item = ctx._source.item_assets[key]; item['es_key'] = key; a.add(item)}ctx._source.item_assets=a}", - "lang": "painless", - }, - ) - - task_id = reindex_resp["task"] - - reindex_complete = False - while not reindex_complete: 
- task_resp = await client.tasks.get(task_id=task_id) - - if "completed" in task_resp and task_resp["completed"]: - reindex_complete = True - - elif "error" in task_resp: - reindex_complete = True - print(f"Reindex failed for {index} with error: {task_resp['error']}") - - else: - time.sleep(60) - - actions = [] - for alias in aliases["aliases"]: - actions.extend( - [ - {"add": {"index": new_index, "alias": alias}}, - {"remove": {"index": index, "alias": alias}}, - ] - ) - - await client.indices.update_aliases(actions=actions) - - -async def run(): - """Reindex all STAC indexes for mapping update""" - client = AsyncElasticsearchSettings().create_client - - await create_index_templates() - - collection_response = await client.indices.get_alias(name=COLLECTIONS_INDEX) - collections = await client.search(index=COLLECTIONS_INDEX) - - collection_index, collection_aliases = next(iter(collection_response.items())) - collection_index_name, version = collection_index.rsplit("-", 1) - new_collection_index = f"{collection_index_name}-{str(int(version) + 1).zfill(6)}" - - await reindex(client, collection_index, new_collection_index, collection_aliases) - - for collection in collections["hits"]["hits"]: - - item_indexes = await client.indices.get_alias( - name=f"{ITEMS_INDEX_PREFIX}{collection['_id']}*" - ) - - for item_index, aliases in item_indexes.items(): - item_index_name, version = item_index.rsplit("-", 1) - new_item_index = f"{item_index_name}-{str(int(version) + 1).zfill(6)}" - - await reindex(client, item_index, new_item_index, aliases) - - await client.close() - - -if __name__ == "__main__": - asyncio.run(run()) diff --git a/scripts/reindex_opensearch.py b/scripts/reindex_opensearch.py deleted file mode 100644 index 7c760204..00000000 --- a/scripts/reindex_opensearch.py +++ /dev/null @@ -1,84 +0,0 @@ -import asyncio -import time - -from stac_fastapi.opensearch.config import AsyncOpensearchSettings -from stac_fastapi.opensearch.database_logic import 
create_index_templates -from stac_fastapi.sfeos_helpers.mappings import COLLECTIONS_INDEX, ITEMS_INDEX_PREFIX - - -async def reindex(client, index, new_index, aliases): - """Reindex STAC index""" - print(f"reindexing {index} to {new_index}") - - await client.options(ignore_status=400).indices.create(index=new_index) - - reindex_resp = await client.reindex( - dest={"index": new_index}, - source={"index": [index]}, - wait_for_completion=False, - script={ - "source": "if (ctx._source.containsKey('assets')){List l = new ArrayList();for (key in ctx._source.assets.keySet()) {def item = ctx._source.assets[key]; item['es_key'] = key; l.add(item)}ctx._source.assets=l} if (ctx._source.containsKey('item_assets')){ List a = new ArrayList(); for (key in ctx._source.item_assets.keySet()) {def item = ctx._source.item_assets[key]; item['es_key'] = key; a.add(item)}ctx._source.item_assets=a}", - "lang": "painless", - }, - ) - - task_id = reindex_resp["task"] - - reindex_complete = False - while not reindex_complete: - task_resp = await client.tasks.get(task_id=task_id) - - if "completed" in task_resp and task_resp["completed"]: - reindex_complete = True - - elif "error" in task_resp: - reindex_complete = True - print(f"Reindex failed for {index} with error: {task_resp['error']}") - - else: - time.sleep(60) - - actions = [] - for alias in aliases["aliases"]: - actions.extend( - [ - {"add": {"index": new_index, "alias": alias}}, - {"remove": {"index": index, "alias": alias}}, - ] - ) - - await client.indices.update_aliases(actions=actions) - - -async def run(): - """Reindex all STAC indexes for mapping update""" - client = AsyncOpensearchSettings().create_client - - await create_index_templates() - - collection_response = await client.indices.get_alias(name=COLLECTIONS_INDEX) - collections = await client.search(index=COLLECTIONS_INDEX) - - collection_index, collection_aliases = next(iter(collection_response.items())) - collection_index_name, version = collection_index.rsplit("-", 1) 
- new_collection_index = f"{collection_index_name}-{str(int(version) + 1).zfill(6)}" - - await reindex(client, collection_index, new_collection_index, collection_aliases) - - for collection in collections["hits"]["hits"]: - - item_indexes = await client.indices.get_alias( - name=f"{ITEMS_INDEX_PREFIX}{collection['_id']}*" - ) - - for item_index, aliases in item_indexes.items(): - item_index_name, version = item_index.rsplit("-", 1) - new_item_index = f"{item_index_name}-{str(int(version) + 1).zfill(6)}" - - await reindex(client, item_index, new_item_index, aliases) - - await client.close() - - -if __name__ == "__main__": - asyncio.run(run()) diff --git a/sfeos_tools/sfeos_tools/cli.py b/sfeos_tools/sfeos_tools/cli.py index 827e8116..1cbefad4 100644 --- a/sfeos_tools/sfeos_tools/cli.py +++ b/sfeos_tools/sfeos_tools/cli.py @@ -17,6 +17,8 @@ from stac_fastapi.sfeos_helpers.database import add_bbox_shape_to_collection from stac_fastapi.sfeos_helpers.mappings import COLLECTIONS_INDEX +from .reindex import run as unified_reindex_run + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -225,5 +227,111 @@ def add_bbox_shape(backend, host, port, use_ssl, user, password): sys.exit(1) +@cli.command("reindex") +@click.option( + "--backend", + type=click.Choice(["elasticsearch", "opensearch"], case_sensitive=False), + required=True, + help="Database backend to use", +) +@click.option( + "--host", + type=str, + default=None, + help="Database host (default: localhost or ES_HOST env var)", +) +@click.option( + "--port", + type=int, + default=None, + help="Database port (default: 9200 for ES, 9202 for OS, or ES_PORT env var)", +) +@click.option( + "--use-ssl/--no-ssl", + default=None, + help="Use SSL connection (default: true or ES_USE_SSL env var)", +) +@click.option( + "--user", + type=str, + default=None, + help="Database username (default: ES_USER env var)", +) +@click.option( + "--password", + type=str, + default=None, + help="Database password 
(default: ES_PASS env var)", +) +@click.option( + "--yes", + is_flag=True, + help="Skip confirmation prompt", +) +def reindex(backend, host, port, use_ssl, user, password, yes): + """Reindex all STAC indexes to the next version and update aliases. + + For Elasticsearch, this runs a migration that: + - Creates/updates index templates + - Reindexes collections and item indexes to a new version + - Applies asset migration script for compatibility + - Switches aliases to the new indexes + """ + import os + + backend = backend.lower() + + if not yes: + proceed = click.confirm( + "This will reindex all collections and item indexes and update aliases. Proceed?", + default=False, + ) + if not proceed: + click.echo(click.style("Aborted", fg="yellow")) + return + + # Set environment variables from CLI options if provided + if host: + os.environ["ES_HOST"] = host + if port: + os.environ["ES_PORT"] = str(port) + if use_ssl is not None: + os.environ["ES_USE_SSL"] = "true" if use_ssl else "false" + if user: + os.environ["ES_USER"] = user + if password: + os.environ["ES_PASS"] = password + + try: + asyncio.run(unified_reindex_run(backend)) + click.echo( + click.style( + f"āœ“ Reindex ({backend.title()}) completed successfully", fg="green" + ) + ) + except KeyboardInterrupt: + click.echo(click.style("\nāœ— Reindex interrupted by user", fg="yellow")) + sys.exit(1) + except Exception as e: + error_msg = str(e) + click.echo(click.style(f"āœ— Reindex failed: {error_msg}", fg="red")) + # Provide helpful hints for common errors + if "TLS" in error_msg or "SSL" in error_msg: + click.echo( + click.style( + "\nšŸ’” Hint: If you're connecting to a local Docker Compose instance, try adding --no-ssl flag", + fg="yellow", + ) + ) + elif "Connection refused" in error_msg: + click.echo( + click.style( + "\nšŸ’” Hint: Make sure your database is running and accessible at the specified host:port", + fg="yellow", + ) + ) + sys.exit(1) + + if __name__ == "__main__": cli() diff --git 
a/sfeos_tools/sfeos_tools/reindex.py b/sfeos_tools/sfeos_tools/reindex.py new file mode 100644 index 00000000..5455a14e --- /dev/null +++ b/sfeos_tools/sfeos_tools/reindex.py @@ -0,0 +1,122 @@ +"""Reindex ES/OS database for mapping update.""" + +import asyncio +import time +from typing import Any, Dict + +from stac_fastapi.sfeos_helpers.mappings import COLLECTIONS_INDEX, ITEMS_INDEX_PREFIX + + +async def _reindex_single_index( + client, index: str, new_index: str, aliases: Dict[str, Any] +): + """Reindex a single index to a new version and switch aliases.""" + print(f"reindexing {index} to {new_index}") + + await client.options(ignore_status=400).indices.create(index=new_index) + + # Asset migration script for data compatibility + script = { + "source": ( + "if (ctx._source.containsKey('assets')){List l = new ArrayList();" + "for (key in ctx._source.assets.keySet()) {def item = ctx._source.assets[key];" + " item['es_key'] = key; l.add(item)}ctx._source.assets=l} " + "if (ctx._source.containsKey('item_assets')){ List a = new ArrayList();" + " for (key in ctx._source.item_assets.keySet()) {def item = ctx._source.item_assets[key];" + " item['es_key'] = key; a.add(item)}ctx._source.item_assets=a}" + ), + "lang": "painless", + } + + reindex_resp = await client.reindex( + dest={"index": new_index}, + source={"index": [index]}, + wait_for_completion=False, + script=script, + ) + + task_id = reindex_resp["task"] + + reindex_complete = False + while not reindex_complete: + task_resp = await client.tasks.get(task_id=task_id) + + if "completed" in task_resp and task_resp["completed"]: + reindex_complete = True + elif "error" in task_resp: + reindex_complete = True + print(f"Reindex failed for {index} with error: {task_resp['error']}") + else: + time.sleep(60) + + actions = [] + for alias in aliases["aliases"]: + actions.extend( + [ + {"add": {"index": new_index, "alias": alias}}, + {"remove": {"index": index, "alias": alias}}, + ] + ) + + if actions: + await 
client.indices.update_aliases(actions=actions) + + +async def run(backend: str = "elasticsearch"): + """Reindex all STAC indexes for mapping update for the given backend. + + backend: 'elasticsearch' or 'opensearch' + """ + backend = backend.lower() + + # Lazy imports so the package dependencies remain optional per backend + if backend == "elasticsearch": + from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings + from stac_fastapi.elasticsearch.database_logic import create_index_templates + + settings = AsyncElasticsearchSettings() + elif backend == "opensearch": + from stac_fastapi.opensearch.config import AsyncOpensearchSettings + from stac_fastapi.opensearch.database_logic import create_index_templates + + settings = AsyncOpensearchSettings() + else: + raise ValueError(f"Unsupported backend: {backend}") + + client = settings.create_client + + try: + # Ensure latest templates are applied + await create_index_templates() + + # Collections index + collection_response = await client.indices.get_alias(name=COLLECTIONS_INDEX) + collections = await client.search(index=COLLECTIONS_INDEX) + + collection_index, collection_aliases = next(iter(collection_response.items())) + collection_index_name, version = collection_index.rsplit("-", 1) + new_collection_index = ( + f"{collection_index_name}-{str(int(version) + 1).zfill(6)}" + ) + + await _reindex_single_index( + client, collection_index, new_collection_index, collection_aliases + ) + + # Items per collection + for collection in collections["hits"]["hits"]: + item_indexes = await client.indices.get_alias( + name=f"{ITEMS_INDEX_PREFIX}{collection['_id']}*" + ) + + for item_index, aliases in item_indexes.items(): + item_index_name, version = item_index.rsplit("-", 1) + new_item_index = f"{item_index_name}-{str(int(version) + 1).zfill(6)}" + + await _reindex_single_index(client, item_index, new_item_index, aliases) + finally: + await client.close() + + +if __name__ == "__main__": + 
asyncio.run(run("elasticsearch")) From ffd27a9568122c5dd5961a7c71735e8a2bc78290 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Sun, 12 Oct 2025 16:32:56 +0800 Subject: [PATCH 2/5] update readme latest news format --- README.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 37dbdba7..65726398 100644 --- a/README.md +++ b/README.md @@ -30,12 +30,16 @@ The following organizations have contributed time and/or funding to support the ## Latest News -
- - **10/12/2025:** Collections search **bbox** functionality added! The collections search extension now supports bbox queries. Collections will need to be updated via the API or with the new **[SFEOS-tools](#sfeos-tools-cli)** CLI package to support geospatial discoverability. Thanks again to **CloudFerro** for their sponsorship of this work! - **10/04/2025:** The **[CloudFerro](https://cloudferro.com/)** logo has been added to the sponsors and supporters list above. Their sponsorship of the ongoing collections search extension work has been invaluable. This is in addition to the many other important changes and updates their developers have added to the project. -
+
+View Older News (Click to Expand) + +- **09/25/2025:** v6.5.0 adds a new GET/POST /collections-search endpoint (disabled by default via ENABLE_COLLECTIONS_SEARCH_ROUTE) to avoid conflicts with Transactions, and enhances GET /collections with structured filtering (CQL2 JSON/text), query, and datetime filtering. These changes make collection discovery more powerful and configurable while preserving compatibility with transaction-enabled deployments. Thanks to **CloudFerro** for their sponsorship of this work! + + +
## Project Introduction - What is SFEOS? From 0736af1da2df654acc6a3d52e449b7cc2f9f1e24 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Sun, 12 Oct 2025 18:32:48 +0800 Subject: [PATCH 3/5] add new line --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 65726398..6c8dafcd 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,8 @@ The following organizations have contributed time and/or funding to support the
View Older News (Click to Expand) -- **09/25/2025:** v6.5.0 adds a new GET/POST /collections-search endpoint (disabled by default via ENABLE_COLLECTIONS_SEARCH_ROUTE) to avoid conflicts with Transactions, and enhances GET /collections with structured filtering (CQL2 JSON/text), query, and datetime filtering. These changes make collection discovery more powerful and configurable while preserving compatibility with transaction-enabled deployments. Thanks to **CloudFerro** for their sponsorship of this work! +

+- **09/25/2025:** v6.5.0 adds a new GET/POST /collections-search endpoint (disabled by default via ENABLE_COLLECTIONS_SEARCH_ROUTE) to avoid conflicts with the Transactions Extension, and enhances collections search with structured filtering (CQL2 JSON/text), query, and datetime filtering. These changes make collection discovery more powerful and configurable while preserving compatibility with transaction-enabled deployments. Thanks to **CloudFerro** for their sponsorship of this work!
From 5dd331004d7d1312ec1acd98a045a80a7a2d6fcc Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Sun, 12 Oct 2025 18:34:41 +0800 Subject: [PATCH 4/5] use line --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 6c8dafcd..edc46c44 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ The following organizations have contributed time and/or funding to support the
View Older News (Click to Expand) -

+------------- - **09/25/2025:** v6.5.0 adds a new GET/POST /collections-search endpoint (disabled by default via ENABLE_COLLECTIONS_SEARCH_ROUTE) to avoid conflicts with the Transactions Extension, and enhances collections search with structured filtering (CQL2 JSON/text), query, and datetime filtering. These changes make collection discovery more powerful and configurable while preserving compatibility with transaction-enabled deployments. Thanks to **CloudFerro** for their sponsorship of this work! From ec0912a974b49602653f9b3d95852e4585b71c50 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Sun, 12 Oct 2025 23:13:49 +0800 Subject: [PATCH 5/5] update readme, changelog --- CHANGELOG.md | 2 ++ README.md | 13 +++++++++++++ 2 files changed, 15 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6323533e..7998bade 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Changed +- Unified elasticsearch and opensearch reindex scripts and added functionality to the new SFEOS-tools CLI package. 
[#490](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/490) + ### Fixed ## [v6.5.1] - 2025-09-30 diff --git a/README.md b/README.md index edc46c44..7d12ada5 100644 --- a/README.md +++ b/README.md @@ -509,11 +509,17 @@ The system uses a precise naming convention: - **Available Commands**: - `add-bbox-shape`: Add bbox_shape field to existing collections for spatial search support + - `reindex`: Reindex all STAC indices (collections and per-collection items) to new versioned indices and update aliases; supports both Elasticsearch and OpenSearch backends - **Basic Usage**: ```shell + # Add bbox_shape to collections sfeos-tools add-bbox-shape --backend elasticsearch sfeos-tools add-bbox-shape --backend opensearch + + # Reindex all STAC data + sfeos-tools reindex --backend elasticsearch --yes + sfeos-tools reindex --backend opensearch --yes ``` - **Connection Options**: Configure database connection via CLI flags or environment variables: @@ -543,6 +549,13 @@ The system uses a precise naming convention: # Using --help for more information sfeos-tools --help sfeos-tools add-bbox-shape --help + sfeos-tools reindex --help + + # Reindex with custom batch size and concurrency + sfeos-tools reindex --backend elasticsearch --batch-size 500 --concurrency 4 --yes + + # Reindex with progress updates every 1000 items + sfeos-tools reindex --backend opensearch --progress 1000 --yes ``` For more details, see the [SFEOS Tools README](./sfeos_tools/README.md).