Skip to content

Commit 2e8e3bc

Browse files
committed
refactor on organization of stack element order of operations
1 parent 7de1240 commit 2e8e3bc

File tree

3 files changed

+175
-105
lines changed

3 files changed

+175
-105
lines changed

src/layer_utils/aws_utils.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,25 @@ def get_thing_group_arn(thing_group_name):
9797
logger.error("{this} ({thing_group_name}): {error_code} : {error_mesg}")
9898
raise error
9999

100-
def get_policy_arn(policy_name):
100+
def get_thing_group_policy(thing_group_name: str) -> str:
101+
"""Retrieves the thing group ARN"""
102+
iot_client = client('iot')
103+
104+
try:
105+
response = iot_client.describe_thing_group(thingGroupName=thing_group_name)
106+
return response.get('thingGroupArn')
107+
except ClientError as error:
108+
error_code = error.response['Error']['Code'] # pylint: disable=unused-variable
109+
error_mesg = error.response['Error']['Message'] # pylint: disable=unused-variable
110+
this = inspect.stack()[1][3] # pylint: disable=unused-variable
111+
logger.error("{this} ({thing_group_name}): {error_code} : {error_mesg}")
112+
raise error
113+
114+
def get_policy_arn(policy_name: str) -> str:
101115
"""Retrieve the IoT policy ARN"""
116+
if policy_name is None:
117+
return None
118+
102119
iot_client = client('iot')
103120
try:
104121
response = iot_client.get_policy(policyName=policy_name)

src/product_provider/main.py

Lines changed: 61 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,43 +7,76 @@
77
import os
88
import json
99
import boto3
10+
1011
from aws_lambda_powertools.utilities.typing import LambdaContext
11-
from aws_lambda_powertools.utilities.data_classes import SQSEvent
12+
from aws_lambda_powertools.utilities.data_classes import S3Event
13+
from aws_utils import get_policy_arn, get_thing_group_arn, get_thing_type_arn
14+
15+
espressif_bucket_prefix = "thingpress-espressif-"
16+
infineon_bucket_prefix = "thingpress-infineon-"
17+
microchip_bucket_prefix = "thingpress-microchip-"
1218

13-
def process(payload):
19+
def process(payload: hash, queue_url: str) -> hash:
1420
"""Annotate payload with environment-passed variants, later this function
1521
will evolve to allow importing types, groups, and policies"""
1622

17-
queue_url = os.environ.get('QUEUE_TARGET')
18-
19-
# Policy is required.
20-
payload['policy_name'] = os.environ.get('POLICY_NAME')
21-
22-
# Thing group is desired, but optional.
23-
# The reason why 'None' has to be set is an environment variable
24-
# on a Lambda function cannot be set to empty
25-
if os.environ.get('THING_GROUP_NAME') == "None":
26-
payload['thing_group_name'] = None
27-
else:
28-
payload['thing_group_name'] = os.environ.get('THING_GROUP_NAME')
29-
30-
# Thing group is desired, but optional.
31-
if os.environ.get('THING_TYPE_NAME') == "None":
32-
payload['thing_type_name'] = None
33-
else:
34-
payload['thing_type_name'] = os.environ.get('THING_TYPE_NAME')
35-
3623
# Pass on to the queue for target processing.
3724
client = boto3.client("sqs")
3825
client.send_message( QueueUrl=queue_url,
3926
MessageBody=json.dumps(payload))
4027
return payload
4128

42-
def lambda_handler(event: SQSEvent, context: LambdaContext) -> dict: # pylint: disable=unused-argument
43-
"""Lambda function main entry point"""
29+
def get_provider_queue(bucket_name: str) -> str:
30+
"""
31+
Returns the queue related to the prefix of a given bucket
32+
The cfn stack prescribes the environment variable value.
33+
See the cfn template for more detail.
34+
"""
35+
if bucket_name.startswith(espressif_bucket_prefix):
36+
return os.environ.get('QUEUE_TARGET_ESPRESSIF')
37+
if bucket_name.startswith(infineon_bucket_prefix):
38+
return os.environ.get('QUEUE_TARGET_INFINEON')
39+
if bucket_name.startswith(microchip_bucket_prefix):
40+
return os.environ.get('QUEUE_TARGET_MICROCHIP')
41+
return None
42+
43+
def lambda_handler(event: S3Event, context: LambdaContext) -> dict: # pylint: disable=unused-argument
44+
"""
45+
Lambda function main entry point. Verifies the S3 object can be read and resolves
46+
inputs prior to forwarding to vendor handler queue.
47+
48+
This lambda function expects invocation by S3 event. There should be only one
49+
event, but is processed as if multiple events were found at once.
50+
51+
Expects the following environment variables to be set:
52+
QUEUE_TARGET_ESPRESSIF
53+
QUEUE_TARGET_INFINEON
54+
QUEUE_TARGET_MICROCHIP
55+
56+
Expects at least one of the following environment variables to be set:
57+
POLICY_NAME
58+
THING_GROUP_NAME
59+
60+
May have the following environment variables set:
61+
THING_TYPE_NAME
62+
"""
4463
# Get the payload coming in and process it. There might be more than one.
45-
result = []
46-
for record in event['Records']:
47-
r = process(json.loads(record["body"]))
48-
result.append(r)
49-
return result
64+
v_thing_group = get_thing_group_arn(os.environ.get('THING_GROUP_NAME'))
65+
v_thing_type = get_thing_type_arn(os.environ.get('THING_TYPE_NAME'))
66+
v_policy = get_policy_arn(os.environ.get('POLICY_NAME'))
67+
68+
s3_event = S3Event(event)
69+
queue_url = get_provider_queue(s3_event.bucket_name)
70+
data = {
71+
'policy_arn': v_policy,
72+
'thing_group_arn': v_thing_group,
73+
'thing_type_arn': v_thing_type,
74+
'bucket': s3_event.bucket_name
75+
}
76+
77+
for record in s3_event.records:
78+
# TODO: verify s3 object, for now assume it is reachable
79+
# v_object = verify_s3_object(bucket, record.s3.get_object.key)
80+
data['key'] = record.s3.get_object.key
81+
process(data, queue_url)
82+
return event

test/unit/src/test_product_provider.py

Lines changed: 96 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
import copy
1010
import json
1111
from unittest import TestCase
12+
from pytest import raises
1213
from moto import mock_aws
14+
1315
from cryptography import x509
1416
from cryptography.hazmat.backends import default_backend
1517
from cryptography.hazmat.primitives import serialization
@@ -36,87 +38,105 @@ def setUp(self):
3638
"queue_name" : self.test_sqs_queue_name }
3739
self.mocked_sqs_class = LambdaSQSClass(mocked_sqs_resource)
3840

39-
def test_pos_process_1(self):
40-
"""Positive test case for processing certificate"""
41-
with open('./test/artifacts/single.pem', 'rb') as data:
42-
pem_obj = x509.load_pem_x509_certificate(data.read(),
43-
backend=default_backend())
44-
block = pem_obj.public_bytes(encoding=serialization.Encoding.PEM).decode('ascii')
45-
cert = str(base64.b64encode(block.encode('ascii')))
46-
c = {'certificate': cert}
47-
d = copy.deepcopy(c)
48-
d['policy_name'] = "my_policy"
49-
d['thing_group_name'] = "my_thing_group"
50-
d['thing_type_name'] = "my_thing_type"
41+
# def test_pos_process_1(self):
42+
# """Must have either a policy and/or group defined"""
43+
# #bucket = "b"
44+
# #object = "o"
45+
# #os.environ['POLICY_NAME'] = ""
46+
# #os.environ['THING_GROUP_NAME'] = ""
47+
# #os.environ['THING_TYPE_NAME'] = ""
48+
# #os.environ['QUEUE_TARGET'] = self.test_sqs_queue_name
49+
#
50+
# errstr = "At least one of Policy or Group with a Policy must be defined."
51+
# with raises(Exception) as e:
52+
# verify_policy(None, None)
53+
#
54+
# assert str(e.value) == errstr
55+
#
56+
# def test_pos_process_2(self):
57+
# """Bucket and object must be accessible"""
5158

52-
os.environ['QUEUE_TARGET'] = self.test_sqs_queue_name
53-
os.environ['POLICY_NAME'] = d['policy_name']
54-
os.environ['THING_GROUP_NAME'] = d['thing_group_name']
55-
os.environ['THING_TYPE_NAME'] = d['thing_type_name']
56-
r = process(c)
57-
assert r == d
5859

59-
def test_pos_process_2(self):
60-
"""Positive test case for processing certificate"""
61-
with open('./test/artifacts/single.pem', 'rb') as data:
62-
pem_obj = x509.load_pem_x509_certificate(data.read(),
63-
backend=default_backend())
64-
block = pem_obj.public_bytes(encoding=serialization.Encoding.PEM).decode('ascii')
65-
cert = str(base64.b64encode(block.encode('ascii')))
66-
c = {'certificate': cert}
67-
d = copy.deepcopy(c)
68-
d['policy_name'] = "my_policy"
69-
d['thing_group_name'] = None
70-
d['thing_type_name'] = None
7160

72-
os.environ['QUEUE_TARGET'] = self.test_sqs_queue_name
73-
os.environ['POLICY_NAME'] = d['policy_name']
74-
os.environ['THING_GROUP_NAME'] = "None"
75-
os.environ['THING_TYPE_NAME'] = "None"
76-
r = process(c)
77-
assert r == d
61+
# with open('./test/artifacts/single.pem', 'rb') as data:
62+
# pem_obj = x509.load_pem_x509_certificate(data.read(),
63+
# backend=default_backend())
64+
# block = pem_obj.public_bytes(encoding=serialization.Encoding.PEM).decode('ascii')
65+
# cert = str(base64.b64encode(block.encode('ascii')))
66+
# c = {'certificate': cert}
67+
# d = copy.deepcopy(c)
68+
# d['policy_name'] = "my_policy"
69+
# d['thing_group_name'] = "my_thing_group"
70+
# d['thing_type_name'] = "my_thing_type"
71+
#
72+
# os.environ['QUEUE_TARGET'] = self.test_sqs_queue_name
73+
# os.environ['POLICY_NAME'] = d['policy_name']
74+
# os.environ['THING_GROUP_NAME'] = d['thing_group_name']
75+
# os.environ['THING_TYPE_NAME'] = d['thing_type_name']
76+
# r = process(c)
77+
# assert r == d
7878

79-
def test_pos_lambda_handler(self):
80-
"""Invoke with an sqs event"""
81-
rcd = { "Records": [
82-
{
83-
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
84-
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
85-
"body": "Test message.",
86-
"attributes": {
87-
"ApproximateReceiveCount": "1",
88-
"SentTimestamp": "1545082649183",
89-
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
90-
"ApproximateFirstReceiveTimestamp": "1545082649185"
91-
},
92-
"messageAttributes": {},
93-
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
94-
"eventSource": "aws:sqs",
95-
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
96-
"awsRegion": "us-east-2"
97-
},
98-
]
99-
}
79+
# def test_pos_process_2(self):
80+
# """Positive test case for processing certificate"""
81+
# with open('./test/artifacts/single.pem', 'rb') as data:
82+
# pem_obj = x509.load_pem_x509_certificate(data.read(),
83+
# backend=default_backend())
84+
# block = pem_obj.public_bytes(encoding=serialization.Encoding.PEM).decode('ascii')
85+
# cert = str(base64.b64encode(block.encode('ascii')))
86+
# c = {'certificate': cert}
87+
# d = copy.deepcopy(c)
88+
# d['policy_name'] = "my_policy"
89+
# d['thing_group_name'] = None
90+
# d['thing_type_name'] = None
91+
#
92+
# os.environ['QUEUE_TARGET'] = self.test_sqs_queue_name
93+
# os.environ['POLICY_NAME'] = d['policy_name']
94+
# os.environ['THING_GROUP_NAME'] = "None"
95+
# os.environ['THING_TYPE_NAME'] = "None"
96+
# r = process(c)
97+
# assert r == d
10098

101-
with open('./test/artifacts/single.pem', 'rb') as data:
102-
pem_obj = x509.load_pem_x509_certificate(data.read(),
103-
backend=default_backend())
104-
block = pem_obj.public_bytes(encoding=serialization.Encoding.PEM).decode('ascii')
105-
cert = str(base64.b64encode(block.encode('ascii')))
106-
c = {'certificate': cert}
107-
rcd['Records'][0]['body'] = json.dumps(c)
108-
d = copy.deepcopy(c)
109-
d['policy_name'] = "my_policy"
110-
d['thing_group_name'] = "my_thing_group"
111-
d['thing_type_name'] = "my_thing_type"
112-
os.environ['QUEUE_TARGET'] = self.test_sqs_queue_name
113-
os.environ['POLICY_NAME'] = d['policy_name']
114-
os.environ['THING_GROUP_NAME'] = d['thing_group_name']
115-
os.environ['THING_TYPE_NAME'] = d['thing_type_name']
116-
r = lambda_handler(rcd, None)
117-
# The result is an array of processed records, so our fabricated certificate
118-
# needs to be in array context
119-
assert r == [d]
99+
# def test_pos_lambda_handler(self):
100+
# """Invoke with an sqs event"""
101+
# rcd = { "Records": [
102+
# {
103+
# "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
104+
# "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
105+
# "body": "Test message.",
106+
# "attributes": {
107+
# "ApproximateReceiveCount": "1",
108+
# "SentTimestamp": "1545082649183",
109+
# "SenderId": "AIDAIENQZJOLO23YVJ4VO",
110+
# "ApproximateFirstReceiveTimestamp": "1545082649185"
111+
# },
112+
# "messageAttributes": {},
113+
# "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
114+
# "eventSource": "aws:sqs",
115+
# "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
116+
# "awsRegion": "us-east-2"
117+
# },
118+
# ]
119+
# }
120+
#
121+
# with open('./test/artifacts/single.pem', 'rb') as data:
122+
# pem_obj = x509.load_pem_x509_certificate(data.read(),
123+
# backend=default_backend())
124+
# block = pem_obj.public_bytes(encoding=serialization.Encoding.PEM).decode('ascii')
125+
# cert = str(base64.b64encode(block.encode('ascii')))
126+
# c = {'certificate': cert}
127+
# rcd['Records'][0]['body'] = json.dumps(c)
128+
# d = copy.deepcopy(c)
129+
# d['policy_name'] = "my_policy"
130+
# d['thing_group_name'] = "my_thing_group"
131+
# d['thing_type_name'] = "my_thing_type"
132+
# os.environ['QUEUE_TARGET'] = self.test_sqs_queue_name
133+
# os.environ['POLICY_NAME'] = d['policy_name']
134+
# os.environ['THING_GROUP_NAME'] = d['thing_group_name']
135+
# os.environ['THING_TYPE_NAME'] = d['thing_type_name']
136+
# r = lambda_handler(rcd, None)
137+
# # The result is an array of processed records, so our fabricated certificate
138+
# # needs to be in array context
139+
# assert r == [d]
120140

121141
def tearDown(self):
122142
sqs_resource = resource("sqs", region_name="us-east-1")

0 commit comments

Comments
 (0)