Skip to content

Commit 1a3668e

Browse files
authored
Merge pull request #6 from aws-iot-builder-tools/flow-refactor
Flow refactor complete
2 parents 5abe586 + 9a96bb2 commit 1a3668e

File tree

19 files changed

+786
-470
lines changed

19 files changed

+786
-470
lines changed

coverage.svg

Lines changed: 2 additions & 2 deletions
Loading

linting.svg

Lines changed: 2 additions & 2 deletions
Loading

src/bulk_importer/main.py

Lines changed: 16 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
from cryptography.hazmat.backends import default_backend
2020
from aws_lambda_powertools.utilities.typing import LambdaContext
2121
from aws_lambda_powertools.utilities.data_classes import SQSEvent
22-
from aws_utils import get_thing_type_arn, get_thing_group_arn, get_policy_arn
2322
from cert_utils import get_certificate_fingerprint
2423

2524
logger = logging.getLogger()
@@ -62,6 +61,8 @@ def get_thing(thing_name: str) -> str:
6261

6362
def process_policy(policy_name, certificate_id):
6463
"""Attaches the IoT policy to the certificate"""
64+
if policy_name is None:
65+
return
6566
iot_client = boto3client('iot')
6667
iot_client.attach_policy(policyName=policy_name,
6768
target=get_certificate_arn(certificate_id))
@@ -145,15 +146,13 @@ def process_certificate(config, requeue_cb):
145146
"Role. Unable to reach IoT Core object.")
146147
return None
147148

148-
def process_thing_group(thing_group_name, thing_name):
149+
def process_thing_group(thing_group_arn, thing_arn):
149150
"""Attaches the configured thing group to the iot thing"""
151+
if thing_group_arn is None:
152+
return
150153
iot_client = boto3client('iot')
151154
try:
152-
thing_group_arn = get_thing_group_arn(thing_group_name)
153-
thing_arn = get_thing(thing_name)
154-
iot_client.add_thing_to_thing_group(thingGroupName=thing_group_name,
155-
thingGroupArn=thing_group_arn,
156-
thingName=thing_name,
155+
iot_client.add_thing_to_thing_group(thingGroupArn=thing_group_arn,
157156
thingArn=thing_arn,
158157
overrideDynamicGroups=False)
159158
except ClientError as error:
@@ -178,32 +177,18 @@ def get_name_from_certificate(certificate_id):
178177

179178
def process_sqs(config):
180179
"""Main processing function to procedurally run through processing steps."""
181-
policy_name = config.get('policy_name')
182-
thing_group_name = config.get('thing_group_name')
183-
thing_type_name = config.get('thing_type_name')
184-
185180
certificate_id = process_certificate(config, requeue)
186-
187-
if certificate_id is None:
188-
thing_name = config['thing']
189-
else:
190-
thing_name = get_name_from_certificate(certificate_id)
191-
192-
process_thing(thing_name, certificate_id, thing_type_name)
193-
process_policy(policy_name, certificate_id)
194-
process_thing_group(thing_group_name, thing_name)
195-
196-
def lambda_handler(event: SQSEvent, context: LambdaContext) -> dict: # pylint: disable=unused-argument
181+
process_thing(config.get('thing'),
182+
certificate_id,
183+
config.get('thing_type_arn'))
184+
process_policy(config.get('policy_arn'),
185+
certificate_id)
186+
process_thing_group(config.get('thing_group_arn'),
187+
config.get('thing'))
188+
189+
def lambda_handler(event: SQSEvent,
190+
context: LambdaContext) -> dict: # pylint: disable=unused-argument
197191
"""Lambda function main entry point"""
198-
if event.get('Records') is None:
199-
print("ERROR: Configuration incorrect: no event record on invoke")
200-
return {
201-
"statusCode": 400,
202-
"body": json.dumps({
203-
"message": "No SQS event records available for processing."
204-
})
205-
}
206-
207192
for record in event['Records']:
208193
if record.get('eventSource') == 'aws:sqs':
209194
config = json.loads(record["body"])

src/bulk_importer/requirements.txt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
1-
boto3
2-
cryptography
1+
botocore==1.38.15
2+
boto3==1.38.15
3+
cryptography==44.0.3
4+
aws_lambda_powertools==3.12.0

src/layer_utils/aws_utils.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,11 @@ def s3_object_bytes(bucket_name: str, object_name: str, getvalue: bool=False):
4646
# object_name=object_name)
4747
# return object_stream.getvalue()
4848

49-
def queue_manifest_certificate(identity, certificate, queue_url):
49+
def send_sqs_message(config, queue_url):
5050
"""Send the thing name and certificate to sqs queue"""
5151
sqs_client = client("sqs")
52-
payload = {
53-
'thing': identity,
54-
'certificate': certificate
55-
}
5652
sqs_client.send_message( QueueUrl=queue_url,
57-
MessageBody=dumps(payload) )
53+
MessageBody=dumps(config) )
5854

5955
def verify_queue(queue_url: str) -> bool:
6056
"""Verify the queue exists by attempting to fetch its attributes"""
@@ -72,6 +68,8 @@ def verify_queue(queue_url: str) -> bool:
7268

7369
def get_thing_type_arn(type_name: str) -> str:
7470
"""Retrieves the thing type ARN"""
71+
if type_name in ("None", ""):
72+
return None
7573
iot_client = client('iot')
7674
try:
7775
response = iot_client.describe_thing_type(thingTypeName=type_name)
@@ -83,8 +81,10 @@ def get_thing_type_arn(type_name: str) -> str:
8381
logger.error("{this} ({thing_type_name}): {error_code} : {error_mesg}")
8482
raise error
8583

86-
def get_thing_group_arn(thing_group_name):
84+
def get_thing_group_arn(thing_group_name: str) -> str:
8785
"""Retrieves the thing group ARN"""
86+
if thing_group_name in ("None", ""):
87+
return None
8888
iot_client = client('iot')
8989

9090
try:
@@ -97,8 +97,11 @@ def get_thing_group_arn(thing_group_name):
9797
logger.error("{this} ({thing_group_name}): {error_code} : {error_mesg}")
9898
raise error
9999

100-
def get_policy_arn(policy_name):
100+
def get_policy_arn(policy_name: str) -> str:
101101
"""Retrieve the IoT policy ARN"""
102+
if policy_name is None:
103+
return None
104+
102105
iot_client = client('iot')
103106
try:
104107
response = iot_client.get_policy(policyName=policy_name)

src/product_provider/main.py

Lines changed: 72 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,43 +7,87 @@
77
import os
88
import json
99
import boto3
10-
from aws_lambda_powertools.utilities.typing import LambdaContext
11-
from aws_lambda_powertools.utilities.data_classes import SQSEvent
12-
13-
def process(payload):
14-
"""Annotate payload with environment-passed variants, later this function
15-
will evolve to allow importing types, groups, and policies"""
10+
import logging
1611

17-
queue_url = os.environ.get('QUEUE_TARGET')
12+
from aws_lambda_powertools.utilities.typing import LambdaContext
13+
from aws_lambda_powertools.utilities.data_classes import S3Event
14+
from aws_utils import get_policy_arn, get_thing_group_arn, get_thing_type_arn
1815

19-
# Policy is required.
20-
payload['policy_name'] = os.environ.get('POLICY_NAME')
16+
logger = logging.getLogger()
17+
logger.setLevel("INFO")
2118

22-
# Thing group is desired, but optional.
23-
# The reason why 'None' has to be set is an environment variable
24-
# on a Lambda function cannot be set to empty
25-
if os.environ.get('THING_GROUP_NAME') == "None":
26-
payload['thing_group_name'] = None
27-
else:
28-
payload['thing_group_name'] = os.environ.get('THING_GROUP_NAME')
19+
ESPRESSIF_BUCKET_PREFIX = "thingpress-espressif-"
20+
INFINEON_BUCKET_PREFIX = "thingpress-infineon-"
21+
MICROCHIP_BUCKET_PREFIX = "thingpress-microchip-"
2922

30-
# Thing group is desired, but optional.
31-
if os.environ.get('THING_TYPE_NAME') == "None":
32-
payload['thing_type_name'] = None
33-
else:
34-
payload['thing_type_name'] = os.environ.get('THING_TYPE_NAME')
23+
def process(payload: hash, queue_url: str) -> hash:
24+
"""Annotate payload with environment-passed variants, later this function
25+
will evolve to allow importing types, groups, and policies"""
3526

3627
# Pass on to the queue for target processing.
3728
client = boto3.client("sqs")
29+
logger.info("Sending payload to queue {queue_url}")
3830
client.send_message( QueueUrl=queue_url,
3931
MessageBody=json.dumps(payload))
4032
return payload
4133

42-
def lambda_handler(event: SQSEvent, context: LambdaContext) -> dict: # pylint: disable=unused-argument
43-
"""Lambda function main entry point"""
34+
def get_provider_queue(bucket_name: str) -> str:
35+
"""
36+
Returns the queue related to the prefix of a given bucket
37+
The cfn stack prescribes the environment variable value.
38+
See the cfn template for more detail.
39+
"""
40+
if bucket_name.startswith(ESPRESSIF_BUCKET_PREFIX):
41+
return os.environ.get('QUEUE_TARGET_ESPRESSIF')
42+
if bucket_name.startswith(INFINEON_BUCKET_PREFIX):
43+
return os.environ.get('QUEUE_TARGET_INFINEON')
44+
if bucket_name.startswith(MICROCHIP_BUCKET_PREFIX):
45+
return os.environ.get('QUEUE_TARGET_MICROCHIP')
46+
return None
47+
48+
def lambda_handler(event: S3Event,
49+
context: LambdaContext) -> dict: # pylint: disable=unused-argument
50+
"""
51+
Lambda function main entry point. Verifies the S3 object can be read and resolves
52+
inputs prior to forwarding to vendor handler queue.
53+
54+
This lambda function expects invocation by S3 event. There should be only one
55+
event, but is processed as if multiple events were found at once.
56+
57+
Expects the following environment variables to be set:
58+
QUEUE_TARGET_ESPRESSIF
59+
QUEUE_TARGET_INFINEON
60+
QUEUE_TARGET_MICROCHIP
61+
62+
Expects at least one of the following environment variables to be set:
63+
POLICY_NAME
64+
THING_GROUP_NAME
65+
66+
May have the following environment variables set:
67+
THING_TYPE_NAME
68+
"""
4469
# Get the payload coming in and process it. There might be more than one.
45-
result = []
46-
for record in event['Records']:
47-
r = process(json.loads(record["body"]))
48-
result.append(r)
49-
return result
70+
v_thing_group = get_thing_group_arn(os.environ.get('THING_GROUP_NAME'))
71+
v_thing_type = get_thing_type_arn(os.environ.get('THING_TYPE_NAME'))
72+
v_policy = get_policy_arn(os.environ.get('POLICY_NAME'))
73+
74+
s3_event = S3Event(event)
75+
queue_url = get_provider_queue(s3_event.bucket_name)
76+
if queue_url is None:
77+
logger.error("Queue URL could not be resolved. Exiting.")
78+
return None
79+
80+
data = {
81+
'policy_arn': os.environ.get('POLICY_NAME'),
82+
'thing_group_arn': v_thing_group,
83+
'thing_type_arn': v_thing_type,
84+
'bucket': s3_event.bucket_name
85+
}
86+
87+
for record in s3_event.records:
88+
# TODO: verify s3 object, for now assume it is reachable
89+
# v_object = verify_s3_object(bucket, record.s3.get_object.key)
90+
data['key'] = record.s3.get_object.key
91+
logger.info("Processing data for object {record.s3.get_object.key}")
92+
process(data, queue_url)
93+
return event

src/provider_espressif/main.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,38 @@
88
import os
99
import io
1010
import csv
11+
import json
12+
import base64
13+
import logging
1114
from aws_lambda_powertools.utilities.typing import LambdaContext
12-
from aws_lambda_powertools.utilities.data_classes import S3Event
13-
from aws_utils import s3_object_bytes, queue_manifest_certificate
15+
from aws_lambda_powertools.utilities.data_classes import SQSEvent
16+
from aws_utils import s3_object_bytes, send_sqs_message
1417

15-
def invoke_export(bucket_name: str, object_name: str, queue_url: str):
18+
logger = logging.getLogger()
19+
logger.setLevel("INFO")
20+
21+
def invoke_export(config: hash, queue_url: str):
1622
"""Evaluate CSV based Espressif manifest"""
17-
manifest_bytes = s3_object_bytes(bucket_name, object_name, getvalue=True)
23+
manifest_bytes = s3_object_bytes(config['bucket'],
24+
config['key'],
25+
getvalue=True)
1826
reader_list = csv.DictReader(io.StringIO(manifest_bytes.decode()))
1927

2028
for row in reader_list:
21-
queue_manifest_certificate(row['MAC'], row['cert'], queue_url)
29+
config['thing'] = row['MAC']
30+
config['certificate'] = str(base64.b64encode(row['cert'].encode('ascii')))
31+
send_sqs_message(config, queue_url)
2232

2333
def lambda_handler(event: dict, context: LambdaContext) -> dict: # pylint: disable=unused-argument
2434
"""Lambda function main entry point"""
25-
s3_event = S3Event(event)
35+
sqs_event = SQSEvent(event)
2636
queue_url = os.environ['QUEUE_TARGET']
37+
if event.get('Records') is None:
38+
#TODO throw an exception here
39+
return None
40+
for record in event['Records']:
41+
if record.get('eventSource') == 'aws:sqs':
42+
config = json.loads(record["body"])
43+
invoke_export(config, queue_url)
2744

28-
bucket = s3_event.bucket_name
29-
for record in s3_event.records:
30-
manifest = record.s3.get_object.key
31-
invoke_export(bucket, manifest, queue_url)
3245
return event
File renamed without changes.

src/provider_infineon/main.py renamed to src/provider_infineon/provider_infineon/main.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
the import processing pipeline
77
"""
88
import os
9+
import json
910
import logging
1011
from aws_lambda_powertools.utilities.typing import LambdaContext
11-
from aws_lambda_powertools.utilities.data_classes import S3Event
12-
from aws_utils import s3_object_bytes, verify_queue
12+
from aws_lambda_powertools.utilities.data_classes import SQSEvent
13+
from aws_utils import verify_queue
1314
from .manifest_handler import invoke_export, verify_certtype
1415

1516
logger = logging.getLogger()
@@ -28,10 +29,14 @@ def lambda_handler(event: dict, context: LambdaContext) -> dict: # pylint: disab
2829
logger.error("Certificate type not valid. Must be E0E0, E0E1, or E0E2.")
2930
return None
3031

31-
s3_event = S3Event(event)
32-
bucket = s3_event.bucket_name
33-
for record in s3_event.records:
34-
manifest = record.s3.get_object.key
35-
manifest_content = s3_object_bytes(bucket, manifest, getvalue=True)
36-
invoke_export(manifest_content, queue_url, cert_type)
32+
sqs_event = SQSEvent(event)
33+
queue_url = os.environ['QUEUE_TARGET']
34+
if event.get('Records') is None:
35+
#TODO throw an exception here
36+
return None
37+
for record in event['Records']:
38+
if record.get('eventSource') == 'aws:sqs':
39+
config = json.loads(record["body"])
40+
invoke_export(config, queue_url, cert_type)
41+
3742
return event

0 commit comments

Comments
 (0)