Skip to content

Commit f0f698d

Browse files
committed
add a waiter function
1 parent 02531be commit f0f698d

File tree

5 files changed

+139
-2
lines changed

5 files changed

+139
-2
lines changed

packages/cdk/constructs/LambdaFunction.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export interface LambdaFunctionProps {
2424
readonly functionName: string
2525
readonly packageBasePath: string
2626
readonly handler: string
27-
readonly environmentVariables: {[key: string]: string}
27+
readonly environmentVariables?: {[key: string]: string}
2828
readonly additionalPolicies?: Array<IManagedPolicy>
2929
readonly logRetentionInDays: number
3030
readonly logLevel: string

packages/cdk/nagSuppressions.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,30 @@ export const nagSuppressions = (stack: Stack) => {
9292
]
9393
)
9494

95+
// Suppress IAM wildcard permissions for waiter function execution role policy
96+
safeAddNagSuppression(
97+
stack,
98+
"/EpsAssistMeStack/VectorIndex/SlackBotLambda/LambdaPutLogsManagedPolicy/Resource",
99+
[
100+
{
101+
id: "AwsSolutions-IAM5",
102+
reason: "Lambda needs access to all OpenSearch collections and indexes to create and manage indexes."
103+
}
104+
]
105+
)
106+
107+
// Suppress IAM wildcard permissions for waiter on event role policy
108+
safeAddNagSuppression(
109+
stack,
110+
"/EpsAssistMeStack/VectorIndex/IndexWaiterProvider/framework-onEvent/ServiceRole/DefaultPolicy/Resource",
111+
[
112+
{
113+
id: "AwsSolutions-IAM5",
114+
reason: "Lambda needs access to all OpenSearch collections and indexes to create and manage indexes."
115+
}
116+
]
117+
)
118+
95119
// Suppress wildcard permissions for SlackBot policy
96120
safeAddNagSuppression(
97121
stack,
@@ -142,6 +166,17 @@ export const nagSuppressions = (stack: Stack) => {
142166
]
143167
)
144168

169+
safeAddNagSuppression(
170+
stack,
171+
"/EpsAssistMeStack/VectorIndex/IndexWaiterProvider/framework-onEvent/ServiceRole/Resource",
172+
[
173+
{
174+
id: "AwsSolutions-IAM4",
175+
reason: "Waiter function on event using managed policies is fine"
176+
}
177+
]
178+
)
179+
145180
// Suppress AWS managed policy usage in BucketNotificationsHandler (wildcard for any hash)
146181
const bucketNotificationHandlers = stack.node.findAll().filter(node =>
147182
node.node.id.startsWith("BucketNotificationsHandler")

packages/cdk/resources/VectorIndex.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,24 @@
11
import {Construct} from "constructs"
22
import {CfnCollection, CfnIndex} from "aws-cdk-lib/aws-opensearchserverless"
3+
import {CustomResource} from "aws-cdk-lib"
4+
import {ManagedPolicy, PolicyStatement} from "aws-cdk-lib/aws-iam"
5+
import {Provider} from "aws-cdk-lib/custom-resources"
6+
import {LambdaFunction} from "../constructs/LambdaFunction"
37

48
export interface VectorIndexProps {
59
readonly indexName: string
610
readonly collection: CfnCollection
711
readonly endpoint: string
12+
readonly account: string
13+
readonly region: string
14+
readonly stackName: string
15+
readonly logRetentionInDays: number
16+
readonly logLevel: string
817
}
918

1019
export class VectorIndex extends Construct {
1120
public readonly cfnIndex: CfnIndex
21+
public readonly indexReady: CustomResource
1222

1323
constructor(scope: Construct, id: string, props: VectorIndexProps) {
1424
super(scope, id)
@@ -61,8 +71,56 @@ export class VectorIndex extends Construct {
6171
}
6272
})
6373

74+
const collectionArn = `arn:aws:aoss:${props.region}:${props.account}:collection/${props.collection.name}`
75+
const indexArn = `arn:aws:aoss:${props.region}:${props.account}:index/${props.collection.name}/${props.indexName}`
76+
77+
const getCollectionPolicy = new PolicyStatement({
78+
actions: [
79+
"opensearchserverless:BatchGetCollection"
80+
],
81+
resources: [collectionArn]
82+
})
83+
const getIndexPolicy = new PolicyStatement({
84+
actions: [
85+
"opensearchserverless:BatchGetIndex"
86+
],
87+
resources: [indexArn]
88+
})
89+
const waiterFnManagedPolicy = new ManagedPolicy(this, "Policy", {
90+
description: "Policy for Bedrock Knowledge Base to access S3 and OpenSearch",
91+
statements: [
92+
getCollectionPolicy,
93+
getIndexPolicy
94+
]
95+
})
96+
97+
const waiterFn = new LambdaFunction(this, "SlackBotLambda", {
98+
stackName: props.stackName,
99+
functionName: `${props.stackName}-VectorIndexWaiter`,
100+
packageBasePath: "packages/cdk/resources/lambda",
101+
handler: "index_waiter.handler",
102+
logRetentionInDays: props.logRetentionInDays,
103+
logLevel: props.logLevel,
104+
additionalPolicies: [waiterFnManagedPolicy]
105+
})
106+
107+
const provider = new Provider(this, "IndexWaiterProvider", {
108+
onEventHandler: waiterFn.function
109+
})
110+
111+
const indexReady = new CustomResource(this, "IndexReady", {
112+
serviceToken: provider.serviceToken,
113+
properties: {
114+
CollectionName: props.collection.name,
115+
IndexName: props.indexName
116+
}
117+
})
64118
// Ensure collection exists before creating index
65119
cfnIndex.node.addDependency(props.collection)
120+
indexReady.node.addDependency(props.collection)
121+
indexReady.node.addDependency(cfnIndex)
122+
66123
this.cfnIndex = cfnIndex
124+
this.indexReady = indexReady
67125
}
68126
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import boto3
2+
import time
3+
import json
4+
5+
aoss = boto3.client("opensearchserverless")
6+
7+
8+
def handler(event, context):
9+
print("Received event:", json.dumps(event))
10+
request_type = event["RequestType"]
11+
collection_name = event["ResourceProperties"]["CollectionName"]
12+
index_name = event["ResourceProperties"]["IndexName"]
13+
14+
if request_type == "Delete":
15+
return {"PhysicalResourceId": f"{collection_name}/{index_name}", "Data": {"Status": "DELETED"}}
16+
17+
# Poll until both collection + index become ACTIVE
18+
for _ in range(60): # 10 minutes max
19+
# 1. Check collection
20+
coll_resp = aoss.batch_get_collection(names=[collection_name])
21+
coll = next((c for c in coll_resp.get("collectionDetails", []) if c["name"] == collection_name), None)
22+
if not coll or coll["status"] != "ACTIVE":
23+
print(f"Collection {collection_name} status={coll['status'] if coll else 'MISSING'}")
24+
time.sleep(10)
25+
continue
26+
27+
# 2. Check index
28+
idx_resp = aoss.batch_get_index(names=[index_name], collectionName=collection_name)
29+
idx = next((i for i in idx_resp.get("indexDetails", []) if i["name"] == index_name), None)
30+
31+
if idx and idx["status"] == "ACTIVE":
32+
print(f"Index {index_name} in {collection_name} is ACTIVE")
33+
return {"PhysicalResourceId": f"{collection_name}/{index_name}", "Data": {"Status": "READY"}}
34+
35+
print(f"Index {index_name} status={idx['status'] if idx else 'MISSING'}")
36+
time.sleep(10)
37+
38+
raise Exception(f"Collection {collection_name} / Index {index_name} not ready after timeout")

packages/cdk/stacks/EpsAssistMeStack.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,17 @@ export class EpsAssistMeStack extends Stack {
137137
const vectorIndex = new VectorIndex(this, "VectorIndex", {
138138
indexName: VECTOR_INDEX_NAME,
139139
collection: openSearchResources.collection.collection,
140-
endpoint
140+
endpoint,
141+
region: this.region,
142+
account: this.account,
143+
stackName: props.stackName,
144+
logRetentionInDays: logRetentionInDays,
145+
logLevel: logLevel
141146
})
142147

143148
// Ensure knowledge base waits for vector index
144149
vectorKB.knowledgeBase.node.addDependency(vectorIndex.cfnIndex)
150+
vectorKB.knowledgeBase.node.addDependency(vectorIndex.indexReady)
145151

146152
// Add S3 notification to trigger sync Lambda function
147153
new S3LambdaNotification(this, "S3LambdaNotification", {

0 commit comments

Comments
 (0)