11import json
2- import os
32import time
4-
53import boto3
64from opensearchpy import OpenSearch , RequestsHttpConnection , AWSV4SignerAuth
75from aws_lambda_powertools import Logger
86from aws_lambda_powertools .utilities .typing import LambdaContext
7+ from app .config .config import AWS_REGION
98
109logger = Logger (service = "createIndexFunction" )
1110
1211
1312def get_opensearch_client (endpoint ):
14- """
15- Create an OpenSearch (AOSS) client using AWS credentials.
16- Works for both AOSS and legacy OpenSearch domains by checking the endpoint.
17- """
13+ """Create an OpenSearch (AOSS) client using AWS credentials."""
1814 service = "aoss" if "aoss" in endpoint else "es"
1915 logger .debug (f"Connecting to OpenSearch service: { service } at { endpoint } " )
2016 return OpenSearch (
2117 hosts = [{"host" : endpoint , "port" : 443 }],
2218 http_auth = AWSV4SignerAuth (
2319 boto3 .Session ().get_credentials (),
24- os . getenv ( " AWS_REGION" , "eu-west-2" ) ,
20+ AWS_REGION ,
2521 service ,
2622 ),
2723 use_ssl = True ,
@@ -32,17 +28,12 @@ def get_opensearch_client(endpoint):
3228
3329
3430def wait_for_index_aoss (opensearch_client , index_name , timeout = 300 , poll_interval = 5 ):
35- """
36- Wait until the index exists in OpenSearch Serverless (AOSS).
37- AOSS does not support cluster health checks, so existence == ready.
38- """
31+ """Wait until the index exists in OpenSearch Serverless (AOSS)."""
3932 logger .info (f"Waiting for index '{ index_name } ' to be available in AOSS..." )
4033 start = time .time ()
4134 while True :
4235 try :
43- # Use .exists and then attempt to get mapping
4436 if opensearch_client .indices .exists (index = index_name ):
45- # Now check if mappings are available (index is queryable)
4637 mapping = opensearch_client .indices .get_mapping (index = index_name )
4738 if mapping and index_name in mapping :
4839 logger .info (f"Index '{ index_name } ' exists and mappings are ready." )
@@ -51,18 +42,14 @@ def wait_for_index_aoss(opensearch_client, index_name, timeout=300, poll_interva
5142 logger .info (f"Index '{ index_name } ' does not exist yet..." )
5243 except Exception as exc :
5344 logger .info (f"Still waiting for index '{ index_name } ': { exc } " )
54- # Exit on timeout to avoid infinite loop during stack failures.
5545 if time .time () - start > timeout :
5646 logger .error (f"Timed out waiting for index '{ index_name } ' to be available." )
5747 return False
5848 time .sleep (poll_interval )
5949
6050
6151def create_and_wait_for_index (client , index_name ):
62- """
63- Creates the index (if not present) and waits until it's ready for use.
64- Idempotent: Does nothing if the index is already present.
65- """
52+ """Creates the index (if not present) and waits until it's ready for use."""
6653 params = {
6754 "index" : index_name ,
6855 "body" : {
@@ -98,40 +85,32 @@ def create_and_wait_for_index(client, index_name):
9885 }
9986
10087 try :
101- # Only create if not present (safe for repeat runs/rollbacks)
10288 if not client .indices .exists (index = params ["index" ]):
10389 logger .info (f"Creating index { params ['index' ]} " )
10490 client .indices .create (index = params ["index" ], body = params ["body" ])
10591 logger .info (f"Index { params ['index' ]} creation initiated." )
10692 else :
10793 logger .info (f"Index { params ['index' ]} already exists" )
10894
109- # Wait until available for downstream resources
11095 if not wait_for_index_aoss (client , params ["index" ]):
11196 raise RuntimeError (f"Index { params ['index' ]} failed to appear in time" )
11297
11398 logger .info (f"Index { params ['index' ]} is ready and active." )
11499 except Exception as e :
115100 logger .error (f"Error creating or waiting for index: { e } " )
116- raise e # Fail stack if this fails
101+ raise e
117102
118103
119104def extract_parameters (event ):
120- """
121- Extract parameters from Lambda event, handling both:
122- - CloudFormation custom resource invocations
123- - Direct Lambda/test calls
124- """
105+ """Extract parameters from Lambda event."""
125106 if "ResourceProperties" in event :
126- # From CloudFormation custom resource
127107 properties = event ["ResourceProperties" ]
128108 return {
129109 "endpoint" : properties .get ("Endpoint" ),
130110 "index_name" : properties .get ("IndexName" ),
131111 "request_type" : event .get ("RequestType" ),
132112 }
133113 else :
134- # From direct Lambda invocation (e.g., manual test)
135114 return {
136115 "endpoint" : event .get ("Endpoint" ),
137116 "index_name" : event .get ("IndexName" ),
@@ -141,35 +120,27 @@ def extract_parameters(event):
141120
142121@logger .inject_lambda_context
143122def handler (event : dict , context : LambdaContext ) -> dict :
144- """
145- Entrypoint: create, update, or delete the OpenSearch index.
146- Invoked via CloudFormation custom resource or manually.
147- """
123+ """Entrypoint: create, update, or delete the OpenSearch index."""
148124 logger .info ("Received event" , extra = {"event" : event })
149125
150126 try :
151- # CloudFormation custom resources may pass the actual event as a JSON string in "Payload"
152127 if "Payload" in event :
153128 event = json .loads (event ["Payload" ])
154129
155- # Get parameters (handles both invocation types)
156130 params = extract_parameters (event )
157131 endpoint = params ["endpoint" ]
158132 index_name = params ["index_name" ]
159133 request_type = params ["request_type" ]
160134
161- # Sanity check required parameters
162135 if not endpoint or not index_name or not request_type :
163136 raise ValueError ("Missing required parameters: Endpoint, IndexName, or RequestType" )
164137
165138 client = get_opensearch_client (endpoint )
166139
167140 if request_type in ["Create" , "Update" ]:
168- # Idempotent: will not fail if index already exists
169141 create_and_wait_for_index (client , index_name )
170142 return {"PhysicalResourceId" : f"index-{ index_name } " , "Status" : "SUCCESS" }
171143 elif request_type == "Delete" :
172- # Clean up the index if it exists
173144 try :
174145 if client .indices .exists (index = index_name ):
175146 client .indices .delete (index = index_name )
0 commit comments