Skip to content

Commit 3e6e283

Browse files
committed
Improve syncKnowledgeBaseFunction Lambda logging and add S3 event structure validation
1 parent 2633c41 commit 3e6e283

File tree

1 file changed

+128
-16
lines changed
  • packages/syncKnowledgeBaseFunction

1 file changed

+128
-16
lines changed
Lines changed: 128 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,164 @@
11
import os
import time
from urllib.parse import unquote_plus

import boto3
from botocore.exceptions import ClientError

from aws_lambda_powertools import Logger
56

6-
# Configure logging
7-
logger = Logger()
87

9-
# Initialize Bedrock client
8+
# Initialize Powertools Logger with service name for better log organization
9+
logger = Logger(service="syncKnowledgeBaseFunction")
10+
11+
# Initialize Bedrock client for knowledge base operations
1012
bedrock_agent = boto3.client("bedrock-agent")
1113

1214

1315
@logger.inject_lambda_context
def handler(event, context):
    """
    Process S3 event notifications and trigger Bedrock Knowledge Base ingestion.

    Triggered when documents are uploaded to the S3 bucket; starts one
    ingestion job per valid S3 record so the knowledge base picks up the
    new content.

    Parameters
    ----------
    event : dict
        S3 event notification payload (``{"Records": [...]}``).
    context : LambdaContext
        Lambda runtime context (consumed by the Powertools decorator for
        log correlation).

    Returns
    -------
    dict
        ``{"statusCode": int, "body": str}`` summarizing the outcome.
    """
    # Record start time for end-to-end performance tracking.
    start_time = time.time()

    # Required configuration comes from the environment.
    knowledge_base_id = os.environ.get("KNOWLEDGEBASE_ID")
    data_source_id = os.environ.get("DATA_SOURCE_ID")

    # Validate configuration before doing any work.
    if not knowledge_base_id or not data_source_id:
        logger.error(
            "Missing required environment variables",
            extra={"knowledge_base_id": bool(knowledge_base_id), "data_source_id": bool(data_source_id)},
        )
        return {"statusCode": 500, "body": "Configuration error"}

    logger.info(
        "Starting knowledge base sync process",
        extra={"knowledge_base_id": knowledge_base_id, "data_source_id": data_source_id},
    )

    try:
        processed_files = []
        job_ids = []
        # Hoisted: the record list is needed for both iteration and the
        # per-record "total_records" log field.
        records = event.get("Records", [])

        for record_index, record in enumerate(records):
            # Guard clause: ignore anything that is not an S3 notification.
            if record.get("eventSource") != "aws:s3":
                logger.warning(
                    "Skipping non-S3 event",
                    extra={"event_source": record.get("eventSource"), "record_index": record_index + 1},
                )
                continue

            # Validate the S3 event structure defensively before trusting it.
            s3_info = record.get("s3", {})
            bucket_name = s3_info.get("bucket", {}).get("name")
            object_key = s3_info.get("object", {}).get("key")

            if not bucket_name or not object_key:
                logger.warning(
                    "Skipping invalid S3 record",
                    extra={
                        "record_index": record_index + 1,
                        "has_bucket": bool(bucket_name),
                        "has_object_key": bool(object_key),
                    },
                )
                continue

            # S3 event notification keys are URL-encoded (spaces arrive
            # as '+'); decode so logs and the ingestion description show
            # the real object key.
            key = unquote_plus(object_key)
            # .get() for consistency with the defensive access above — a
            # malformed record must not raise KeyError mid-batch.
            event_name = record.get("eventName", "unknown")
            object_size = s3_info.get("object", {}).get("size", "unknown")

            logger.info(
                "Processing S3 event",
                extra={
                    "event_name": event_name,
                    "bucket": bucket_name,
                    "key": key,
                    "object_size_bytes": object_size,
                    "record_index": record_index + 1,
                    "total_records": len(records),
                },
            )

            # Start an ingestion job; time just the Bedrock API round-trip.
            ingestion_start_time = time.time()
            response = bedrock_agent.start_ingestion_job(
                knowledgeBaseId=knowledge_base_id,
                dataSourceId=data_source_id,
                description=f"Auto-sync triggered by S3 event: {event_name} on {key}",
            )
            ingestion_request_time = time.time() - ingestion_start_time

            # Extract job information from the API response.
            job_id = response["ingestionJob"]["ingestionJobId"]
            job_status = response["ingestionJob"]["status"]

            logger.info(
                "Successfully started ingestion job",
                extra={
                    "job_id": job_id,
                    "job_status": job_status,
                    "knowledge_base_id": knowledge_base_id,
                    "file_key": key,
                    "ingestion_request_duration_ms": round(ingestion_request_time * 1000, 2),
                },
            )

            # Track processed files and job IDs for the completion summary.
            processed_files.append(key)
            job_ids.append(job_id)

        # Log a summary of the whole batch with total processing time.
        total_duration = time.time() - start_time
        logger.info(
            "Knowledge base sync completed successfully",
            extra={
                "total_files_processed": len(processed_files),
                "job_ids": job_ids,
                "processed_files": processed_files,
                "total_duration_ms": round(total_duration * 1000, 2),
                "knowledge_base_id": knowledge_base_id,
            },
        )

        return {
            "statusCode": 200,
            "body": f"Successfully triggered {len(job_ids)} ingestion job(s) for {len(processed_files)} file(s)",
        }

    except ClientError as e:
        # AWS service errors: surface the service error code and message.
        error_code = e.response.get("Error", {}).get("Code", "Unknown")
        error_message = e.response.get("Error", {}).get("Message", str(e))

        logger.error(
            "AWS service error occurred",
            extra={
                "error_code": error_code,
                "error_message": error_message,
                "knowledge_base_id": knowledge_base_id,
                "data_source_id": data_source_id,
                "duration_ms": round((time.time() - start_time) * 1000, 2),
            },
        )
        return {"statusCode": 500, "body": f"AWS error: {error_code} - {error_message}"}

    except Exception as e:
        # Last-resort boundary handler: log and return a 500 rather than
        # letting the Lambda invocation fail unlogged.
        logger.error(
            "Unexpected error occurred",
            extra={
                "error_type": type(e).__name__,
                "error_message": str(e),
                "duration_ms": round((time.time() - start_time) * 1000, 2),
            },
        )
        return {"statusCode": 500, "body": f"Unexpected error: {str(e)}"}

0 commit comments

Comments
 (0)