Skip to content

Commit 0e75072

Browse files
committed
Add DynamoDB database to log all Rekongition and A2I events for offline analytics use
1 parent 7333f60 commit 0e75072

File tree

7 files changed

+225
-44
lines changed

7 files changed

+225
-44
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,16 @@ We use Parameter Store in two different ways. Firstly, we provide a set of seven
5050

5151
We use two EventBridge rules to initiate Step Functions state machine runs. The first rule is based on a Systems Manager event pattern. The Systems Manager rule is triggered by changes to the Parameter Store and initiates the state machine to invoke a Lambda function to apply changes to the impacted resources. The second rule is a schedule rule. The schedule rule is triggered periodically to initiate the state machine to invoke a Lambda function to check for new model training.
5252

53+
We use an [Amazon DynamoDB](https://aws.amazon.com/dynamodb/) NoSQL database to log all Rekognition and A2I events and results for performance analysis and model drift detection. Although we did NOT include an analytics feature in this example, you can quickly deploy [Amazon QuickSight](https://aws.amazon.com/quicksight/) with the DynamoDB table as your source to visualize the performance of your model over time.
54+
5355
We use a Step Functions state machine to orchestrate the ML workflow. The state machine initiates different processes based on events received from EventBridge and responses from Lambda. In addition, the state machine uses an internal process such as Wait to wait for model training and deployment to complete and Choice to evaluate for next tasks.
5456

5557
We use an S3 bucket and a set of predefined folders to store training and inference images and model artifacts. Each folder has a dedicated purpose. The model operator uploads new images to the folder images_labeled_by_folder for training, and the model consumer uploads inference images to the folder images_for_detection for custom label detection.
5658

5759
We use three different sets of Lambda functions:
5860

5961
- The first set consists of two Lambda functions that build the Rekognition Custom Labels project and Amazon A2I human flow definition. These two Lambda functions are only used initially as part of the CloudFormation stack deployment process.
60-
- The second set of Lambda functions are invoked by the state machine to run Amazon Rekognition and Amazon A2I APIs, create a manifest file for training, collect labeled images for training, and manage system resources.
62+
- The second set of Lambda functions are invoked by the state machine to run Amazon Rekognition, A2I & DynamoDB APIs, create a manifest file for training, collect labeled images for training, and manage system resources.
6163
- The last set is a single Lambda function to redirect S3 PutObject events to the state machine.
6264

6365
We use [Amazon Simple Notification Service](https://aws.amazon.com/sns/) (Amazon SNS) as a communication mechanism to alert the model operator and model consumer of relevant model training and detection events. All SNS messages are published by the corresponding Lambda functions.

assets/architecture.drawio

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

assets/architecture.png

220 Bytes
Loading

lambda/a2i-human-loop.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,49 @@
44
s3 = boto3.client('s3')
55
s3_resource = boto3.resource('s3')
66
ssm = boto3.client('ssm')
7+
dynamodb = boto3.client('dynamodb')
78
def get_parameter(parameter_name):
89
response = ssm.get_parameter(Name=parameter_name)
910
return json.loads(response['Parameter']['Value'])
11+
def append_human_results(dynamodb_table, detectlabel_request_id, label, new_key, submission_time):
12+
response = dynamodb.update_item(
13+
TableName=dynamodb_table,
14+
Key={
15+
'DetectLabelRequestId': {'S':detectlabel_request_id}
16+
},
17+
ExpressionAttributeNames={
18+
'#HL':'HumanLabel',
19+
'#S3':'CopiedS3ObjectKey',
20+
'#AT': 'HumanLableCompletedDate'
21+
},
22+
ExpressionAttributeValues={
23+
':h': {'S':label},
24+
':s': {'S':new_key},
25+
':a': {'S':submission_time}
26+
},
27+
UpdateExpression='SET #HL=:h, #S3=:s, #AT=:a'
28+
)
1029
def handler(event, context):
1130
parameter_name = os.environ['parameter_store_path'] + 'For-System-Use-Only'
1231
sys_vars = get_parameter(parameter_name)
32+
dynamodb_table = sys_vars['dynamodb_table']
1333
bucket = event['s3event']['s3']['bucket']['name']
1434
key = event['s3event']['s3']['object']['key']
1535
response = s3.get_object(Bucket = bucket, Key = key)
1636
response = json.loads(response['Body'].read())
37+
new_key=''
1738
if len(response['humanAnswers']) != 0:
1839
label = response['humanAnswers'][0]['answerContent']['crowd-image-classifier']['label']
40+
submission_time = response['humanAnswers'][0]['submissionTime']
1941
if label != 'None of the Above':
2042
taskObject = response['inputContent']['taskObject']
2143
bucket, key = taskObject.replace("s3://", "").split("/", 1)
2244
copy_source = {'Bucket': bucket, 'Key': key}
2345
image_name = 'humanLoopName-' + response['humanLoopName'] + '-' + taskObject.split('/')[-1]
2446
new_key = 'images_labeled_by_folder/' + label + '/' + image_name
2547
s3_resource.meta.client.copy(copy_source, sys_vars['s3_bucket'], new_key)
48+
detectlabel_request_id=response['inputContent']['detectLabelRequestId']
49+
append_human_results(dynamodb_table, detectlabel_request_id, label, new_key, submission_time)
2650
response['s3event'] = event['s3event']
2751
return {
2852
'message': response

lambda/a2i_create_humanloop.py

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,72 @@
11
import json
22
import os
33
import boto3
4-
import uuid
5-
sns = boto3.client('sns')
6-
a2i = boto3.client('sagemaker-a2i-runtime')
74
ssm = boto3.client('ssm')
8-
def get_parameter(parameter_name):
9-
response = ssm.get_parameter(Name=parameter_name)
10-
parameter_value = json.loads(response['Parameter']['Value'])
11-
return parameter_value
5+
sns = boto3.client('sns')
6+
rekognition = boto3.client('rekognition')
7+
dynamodb = boto3.client('dynamodb')
8+
def get_parameters():
9+
response = ssm.get_parameters_by_path(
10+
Path=os.environ['parameter_store_path'],
11+
Recursive=True
12+
)
13+
parameter_store = {}
14+
for parameter in response['Parameters']:
15+
parameter_name = (parameter['Name'].split('/'))[-1]
16+
parameter_store[parameter_name] = parameter['Value']
17+
return parameter_store
18+
def store_detection_results(dynamodb_table, detectlabel_request_id, detectlabel_date, s3_bucket_name, s3_object_key, s3_object_eTag, project_version, detected_label, confidence_level, minimum_confidence_level, A2I):
19+
response = dynamodb.put_item(
20+
TableName=dynamodb_table,
21+
Item={
22+
'DetectLabelRequestId': {'S':detectlabel_request_id},
23+
'DetectLabelDate': {'S':detectlabel_date},
24+
'S3Bucket': {'S':s3_bucket_name},
25+
'S3ObjectKey': {'S':s3_object_key},
26+
'S3ObjectEtag': {'S':s3_object_eTag},
27+
'ProjectVersionArn': {'S':project_version},
28+
'DectectedLabel': {'S':detected_label},
29+
'DetectedConfidenceLevel': {'N':str(confidence_level)},
30+
'MinimumConfidenceLevel': {'N':str(minimum_confidence_level)},
31+
'A2IEnabled': {'BOOL':A2I}
32+
}
33+
)
1234
def publish_message(sns_subject, sns_message, topic_arn):
1335
response = sns.publish(
1436
TopicArn=topic_arn,
1537
Message=sns_message,
1638
Subject=sns_subject
1739
)
1840
def handler(event, context):
19-
parameter_name = os.environ['parameter_store_path'] + 'For-System-Use-Only'
20-
sys_vars = get_parameter(parameter_name)
21-
response = a2i.start_human_loop(
22-
HumanLoopName = str(uuid.uuid4()),
23-
FlowDefinitionArn = sys_vars['flow_definition_arn'],
24-
HumanLoopInput = {
25-
'InputContent': json.dumps({
26-
'initialValue': event['message']['CustomLabels'][0]['Confidence'],
27-
'taskObject': 's3://'+event['message']['s3event']['s3']['bucket']['name']+'/'+event['message']['s3event']['s3']['object']['key']
28-
})
29-
}
41+
parameter_store = get_parameters()
42+
a2i = True if parameter_store['Enable-A2I-Workflow'].lower().capitalize() == 'True' else False
43+
sys_vars = json.loads(parameter_store['For-System-Use-Only'])
44+
project_version = sys_vars['rekognition_project_version_arn']
45+
dynamodb_table = sys_vars['dynamodb_table']
46+
minimum_confidence_level=parameter_store['Minimum-Label-Detection-Confidence']
47+
s3_bucket_name=event['s3event']['s3']['bucket']['name']
48+
s3_object_key=event['s3event']['s3']['object']['key']
49+
s3_object_eTag=event['s3event']['s3']['object']['eTag']
50+
s3_event_time=event['s3event']['eventTime']
51+
response = rekognition.detect_custom_labels(
52+
ProjectVersionArn=project_version,
53+
Image={
54+
'S3Object': {
55+
'Bucket': s3_bucket_name,
56+
'Name': s3_object_key
57+
}
58+
},
59+
MaxResults=1,
60+
MinConfidence=0
3061
)
31-
response['s3event'] = event['message']['s3event']
32-
publish_message('A2I Human Loop Initiated', json.dumps(response), sys_vars['sns-topic'])
62+
confidence_level = response['CustomLabels'][0]['Confidence']
63+
detected_label = response['CustomLabels'][0]['Name']
64+
detectlabel_request_id = response['ResponseMetadata']['RequestId']
65+
if confidence_level < float(parameter_store['Minimum-Label-Detection-Confidence']):
66+
response['A2I'] = True if a2i else False
67+
store_detection_results(dynamodb_table, detectlabel_request_id, s3_event_time, s3_bucket_name, s3_object_key, s3_object_eTag, project_version, detected_label, confidence_level, minimum_confidence_level, a2i)
68+
publish_message('Rekgnition Custom Labels Detection Invoked', json.dumps(response), sys_vars['sns-topic'])
69+
response['s3event'] = event['s3event']
3370
return {
3471
'message': response
3572
}

lambda/rekognition-detect-label.py

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import json
22
import os
33
import boto3
4-
import uuid
54
ssm = boto3.client('ssm')
65
sns = boto3.client('sns')
76
rekognition = boto3.client('rekognition')
7+
dynamodb = boto3.client('dynamodb')
88
def get_parameters():
99
response = ssm.get_parameters_by_path(
1010
Path=os.environ['parameter_store_path'],
@@ -15,6 +15,22 @@ def get_parameters():
1515
parameter_name = (parameter['Name'].split('/'))[-1]
1616
parameter_store[parameter_name] = parameter['Value']
1717
return parameter_store
18+
def store_detection_results(dynamodb_table, detectlabel_request_id, detectlabel_date, s3_bucket_name, s3_object_key, s3_object_eTag, project_version, detected_label, confidence_level, minimum_confidence_level, A2I):
19+
response = dynamodb.put_item(
20+
TableName=dynamodb_table,
21+
Item={
22+
'DetectLabelRequestId': {'S':detectlabel_request_id},
23+
'DetectLabelDate': {'S':detectlabel_date},
24+
'S3Bucket': {'S':s3_bucket_name},
25+
'S3ObjectKey': {'S':s3_object_key},
26+
'S3ObjectEtag': {'S':s3_object_eTag},
27+
'ProjectVersionArn': {'S':project_version},
28+
'DectectedLabel': {'S':detected_label},
29+
'DetectedConfidenceLevel': {'N':str(confidence_level)},
30+
'MinimumConfidenceLevel': {'N':str(minimum_confidence_level)},
31+
'A2IEnabled': {'BOOL':A2I}
32+
}
33+
)
1834
def publish_message(sns_subject, sns_message, topic_arn):
1935
response = sns.publish(
2036
TopicArn=topic_arn,
@@ -23,25 +39,33 @@ def publish_message(sns_subject, sns_message, topic_arn):
2339
)
2440
def handler(event, context):
2541
parameter_store = get_parameters()
42+
a2i = True if parameter_store['Enable-A2I-Workflow'].lower().capitalize() == 'True' else False
2643
sys_vars = json.loads(parameter_store['For-System-Use-Only'])
44+
project_version = sys_vars['rekognition_project_version_arn']
45+
dynamodb_table = sys_vars['dynamodb_table']
46+
minimum_confidence_level=parameter_store['Minimum-Label-Detection-Confidence']
47+
s3_bucket_name=event['s3event']['s3']['bucket']['name']
48+
s3_object_key=event['s3event']['s3']['object']['key']
49+
s3_object_eTag=event['s3event']['s3']['object']['eTag']
50+
s3_event_time=event['s3event']['eventTime']
2751
response = rekognition.detect_custom_labels(
28-
ProjectVersionArn=sys_vars['rekognition_project_version_arn'],
52+
ProjectVersionArn=project_version,
2953
Image={
3054
'S3Object': {
31-
'Bucket': event['s3event']['s3']['bucket']['name'],
32-
'Name': event['s3event']['s3']['object']['key']
55+
'Bucket': s3_bucket_name,
56+
'Name': s3_object_key
3357
}
3458
},
3559
MaxResults=1,
3660
MinConfidence=0
3761
)
3862
confidence_level = response['CustomLabels'][0]['Confidence']
39-
publish_message('Rekgnition Custom Labels Detection Invoked', json.dumps(response), sys_vars['sns-topic'])
63+
detected_label = response['CustomLabels'][0]['Name']
64+
detectlabel_request_id = response['ResponseMetadata']['RequestId']
4065
if confidence_level < float(parameter_store['Minimum-Label-Detection-Confidence']):
41-
if parameter_store['Enable-A2I-Workflow'] in ['True', 'true', 'TRUE']:
42-
response['A2I'] = True
43-
else:
44-
response['A2I'] = False
66+
response['A2I'] = True if a2i else False
67+
store_detection_results(dynamodb_table, detectlabel_request_id, s3_event_time, s3_bucket_name, s3_object_key, s3_object_eTag, project_version, detected_label, confidence_level, minimum_confidence_level, a2i)
68+
publish_message('Rekgnition Custom Labels Detection Invoked', json.dumps(response), sys_vars['sns-topic'])
4569
response['s3event'] = event['s3event']
4670
return {
4771
'message': response

0 commit comments

Comments
 (0)