|
1 | 1 | import json |
2 | 2 | import logging |
3 | 3 | import os |
| 4 | +import time |
4 | 5 |
|
5 | 6 | import boto3 |
6 | 7 | from botocore.exceptions import NoCredentialsError |
@@ -32,6 +33,29 @@ def get_opensearch_client(endpoint): |
32 | 33 | ) |
33 | 34 |
|
34 | 35 |
|
| 36 | +def wait_for_index(opensearch_client, index_name, timeout=120, poll_interval=4): |
| 37 | + """ |
| 38 | + Waits for the OpenSearch index to exist and be at least 'yellow' health. |
| 39 | + """ |
| 40 | + logger.info(f"Polling for index '{index_name}' to exist and be ready...") |
| 41 | + start = time.time() |
| 42 | + while True: |
| 43 | + try: |
| 44 | + if opensearch_client.indices.exists(index=index_name): |
| 45 | + health = opensearch_client.cluster.health(index=index_name, wait_for_status="yellow", timeout="5s") |
| 46 | + status = health.get("status") |
| 47 | + logger.info(f"Index '{index_name}' exists, health: {status}") |
| 48 | + if status in ("yellow", "green"): |
| 49 | + return |
| 50 | + else: |
| 51 | + logger.info(f"Index '{index_name}' does not exist yet...") |
| 52 | + except Exception as exc: |
| 53 | + logger.info(f"Error checking index status: {exc}") |
| 54 | + if time.time() - start > timeout: |
| 55 | + raise TimeoutError(f"Timed out waiting for index '{index_name}' to become ready.") |
| 56 | + time.sleep(poll_interval) |
| 57 | + |
| 58 | + |
35 | 59 | def handler(event, context): |
36 | 60 | logger.info("Received event: %s", json.dumps(event, indent=2)) |
37 | 61 | print(event) |
@@ -86,14 +110,14 @@ def handler(event, context): |
86 | 110 | opensearch_client.indices.create( |
87 | 111 | index=params["index"], body=params["body"] |
88 | 112 | ) |
89 | | - # Wait for the index to be available |
90 | | - logger.info(f"Waiting for index {params['index']} to be ready") |
91 | | - opensearch_client.cluster.health(index=params["index"], wait_for_status="yellow") |
92 | | - logger.info(f"Index {params['index']} is ready") |
| 113 | + logger.info(f"Index {params['index']} creation initiated.") |
93 | 114 | else: |
94 | 115 | logger.info(f"Index {params['index']} already exists") |
| 116 | + # Wait for the index to be available and ready |
| 117 | + wait_for_index(opensearch_client, params["index"]) |
| 118 | + logger.info(f"Index {params['index']} is ready.") |
95 | 119 | except Exception as e: |
96 | | - logger.error(f"Error creating index: {e}") |
| 120 | + logger.error(f"Error creating or waiting for index: {e}") |
97 | 121 | raise e # Re-raise to fail the custom resource |
98 | 122 |
|
99 | 123 | elif event["RequestType"] == "Delete": |
|
0 commit comments