|
1 | | -import traceback |
2 | 1 | import boto3 |
3 | 2 | import time |
4 | 3 | from aws_lambda_powertools import Logger |
| 4 | +from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth |
5 | 5 |
|
# Structured logger (auto-captures request ID, cold start, etc.)
logger = Logger(service="opensearch-index-waiter")

# NOTE(review): this Serverless control-plane client is no longer referenced
# in this file after the switch to the OpenSearch data-plane API — confirm no
# other module imports it before removing.
aoss = boto3.client("opensearchserverless")
10 | 10 |
|
11 | 11 |
|
12 | | -@logger.inject_lambda_context(log_event=True, clear_state=True) |
13 | | -def handler(event, context): |
14 | | - request_type = event["RequestType"] |
15 | | - collection_name = event["ResourceProperties"]["CollectionName"] |
16 | | - index_name = event["ResourceProperties"]["IndexName"] |
def get_opensearch_client(endpoint):
    """
    Create an authenticated OpenSearch client for Serverless or managed service.

    Args:
        endpoint: Hostname of the OpenSearch endpoint (no scheme/port).

    Returns:
        OpenSearch: client that SigV4-signs requests for the "aoss"
        (Serverless) or "es" (managed) service.
    """
    # Determine service type: AOSS (Serverless) or ES (managed)
    service = "aoss" if "aoss" in endpoint else "es"
    logger.debug("Connecting to OpenSearch service", extra={"service": service, "endpoint": endpoint})

    # BUG FIX: the original passed an undefined name `AWS_REGION`, which raised
    # NameError on every call. Resolve credentials AND region from the same
    # boto3 session; in Lambda, botocore picks the region up from the
    # AWS_REGION environment variable automatically.
    session = boto3.Session()
    return OpenSearch(
        hosts=[{"host": endpoint, "port": 443}],
        http_auth=AWSV4SignerAuth(
            session.get_credentials(),
            session.region_name,
            service,
        ),
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        pool_maxsize=10,
    )
17 | 31 |
|
18 | | - if request_type == "Delete": |
19 | | - logger.info("Delete event - no action required", extra={"collection": collection_name, "index": index_name}) |
20 | | - return {"PhysicalResourceId": f"{collection_name}/{index_name}", "Data": {"Status": "DELETED"}} |
21 | | - |
22 | | - # Poll until both collection + index become ACTIVE |
23 | | - for attempt in range(60): # up to ~10 minutes |
24 | | - # 1. Check collection |
25 | | - try: |
26 | | - coll_resp = aoss.batch_get_collection(names=[collection_name]) |
27 | | - coll = next((c for c in coll_resp.get("collectionDetails", []) if c["name"] == collection_name), None) |
28 | 32 |
|
29 | | - if not coll: |
30 | | - logger.warning("Collection missing", extra={"collection": collection_name, "attempt": attempt}) |
31 | | - time.sleep(10) |
32 | | - continue |
def wait_for_index_aoss(opensearch_client, index_name, timeout=300, poll_interval=5):
    """
    Wait for an index to become available in OpenSearch Serverless.

    AOSS has eventual consistency, so poll until the index is fully created
    and its mappings are readable.

    Args:
        opensearch_client: Authenticated OpenSearch client.
        index_name: Name of the index to wait for.
        timeout: Maximum seconds to wait before giving up (default 300).
        poll_interval: Seconds to sleep between polls (default 5).

    Returns:
        bool: True once the index and its mappings are ready, False on timeout.
    """
    logger.info("Waiting for index to be available in AOSS", extra={"index_name": index_name})
    deadline = time.time() + timeout
    while True:
        try:
            if opensearch_client.indices.exists(index=index_name):
                # Verify mappings are also available (not just index existence):
                # AOSS can report existence before mappings have propagated.
                mapping = opensearch_client.indices.get_mapping(index=index_name)
                if mapping and index_name in mapping:
                    logger.info("Index exists and mappings are ready", extra={"index_name": index_name})
                    return True
                # BUG FIX: the original logged "Index does not exist yet" on
                # this branch, where the index DOES exist but mappings are
                # still propagating — and logged nothing when it was absent.
                logger.info("Index exists but mappings are not ready yet", extra={"index_name": index_name})
            else:
                logger.info("Index does not exist yet", extra={"index_name": index_name})
        except Exception as exc:
            # Transient 404/auth/connection errors are expected while AOSS
            # converges; swallow and keep polling until the deadline.
            logger.info("Still waiting for index", extra={"index_name": index_name, "error": str(exc)})
        if time.time() > deadline:
            logger.error("Timed out waiting for index to be available", extra={"index_name": index_name})
            return False
        time.sleep(poll_interval)
38 | 58 |
|
39 | | - # 2. Check index |
40 | | - try: |
41 | | - aoss.get_index(indexName=index_name, id=coll["id"]) |
42 | | - except aoss.exceptions.ResourceNotFoundException: |
43 | | - logger.warning( |
44 | | - "Index missing", extra={"collection": collection_name, "index": index_name, "attempt": attempt} |
45 | | - ) |
46 | | - time.sleep(10) |
47 | | - continue |
48 | | - except Exception: |
49 | | - raise |
50 | 59 |
|
51 | | - logger.info( |
52 | | - "Index status check", |
53 | | - extra={"collection": collection_name, "index": index_name, "status": idx["status"]}, |
54 | | - ) |
55 | | - if idx["status"] == "ACTIVE": |
56 | | - logger.info("Index is ready ✅", extra={"collection": collection_name, "index": index_name}) |
57 | | - return {"PhysicalResourceId": f"{collection_name}/{index_name}", "Data": {"Status": "READY"}} |
@logger.inject_lambda_context(log_event=True, clear_state=True)
def handler(event, context):
    """
    CloudFormation custom-resource handler that blocks until an OpenSearch
    index is queryable.

    Expected event["ResourceProperties"]:
        Endpoint: OpenSearch endpoint hostname.
        IndexName: Name of the index to wait for.

    Returns:
        dict with PhysicalResourceId (and Status) for the custom-resource
        framework.

    Raises:
        RuntimeError: if the index does not become available within the
            default timeout of wait_for_index_aoss.
    """
    request_type = event["RequestType"]
    endpoint = event["ResourceProperties"]["Endpoint"]
    index_name = event["ResourceProperties"]["IndexName"]

    if request_type == "Delete":
        # Nothing to tear down here; index lifecycle is managed elsewhere.
        logger.info("Delete event - no action required", extra={"endpoint": endpoint, "index": index_name})
        return {"PhysicalResourceId": f"index-{index_name}", "Data": {"Status": "DELETED"}}

    client = get_opensearch_client(endpoint)
    # BUG FIX: the original referenced an undefined `params` mapping
    # (params["index"]) here, raising NameError on every Create/Update.
    # The index name comes from ResourceProperties, parsed above.
    if not wait_for_index_aoss(client, index_name):
        raise RuntimeError(f"Index {index_name} failed to appear in time")
    return {
        "PhysicalResourceId": event.get("PhysicalResourceId", f"index-{index_name}"),
        "Status": "SUCCESS",
    }
0 commit comments