Skip to content

Commit 736f51b

Browse files
committed
Implement syncKnowledgeBaseFunction Lambda
1 parent cb0d679 commit 736f51b

File tree

8 files changed

+125
-10
lines changed

8 files changed

+125
-10
lines changed

.github/workflows/cdk_package_code.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ jobs:
6363
run: |
6464
pip3 install -r packages/slackBotFunction/requirements.txt -t packages/slackBotFunction
6565
pip3 install -r packages/createIndexFunction/requirements.txt -t packages/createIndexFunction
66+
pip3 install -r packages/syncKnowledgeBaseFunction/requirements.txt -t packages/syncKnowledgeBaseFunction
6667
6768
- name: 'Tar files'
6869
run: |

README.md

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ The solution consists of:
99

1010
- **Slack Bot Function**: AWS Lambda function that handles Slack slash commands and integrates with Amazon Bedrock Knowledge Base
1111
- **Create Index Function**: AWS Lambda function that creates and manages OpenSearch vector indices for the knowledge base
12+
- **Sync Knowledge Base Function**: AWS Lambda function that automatically triggers knowledge base ingestion when documents are uploaded to S3
1213
- **OpenSearch Serverless**: Vector database for storing and searching document embeddings
1314
- **Amazon Bedrock Knowledge Base**: RAG (Retrieval-Augmented Generation) service with guardrails
14-
- **S3 Storage**: Document storage for the knowledge base
15+
- **S3 Storage**: Document storage for the knowledge base with automatic sync triggers
1516
- **AWS CDK**: Infrastructure as Code for deployment
1617

1718
## Project Structure
@@ -20,13 +21,14 @@ This is a monorepo with the following structure:
2021

2122
```
2223
packages/
23-
├── cdk/ # AWS CDK infrastructure code
24-
│ ├── bin/ # CDK app entry point
25-
│ ├── constructs/ # Reusable CDK constructs
26-
│ ├── resources/ # AWS resource definitions
27-
│ └── stacks/ # CDK stack definitions
28-
├── createIndexFunction/ # Lambda function for OpenSearch index management
29-
└── slackBotFunction/ # Lambda function for Slack bot integration
24+
├── cdk/ # AWS CDK infrastructure code
25+
│ ├── bin/ # CDK app entry point
26+
│ ├── constructs/ # Reusable CDK constructs
27+
│ ├── resources/ # AWS resource definitions
28+
│ └── stacks/ # CDK stack definitions
29+
├── createIndexFunction/ # Lambda function for OpenSearch index management
30+
├── slackBotFunction/ # Lambda function for Slack bot integration
31+
└── syncKnowledgeBaseFunction/ # Lambda function for automatic knowledge base sync
3032
```
3133

3234
## Contributing

packages/cdk/resources/Functions.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@ export interface FunctionsProps {
1818
logLevel: string
1919
createIndexManagedPolicy: ManagedPolicy
2020
slackBotManagedPolicy: ManagedPolicy
21+
syncKnowledgeBaseManagedPolicy: ManagedPolicy
2122
slackBotTokenParameter: StringParameter
2223
slackSigningSecretParameter: StringParameter
2324
guardrailId: string
2425
guardrailVersion: string
2526
collectionId: string
2627
knowledgeBaseId: string
28+
dataSourceId: string
2729
region: string
2830
account: string
2931
slackBotTokenSecret: Secret
@@ -74,9 +76,25 @@ export class Functions extends Construct {
7476
props.slackBotTokenSecret.grantRead(slackBotLambda.function)
7577
props.slackBotSigningSecret.grantRead(slackBotLambda.function)
7678

79+
// Lambda function to sync knowledge base on S3 events
80+
const syncKnowledgeBaseFunction = new LambdaFunction(this, "SyncKnowledgeBaseFunction", {
81+
stackName: props.stackName,
82+
functionName: `${props.stackName}-SyncKnowledgeBaseFunction`,
83+
packageBasePath: "packages/syncKnowledgeBaseFunction",
84+
entryPoint: "app.py",
85+
logRetentionInDays: props.logRetentionInDays,
86+
logLevel: props.logLevel,
87+
environmentVariables: {
88+
"KNOWLEDGEBASE_ID": props.knowledgeBaseId || "placeholder",
89+
"DATA_SOURCE_ID": props.dataSourceId || "placeholder"
90+
},
91+
additionalPolicies: [props.syncKnowledgeBaseManagedPolicy]
92+
})
93+
7794
this.functions = {
7895
createIndex: createIndexFunction,
79-
slackBot: slackBotLambda
96+
slackBot: slackBotLambda,
97+
syncKnowledgeBase: syncKnowledgeBaseFunction
8098
}
8199
}
82100
}

packages/cdk/resources/IamResources.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export class IamResources extends Construct {
2424
public readonly bedrockExecutionRole: Role
2525
public readonly createIndexManagedPolicy: ManagedPolicy
2626
public readonly slackBotManagedPolicy: ManagedPolicy
27+
public readonly syncKnowledgeBaseManagedPolicy: ManagedPolicy
2728

2829
constructor(scope: Construct, id: string, props: IamResourcesProps) {
2930
super(scope, id)
@@ -158,5 +159,23 @@ export class IamResources extends Construct {
158159
slackBotGuardrailPolicy
159160
]
160161
})
162+
163+
// Create managed policy for SyncKnowledgeBase Lambda function
164+
const syncKnowledgeBasePolicy = new PolicyStatement({
165+
actions: [
166+
"bedrock-agent:StartIngestionJob",
167+
"bedrock-agent:GetIngestionJob",
168+
"bedrock-agent:ListIngestionJobs"
169+
],
170+
resources: [
171+
`arn:aws:bedrock:${props.region}:${props.account}:knowledge-base/*`,
172+
`arn:aws:bedrock:${props.region}:${props.account}:knowledge-base/*/data-source/*`
173+
]
174+
})
175+
176+
this.syncKnowledgeBaseManagedPolicy = new ManagedPolicy(this, "SyncKnowledgeBaseManagedPolicy", {
177+
description: "Policy for SyncKnowledgeBase Lambda to trigger ingestion jobs",
178+
statements: [syncKnowledgeBasePolicy]
179+
})
161180
}
162181
}

packages/cdk/resources/VectorKnowledgeBaseResources.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ export interface VectorKnowledgeBaseProps {
1717
export class VectorKnowledgeBaseResources extends Construct {
1818
public readonly knowledgeBase: CfnKnowledgeBase
1919
public readonly guardrail: CfnGuardrail
20+
public readonly dataSource: CfnDataSource
2021

2122
constructor(scope: Construct, id: string, props: VectorKnowledgeBaseProps) {
2223
super(scope, id)
@@ -81,7 +82,7 @@ export class VectorKnowledgeBaseResources extends Construct {
8182
})
8283

8384
// Create S3 data source for knowledge base documents
84-
new CfnDataSource(this, "S3DataSource", {
85+
this.dataSource = new CfnDataSource(this, "S3DataSource", {
8586
knowledgeBaseId: this.knowledgeBase.attrKnowledgeBaseId,
8687
name: `${props.stackName}-s3-datasource`,
8788
dataSourceConfiguration: {

packages/cdk/stacks/EpsAssistMeStack.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import {
44
StackProps,
55
CfnOutput
66
} from "aws-cdk-lib"
7+
import {EventType} from "aws-cdk-lib/aws-s3"
8+
import {LambdaDestination} from "aws-cdk-lib/aws-s3-notifications"
79
import {nagSuppressions} from "../nagSuppressions"
810
import {Apis} from "../resources/Apis"
911
import {Functions} from "../resources/Functions"
@@ -82,12 +84,14 @@ export class EpsAssistMeStack extends Stack {
8284
logLevel,
8385
createIndexManagedPolicy: iamResources.createIndexManagedPolicy,
8486
slackBotManagedPolicy: iamResources.slackBotManagedPolicy,
87+
syncKnowledgeBaseManagedPolicy: iamResources.syncKnowledgeBaseManagedPolicy,
8588
slackBotTokenParameter: secrets.slackBotTokenParameter,
8689
slackSigningSecretParameter: secrets.slackSigningSecretParameter,
8790
guardrailId: "", // Will be set after vector KB is created
8891
guardrailVersion: "", // Will be set after vector KB is created
8992
collectionId: openSearchResources.collection.collection.attrId,
9093
knowledgeBaseId: "", // Will be set after vector KB is created
94+
dataSourceId: "", // Will be set after vector KB is created
9195
region,
9296
account,
9397
slackBotTokenSecret: secrets.slackBotTokenSecret,
@@ -119,6 +123,22 @@ export class EpsAssistMeStack extends Stack {
119123
functions.functions.slackBot.function.addEnvironment("GUARD_RAIL_VERSION", vectorKB.guardrail.attrVersion)
120124
functions.functions.slackBot.function.addEnvironment("KNOWLEDGEBASE_ID", vectorKB.knowledgeBase.attrKnowledgeBaseId)
121125

126+
// Update SyncKnowledgeBase Lambda environment variables with vector KB info
127+
functions.functions.syncKnowledgeBase.function.addEnvironment(
128+
"KNOWLEDGEBASE_ID",
129+
vectorKB.knowledgeBase.attrKnowledgeBaseId
130+
)
131+
functions.functions.syncKnowledgeBase.function.addEnvironment(
132+
"DATA_SOURCE_ID",
133+
vectorKB.dataSource.attrDataSourceId
134+
)
135+
136+
// Add S3 event notification to trigger sync function
137+
storage.kbDocsBucket.bucket.addEventNotification(
138+
EventType.OBJECT_CREATED,
139+
new LambdaDestination(functions.functions.syncKnowledgeBase.function)
140+
)
141+
122142
// Create Apis and pass the Lambda function
123143
const apis = new Apis(this, "Apis", {
124144
stackName: props.stackName,
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import os
2+
import boto3
3+
from botocore.exceptions import ClientError
4+
from aws_lambda_powertools import Logger
5+
6+
# Configure logging
7+
logger = Logger()
8+
9+
# Initialize Bedrock client
10+
bedrock_agent = boto3.client("bedrock-agent")
11+
12+
13+
@logger.inject_lambda_context
14+
def lambda_handler(event, context):
15+
"""
16+
Lambda handler for S3 events that triggers knowledge base ingestion.
17+
"""
18+
knowledge_base_id = os.environ.get("KNOWLEDGEBASE_ID")
19+
data_source_id = os.environ.get("DATA_SOURCE_ID")
20+
21+
if not knowledge_base_id or not data_source_id:
22+
logger.error("Missing required environment variables: KNOWLEDGEBASE_ID or DATA_SOURCE_ID")
23+
return {"statusCode": 500, "body": "Configuration error"}
24+
25+
try:
26+
# Process S3 event records
27+
for record in event.get("Records", []):
28+
if record.get("eventSource") == "aws:s3":
29+
bucket = record["s3"]["bucket"]["name"]
30+
key = record["s3"]["object"]["key"]
31+
event_name = record["eventName"]
32+
33+
logger.info(f"Processing S3 event: {event_name} for {bucket}/{key}")
34+
35+
# Start ingestion job for the knowledge base
36+
response = bedrock_agent.start_ingestion_job(
37+
knowledgeBaseId=knowledge_base_id,
38+
dataSourceId=data_source_id,
39+
description=f"Auto-sync triggered by S3 event: {event_name} on {key}",
40+
)
41+
42+
job_id = response["ingestionJob"]["ingestionJobId"]
43+
logger.info(f"Started ingestion job {job_id} for knowledge base {knowledge_base_id}")
44+
45+
return {"statusCode": 200, "body": "Ingestion triggered successfully"}
46+
47+
except ClientError as e:
48+
logger.error(f"AWS service error: {e}")
49+
return {"statusCode": 500, "body": f"AWS error: {str(e)}"}
50+
except Exception as e:
51+
logger.error(f"Unexpected error: {e}")
52+
return {"statusCode": 500, "body": f"Error: {str(e)}"}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
boto3>=1.34.0
2+
aws-lambda-powertools>=2.0.0

0 commit comments

Comments
 (0)