Skip to content

Commit c394a0e

Browse files
committed
more pylint changes
1 parent b761f7f commit c394a0e

File tree

4 files changed

+63
-50
lines changed

4 files changed

+63
-50
lines changed

src/bulk_importer/main.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
Lambda function to import certificate, construct IoT Thing, and associate
66
the Thing, Policy, Certificate, Thing Type, and Thing Group
77
"""
8-
8+
import ast
99
import base64
1010
import json
1111
import binascii
@@ -163,7 +163,7 @@ def process_certificate(config, requeue_cb):
163163
TODO: This should be simplified"""
164164
iot_client = boto3client('iot')
165165
payload = config['certificate']
166-
certificate_text = base64.b64decode(eval(payload))
166+
certificate_text = base64.b64decode(ast.literal_eval(payload))
167167

168168
# See if the certificate has already been registered. If so, bail.
169169
certificate_obj = x509.load_pem_x509_certificate(data=certificate_text,

src/provider_espressif/main.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
1+
"""
2+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
# SPDX-License-Identifier: MIT-0
4+
5+
Lambda function to decompose Espressif based certificate manifest(s) and begin
6+
the import processing pipeline
7+
"""
18
import os
29
import io
310
import json
411
import csv
512
from base64 import b64encode
613
import botocore
7-
from boto3 import resource, client, s3
8-
from moto import mock_aws, settings
9-
from aws_lambda_powertools.utilities.validation import validate
14+
from boto3 import resource, client
1015
from cryptography import x509
1116
from cryptography.hazmat.backends import default_backend
1217
from cryptography.hazmat.primitives import serialization
@@ -62,6 +67,6 @@ def lambda_handler(event, context):
6267
"""Lambda function main entry point"""
6368
queue_url = os.environ['QUEUE_TARGET']
6469
bucket_name = event['Records'][0]['s3']['bucket']['name']
65-
manifest_name = event['Records'][0]['s3']['object']['key']
70+
manifest_name = event['Records'][0]['s3']['object']['key']
6671

6772
invoke_export(bucket_name, manifest_name, queue_url)

src/provider_infineon/main.py

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,82 @@
1+
"""
2+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
# SPDX-License-Identifier: MIT-0
4+
5+
Lambda function to decompose Infineon based certificate manifest(s) and begin
6+
the import processing pipeline
7+
"""
18
import os
29
import io
310
import json
11+
from xml.etree import ElementTree
12+
from base64 import b64encode
413
from botocore import exceptions as botoexceptions
514
from boto3 import resource as boto3resource, client as boto3client
6-
import binascii
7-
from xml.etree import ElementTree
815
from cryptography import x509
916
from cryptography.hazmat.backends import default_backend
1017
from cryptography.hazmat.primitives import serialization
11-
from base64 import b64encode
1218

13-
# Given a bucket and object, verify its existence and return the resource.
1419
def s3_object_stream(bucket_name: str, object_name: str):
20+
"""Given a bucket and object, verify its existence and return the resource."""
1521
s3 = boto3resource('s3')
1622
res = s3.Object(bucket_name=bucket_name, key=object_name)
17-
try:
23+
try:
1824
fs = io.BytesIO()
1925
res.download_fileobj(fs)
2026
return fs
2127
except botoexceptions.ClientError as ce:
2228
raise ce
2329

24-
# Given a bucket name and object name, return bytes representing
25-
# the object content.
30+
2631
def s3_filebuf_bytes(bucket_name: str, object_name: str):
32+
""" Given a bucket name and object name, return bytes representing
33+
the object content."""
2734
object_stream = s3_object_stream(bucket_name=bucket_name,
2835
object_name=object_name)
2936
return object_stream.getvalue()
3037

31-
def format_certificate(certString):
32-
encodedCert = certString.encode('ascii')
38+
def format_certificate(cert_string):
39+
"""Encode certificate so that it can safely travel via sqs"""
40+
cert_encoded = cert_string.encode('ascii')
3341

34-
pem_obj = x509.load_pem_x509_certificate(encodedCert,
42+
pem_obj = x509.load_pem_x509_certificate(cert_encoded,
3543
backend=default_backend())
3644
block = pem_obj.public_bytes(encoding=serialization.Encoding.PEM).decode('ascii')
37-
return {'certificate': str(b64encode(block.encode('ascii')))}
45+
return str(b64encode(block.encode('ascii')))
3846

39-
47+
def queue_certificate(identity, certificate, queue_url):
48+
"""Send the thing name and certificate to sqs queue"""
49+
sqs_client = boto3client("sqs")
50+
payload = {
51+
'thing': identity,
52+
'certificate': certificate
53+
}
54+
sqs_client.send_message( QueueUrl=queue_url,
55+
MessageBody=json.dumps(payload) )
4056

41-
def invoke_export(manifest, queueUrl):
42-
client = boto3client("sqs")
43-
57+
def invoke_export(manifest, queue_url):
58+
"""Function to Iterate through the certificate list and queue for processing"""
4459
root = ElementTree.fromstring(manifest)
4560

4661
for group in root.findall('group'): # /binaryhex
4762
thing_name = ''
4863

64+
# TODO: Evaluate what happens when this fails
4965
for hex_element in group.findall('hex'):
5066
if hex_element.get('name') == 'TpmMAC':
5167
thing_name = hex_element.get('value')
5268

53-
# There can be more than one certificate
5469
for hexdata_element in group.findall('binaryhex'):
5570
certificate_data = format_certificate(hexdata_element.text)
56-
# Need to send each certificate separately
57-
certificate_data['thing'] = thing_name
58-
print(certificate_data)
59-
client.send_message( QueueUrl=queueUrl,
60-
MessageBody=json.dumps(certificate_data) )
61-
71+
queue_certificate(thing_name, certificate_data, queue_url)
72+
6273
def lambda_handler(event, context):
63-
queueUrl = os.environ['QUEUE_TARGET']
74+
"""Lambda function main entry point"""
75+
queue_url = os.environ['QUEUE_TARGET']
6476

6577
bucket = event['Records'][0]['s3']['bucket']['name']
66-
manifest = event['Records'][0]['s3']['object']['key']
78+
manifest = event['Records'][0]['s3']['object']['key']
6779

68-
manifestContent = s3_filebuf_bytes(bucket, manifest)
80+
manifest_content = s3_filebuf_bytes(bucket, manifest)
6981

70-
invoke_export(manifestContent, queueUrl)
82+
invoke_export(manifest_content, queue_url)

test/unit/src/test_bulk_importer.py

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,23 @@
44
55
Unit tests for bulk_importer
66
"""
7-
import sys
87
import os
9-
#import io
8+
import base64
109
from unittest import TestCase
11-
#from unittest.mock import MagicMock, patch
12-
import pytest
13-
14-
import botocore
10+
from moto import mock_aws
11+
from cryptography import x509
12+
from cryptography.hazmat.backends import default_backend
13+
from cryptography.hazmat.primitives import serialization
1514

1615
from boto3 import resource, client
17-
from moto import mock_aws, settings
18-
from moto.settings import iot_use_valid_cert
16+
from src.bulk_importer.testable import LambdaSQSClass
17+
from src.bulk_importer.main import get_certificate_fingerprint, requeue, process_certificate
18+
# from src.bulk_importer.main import lambda_handler, get_certificate, get_thing, get_policy
19+
# from src.bulk_importer.main import get_certificate_arn, get_thing_group, get_thing_type
20+
# from src.bulk_importer.main import process_policy, process_thing
21+
# from src.bulk_importer.main import process_thing_group, get_name_from_certificate, process_sqs
1922

20-
from aws_lambda_powertools.utilities.validation import validate
21-
22-
sys.path.append('./src/bulk_importer')
2323
os.environ['AWS_DEFAULT_REGION'] = "us-east-1"
24-
from src.bulk_importer.testable import LambdaSQSClass # pylint: disable=wrong-import-position
25-
from src.bulk_importer.main import lambda_handler, get_certificate, get_certificate_fingerprint, get_certificate_arn, get_thing, get_policy, get_thing_group, get_thing_type, process_policy, process_thing, requeue, process_certificate, process_thing_group, get_name_from_certificate, process_sqs # pylint: disable=wrong-import-position
26-
from cryptography import x509
27-
from cryptography.hazmat.backends import default_backend
28-
from cryptography.hazmat.primitives import serialization
29-
import base64
3024

3125
@mock_aws(config={
3226
"core": {
@@ -36,6 +30,7 @@
3630
},
3731
'iot': {'use_valid_cert': True}})
3832
class TestBulkImporter(TestCase):
33+
"""Test cases for bulk importer lambda function"""
3934
def setUp(self):
4035
self.test_sqs_queue_name = "provider"
4136
sqs_client = client('sqs', region_name="us-east-1")
@@ -46,14 +41,15 @@ def setUp(self):
4641
self.mocked_sqs_class = LambdaSQSClass(mocked_sqs_resource)
4742

4843
def test_pos_process_certificate(self):
44+
"""Positive test case for processing certificate"""
4945
with open('./test/artifacts/single.pem', 'rb') as data:
5046
pem_obj = x509.load_pem_x509_certificate(data.read(),
5147
backend=default_backend())
5248
block = pem_obj.public_bytes(encoding=serialization.Encoding.PEM).decode('ascii')
5349
cert = str(base64.b64encode(block.encode('ascii')))
5450
c = {'certificate': cert}
5551
r = process_certificate(c, requeue)
56-
assert (r == get_certificate_fingerprint(pem_obj))
52+
assert r == get_certificate_fingerprint(pem_obj)
5753

5854
def tearDown(self):
5955
sqs_resource = resource("sqs", region_name="us-east-1")

0 commit comments

Comments
 (0)