From 37e86ea0df09d664a45cbb20e99bc1dab7a84f90 Mon Sep 17 00:00:00 2001
From: Mit Suthar
Date: Fri, 7 Feb 2025 10:46:31 -0800
Subject: [PATCH 1/4] Add integration tests

---
 test_integ/__init__.py          |   4 +
 test_integ/fixtures/__init__.py |   0
 test_integ/fixtures/objects.py  |  43 +++
 test_integ/fixtures/session.py  |  17 +
 test_integ/fixtures/sns.py      |  61 ++++
 test_integ/test_session.py      | 604 ++++++++++++++++++++++++++++++++
 6 files changed, 729 insertions(+)
 create mode 100644 test_integ/__init__.py
 create mode 100644 test_integ/fixtures/__init__.py
 create mode 100644 test_integ/fixtures/objects.py
 create mode 100644 test_integ/fixtures/session.py
 create mode 100644 test_integ/fixtures/sns.py
 create mode 100644 test_integ/test_session.py

diff --git a/test_integ/__init__.py b/test_integ/__init__.py
new file mode 100644
index 0000000..a0bed63
--- /dev/null
+++ b/test_integ/__init__.py
@@ -0,0 +1,4 @@
+import logging
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger("Integration Test Logger")
diff --git a/test_integ/fixtures/__init__.py b/test_integ/fixtures/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/test_integ/fixtures/objects.py b/test_integ/fixtures/objects.py
new file mode 100644
index 0000000..d6956eb
--- /dev/null
+++ b/test_integ/fixtures/objects.py
@@ -0,0 +1,43 @@
+import pytest
+import uuid
+
+@pytest.fixture
+def default_message_size_threshold():
+    return 262144
+
+@pytest.fixture
+def small_message_body():
+    return "small message body"
+
+
+@pytest.fixture
+def small_message_attribute(small_message_body):
+    return {
+        'Small_Message_Attribute': {
+            'StringValue': small_message_body,
+            'DataType': 'String'
+        }
+    }
+
+@pytest.fixture
+def custom_s3_key_attribute():
+    return {
+        'S3Key': {
+            'StringValue': str(uuid.uuid4()),
+            'DataType': 'String'
+        }
+    }
+
+
+@pytest.fixture
+def large_message_body(default_message_size_threshold):
+    return "x" * (default_message_size_threshold + 1)
+
+@pytest.fixture
+def large_message_attribute(large_message_body):
+    return {
+        'Large_Message_Attribute': {
+            'StringValue': 'Test',
+            'DataType': 'String'
+        }
+    }
\ No newline at end of file
diff --git a/test_integ/fixtures/session.py b/test_integ/fixtures/session.py
new file mode 100644
index 0000000..cf666c8
--- /dev/null
+++ b/test_integ/fixtures/session.py
@@ -0,0 +1,17 @@
+import boto3
+import pytest
+from sns_extended_client.session import SNSExtendedClientSession
+
+@pytest.fixture()
+def region_name() -> str:
+    region_name = 'us-east-1'
+    return region_name
+
+@pytest.fixture()
+def session(region_name) -> boto3.Session:
+
+    setattr(boto3.session, "Session", SNSExtendedClientSession)
+    # Also patch the reference in the boto3.__init__ module, since Session is imported there too
+    setattr(boto3, "Session", SNSExtendedClientSession)
+
+    return boto3.Session(region_name=region_name)
\ No newline at end of file
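For context, a minimal sketch — not part of the patch — of what the session fixture above enables. Once boto3.Session is swapped for SNSExtendedClientSession, the SNS clients it creates accept the large_payload_support and always_through_s3 attributes used throughout these fixtures. The bucket name and topic ARN below are placeholders, and live AWS credentials are assumed:

    import boto3
    from sns_extended_client.session import SNSExtendedClientSession

    boto3.session.Session = SNSExtendedClientSession
    boto3.Session = SNSExtendedClientSession  # boto3 re-exports Session at top level

    sns = boto3.Session(region_name="us-east-1").client("sns")
    sns.large_payload_support = "my-offload-bucket"  # placeholder bucket name
    sns.always_through_s3 = True  # offload every message body to S3
    sns.publish(
        TopicArn="arn:aws:sns:us-east-1:123456789012:MyTopic",  # placeholder ARN
        Message="this body is written to S3; only a pointer is published",
    )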
diff --git a/test_integ/fixtures/sns.py b/test_integ/fixtures/sns.py
new file mode 100644
index 0000000..f1836c6
--- /dev/null
+++ b/test_integ/fixtures/sns.py
@@ -0,0 +1,61 @@
+import boto3
+from sns_extended_client import SNSExtendedClientSession
+import pytest
+import random
+
+@pytest.fixture()
+def sns_extended_client(session):
+    sns_client = session.client("sns", region_name='us-east-1')
+    sns_client.large_payload_support = f'integration-test-bucket-{random.randint(0, 10000)}'
+    return sns_client
+
+@pytest.fixture()
+def sqs_client(session):
+    return session.client("sqs")
+
+@pytest.fixture()
+def queue_name():
+    return f"IntegrationTestQueue{random.randint(0, 10000)}"
+
+@pytest.fixture()
+def topic_name():
+    return f"IntegrationTestTopic{random.randint(0, 10000)}"
+
+@pytest.fixture()
+def queue(sqs_client, queue_name):
+    queue_object = sqs_client.create_queue(QueueName=queue_name)
+
+    yield queue_object
+
+    sqs_client.purge_queue(
+        QueueUrl=queue_object['QueueUrl']
+    )
+
+    sqs_client.delete_queue(
+        QueueUrl=queue_object['QueueUrl']
+    )
+
+@pytest.fixture()
+def topic(sns_extended_client, topic_name):
+    topic_arn = sns_extended_client.create_topic(Name=topic_name).get("TopicArn")
+
+    yield topic_arn
+
+    sns_extended_client.delete_topic(
+        TopicArn=topic_arn
+    )
+
+@pytest.fixture()
+def sns_extended_client_with_s3(sns_extended_client):
+
+    client_sns = sns_extended_client
+
+    client_sns.s3_client.create_bucket(
+        Bucket=client_sns.large_payload_support
+    )
+
+    yield client_sns
+
+    client_sns.s3_client.delete_bucket(
+        Bucket=client_sns.large_payload_support,
+    )
diff --git a/test_integ/test_session.py b/test_integ/test_session.py
new file mode 100644
index 0000000..d45a679
--- /dev/null
+++ b/test_integ/test_session.py
@@ -0,0 +1,604 @@
+from sns_extended_client import SNSExtendedClientSession
+from botocore.exceptions import ClientError
+from .fixtures.session import *
+from .fixtures.sns import *
+from .fixtures.objects import *
+from . import logger
+from json import loads
+import copy
+import logging
+
+
+
+def initialize_extended_client_attributes_through_s3(sns_extended_client):
+    """
+
+    Acts as a helper for adding attributes to the extended client
+    which are required for sending payloads to S3 buckets.
+
+    sns_extended_client: The SNS Extended Client
+
+    """
+
+    sns_extended_client.always_through_s3 = True
+
+    return
+
+def create_allow_sns_to_write_to_sqs_policy_json(topicarn, queuearn):
+    """
+    Creates a policy document which allows SNS to write to SQS
+
+    topicarn: The ARN of the SNS topic
+    queuearn: The ARN of the SQS queue
+
+    """
+
+    logger.info("Creating policy document to allow SNS to write to SQS")
+    policy_document = """{{
+        "Version":"2012-10-17",
+        "Statement":[
+            {{
+                "Sid":"MyPolicy",
+                "Effect":"Allow",
+                "Principal" : {{"AWS" : "*"}},
+                "Action":"SQS:SendMessage",
+                "Resource": "{}",
+                "Condition":{{
+                    "ArnEquals":{{
+                        "aws:SourceArn": "{}"
+                    }}
+                }}
+            }}
+        ]
+    }}""".format(queuearn, topicarn)
+
+    return policy_document
+
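The helper above builds the policy document with str.format, which is why every literal brace is doubled. An equivalent construction with json.dumps — a sketch, not part of the patch — sidesteps the escaping entirely:

    import json

    def create_allow_sns_to_write_to_sqs_policy_json(topicarn, queuearn):
        # Build the policy as a plain dict and serialize it once at the end
        policy = {
            "Version": "2012-10-17",
            "Statement": [{
                "Sid": "MyPolicy",
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": "SQS:SendMessage",
                "Resource": queuearn,
                "Condition": {"ArnEquals": {"aws:SourceArn": topicarn}},
            }],
        }
        return json.dumps(policy)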
+def publish_message_helper(sns_extended_client, topic_arn, message_body, message_attributes = None, message_group_id = None, message_deduplication_id = None, **kwargs):
+    """
+
+    Acts as a helper for sending a message via the SNS Extended Client.
+
+    sns_extended_client: The SNS Extended Client
+    topic_arn: The ARN associated with the SNS Topic
+    message_body: The message body
+    message_attributes: The message attributes
+
+    """
+
+    send_message_kwargs = {
+        'TopicArn': topic_arn,
+        'Message': message_body
+    }
+
+    if message_attributes:
+        send_message_kwargs['MessageAttributes'] = message_attributes
+
+    if message_group_id:
+        send_message_kwargs['MessageGroupId'] = message_group_id
+        send_message_kwargs['MessageDeduplicationId'] = message_deduplication_id
+
+
+    logger.info("Sending the message via the SNS Extended Client")
+
+    response = sns_extended_client.publish(**send_message_kwargs)
+
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+    return
+
+def is_s3_bucket_empty(sns_extended_client):
+    """
+
+    Responsible for checking whether the S3 bucket created for the tests
+    contains any objects at the time of calling the function.
+
+    sns_extended_client: The SNS Extended Client
+
+    """
+    response = sns_extended_client.s3_client.list_objects_v2(
+        Bucket=sns_extended_client.large_payload_support
+    )
+
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+    return "Contents" not in response
+
+def retrieve_message_from_s3(sns_extended_client, s3Key):
+    """
+
+    Responsible for retrieving a message from the S3 bucket.
+
+    sns_extended_client: The SNS Extended Client
+    s3Key: The S3 Key
+
+    """
+    logger.info("Retrieving the message from the S3 bucket")
+
+    target_s3_msg_obj = sns_extended_client.s3_client.get_object(Bucket=sns_extended_client.large_payload_support, Key=s3Key)
+
+    return target_s3_msg_obj['Body'].read().decode()
+
+def s3_bucket_exists(sns_extended_client):
+    """
+
+    Responsible for checking whether the S3 bucket created still exists
+    at the time of calling the function.
+
+    sns_extended_client: The SNS Extended Client
+
+    """
+    logger.info("Checking if the S3 bucket exists")
+
+    try:
+        sns_extended_client.s3_client.head_bucket(Bucket=sns_extended_client.large_payload_support)
+        return True
+    except ClientError as e:
+        if e.response['Error']['Code'] == '404':
+            return False
+        raise
+
+def receive_message_helper(sqs_client, queue_url):
+    """
+
+    Acts as a helper for receiving a message via the SQS Client.
+
+    sqs_client: The SQS Client
+    queue_url: The URL associated with the SQS Queue
+
+    """
+
+    logger.info("Receiving the message via the SQS Client")
+
+    response = sqs_client.receive_message(
+        QueueUrl=queue_url,
+        MaxNumberOfMessages=1,
+        WaitTimeSeconds=5
+    )
+
+    assert 'Messages' in response.keys()
+
+    return response
+
+def extract_message_body_from_response(sns_extended_client, receive_message_response):
+    """
+
+    Responsible for extracting the message body from the response received via the SQS Client.
+
+    receive_message_response: The response received from the SQS Client
+
+    """
+
+    receive_message_response = loads(receive_message_response)
+    target_s3_msg_obj = sns_extended_client.s3_client.get_object(Bucket=receive_message_response[1].get("s3BucketName"), Key=receive_message_response[1].get("s3Key"))
+
+    return target_s3_msg_obj['Body'].read().decode()
+
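The [1] indexing in extract_message_body_from_response relies on the pointer format the extended client publishes in place of a large body: a two-element JSON array whose second element carries the bucket and key. A short illustration — the class-name string in element 0 follows the AWS payload-offloading convention, and the bucket and key values are made up:

    from json import loads

    raw_body = (
        '["software.amazon.payloadoffloading.PayloadS3Pointer", '
        '{"s3BucketName": "integration-test-bucket-42", "s3Key": "6b5f1f9e-0000"}]'
    )
    pointer = loads(raw_body)[1]  # element 0 only names the pointer type
    bucket, key = pointer["s3BucketName"], pointer["s3Key"]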
+
+
+
+def check_receive_message_response(sns_extended_client, receive_message_response, message_body, message_attributes):
+    """
+
+    Responsible for checking the message received via the SQS Client.
+
+    receive_message_response: The response received from the SQS Client
+    message_body: The message body
+    message_attributes: The message attributes
+
+    """
+    response_msg_body = extract_message_body_from_response(sns_extended_client, receive_message_response['Messages'][0]['Body'])
+    assert response_msg_body == message_body
+
+    # if message_attributes:
+    #     assert receive_message_response['Messages'][0]['MessageAttributes'] == message_attributes
+
+def delete_message_helper(sqs_client, queue_url, receipt_handle):
+    """
+
+    Acts as a helper for deleting a message via the SQS Client.
+
+    sqs_client: The SQS Client
+    queue_url: The URL associated with the SQS Queue
+    receipt_handle: The receipt handle associated with the message
+
+    """
+
+    logger.info("Deleting the message via the SQS Client")
+
+    response = sqs_client.delete_message(
+        QueueUrl=queue_url,
+        ReceiptHandle=receipt_handle
+    )
+
+    print("Response of delete msg : ")
+    print(response)
+
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
+
+def delete_object_from_s3_helper(sns_extended_client, s3Key):
+    """
+
+    Acts as a helper for deleting an object from the S3 bucket.
+
+    sns_extended_client: The SNS Extended Client
+    s3Key: The S3 key of the object to delete
+
+    """
+
+    logger.info("Deleting the object from the S3 bucket")
+
+
+
+    response = sns_extended_client.s3_client.delete_object(
+        Bucket=sns_extended_client.large_payload_support,
+        Key=s3Key
+    )
+
+    assert response['ResponseMetadata']['HTTPStatusCode'] == 204
+
+# def test_send_receive_delete_workflow(sns_extended_client,sqs_client, queue,topic, message_body,
+#                                       message_attributes = None):
+#     """
+
+#     """
+
+
+def perform_send_receive_delete_workflow(sns_extended_client, sqs_client, queue, topic, message_body,
+                                         message_attributes = None):
+    """
+    Responsible for replicating a workflow of sending, receiving and deleting a message
+    by calling the helper functions as and when necessary.
+
+    sns_extended_client: The SNS Extended Client
+    queue: The SQS Queue
+    message_body: The message body
+    message_attributes: The message attributes
+    message_group_id: Tag specifying a message belongs to a message group (Required for FIFO Queues)
+    message_deduplication_id: Token used for deduplication of sent message (Required for FIFO Queues)
+
+    """
+
+    queue_url = queue["QueueUrl"]
+    sqs_queue_arn = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])["Attributes"].get("QueueArn")
+
+    kwargs = {
+        'sns_extended_client': sns_extended_client,
+        'topic_arn': topic,
+        'queue_url': queue_url,
+        'message_body': message_body,
+    }
+
+    if message_attributes:
+        kwargs['message_attributes'] = message_attributes
+
+    # Adding policy to SQS queue such that SNS topic can send msg to SQS queue
+    policy_json = create_allow_sns_to_write_to_sqs_policy_json(topic, sqs_queue_arn)
+    response = sqs_client.set_queue_attributes(
+        QueueUrl = queue_url,
+        Attributes = {
+            'Policy' : policy_json
+        }
+    )
+
+    # Subscribe SQS queue to SNS topic
+    sns_extended_client.subscribe(TopicArn=topic, Protocol="sqs", Endpoint=sqs_queue_arn, Attributes={"RawMessageDelivery": "true"})
+
+    publish_message_helper(**kwargs)
+
+    # Verifying that the S3 object was added
+    if sns_extended_client.large_payload_support:
+        assert not is_s3_bucket_empty(sns_extended_client)
+
+    # Verifying whether specific S3 object with key is created while passing in message_attributes
+    if message_attributes and sns_extended_client.always_through_s3:
+        assert sns_extended_client.s3_client.get_object(
+            Bucket=sns_extended_client.large_payload_support,
+            Key=message_attributes['S3Key']['StringValue']
+        )
+
+    receive_message_response = receive_message_helper(sqs_client, queue_url)
+
+    receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle']
+    # new_receipt_handle = old_receipt_handle
+
+    check_receive_message_response(sns_extended_client, receive_message_response, message_body, message_attributes)
+
+    # Change the message visibility here when the parameter (change_message_visibility) is true
+    # if change_message_visibility:
+    #     new_receipt_handle = change_message_visibility_helper(receive_message_response=receive_message_response,
+    #                                                           visibility_timeout=new_message_visibility, **kwargs)
+
+
+    delete_message_helper(sqs_client, queue_url, receipt_handle)
+
+    # Deleting object from s3 bucket
+    if sns_extended_client.large_payload_support:
+        for receive_message in receive_message_response['Messages']:
+            delete_object_from_s3_helper(sns_extended_client, receive_message)
+
+
+
+    return
+
+
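publish_message_helper also accepts the FIFO-only fields, which none of the tests below exercise. A hypothetical call — the client variable, topic ARN, and ids are illustrative only:

    publish_message_helper(
        sns_extended_client=client,  # an SNS extended client, e.g. from the fixtures above
        topic_arn="arn:aws:sns:us-east-1:123456789012:MyTopic.fifo",  # placeholder
        message_body="ordered payload",
        message_group_id="group-1",  # required for FIFO topics
        message_deduplication_id="dedup-0001",  # required unless content-based deduplication is enabled
    )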
+def test_send_receive_small_msg_through_s3(sns_extended_client_with_s3, sqs_client, queue, topic, small_message_body):
+    """
+    Responsible for replicating a workflow where an SQS queue subscribes to the sns_extended_client's topic.
+    sns_extended_client sends a message to that topic with the attribute 'always_through_s3' set to true,
+    so the body is stored in S3 and a reference to that object is received by the SQS queue, using the helper functions.
+
+    sns_extended_client_with_s3 : The SNS Extended Client with S3 Bucket
+    sqs_client: The SQS Client
+    queue: The SQS Queue
+    topic: The SNS Topic
+    small_message_body: The Message
+    """
+
+    logger.info("Initializing execution of test_send_receive_small_msg_through_s3")
+
+    initialize_extended_client_attributes_through_s3(sns_extended_client_with_s3)
+
+    # perform_send_receive_delete_workflow(sns_extended_client_with_s3,sqs_client, queue,topic, small_message_body)
+
+    queue_url = queue["QueueUrl"]
+    sqs_queue_arn = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])["Attributes"].get("QueueArn")
+
+    kwargs = {
+        'sns_extended_client': sns_extended_client_with_s3,
+        'topic_arn': topic,
+        'queue_url': queue_url,
+        'message_body': small_message_body,
+    }
+
+    # Adding policy to SQS queue such that SNS topic can send msg to SQS queue
+    policy_json = create_allow_sns_to_write_to_sqs_policy_json(topic, sqs_queue_arn)
+    response = sqs_client.set_queue_attributes(
+        QueueUrl = queue_url,
+        Attributes = {
+            'Policy' : policy_json
+        }
+    )
+
+    # Subscribe SQS queue to SNS topic
+    sns_extended_client_with_s3.subscribe(TopicArn=topic, Protocol="sqs", Endpoint=sqs_queue_arn, Attributes={"RawMessageDelivery": "true"})
+
+    publish_message_helper(**kwargs)
+
+    # The message should be stored in the S3 bucket after publishing to the SNS topic
+    assert not is_s3_bucket_empty(sns_extended_client_with_s3)
+
+    receive_message_response = receive_message_helper(sqs_client, queue_url)
+
+    # Check the message format - the message stored in S3 is referenced as {"s3BucketName": "<bucket name>", "s3Key": "<key value>"}
+    json_receive_message_body = loads(receive_message_response['Messages'][0]['Body'])[1]
+    message_stored_in_s3_attributes = ['s3BucketName', 's3Key']
+    for key in json_receive_message_body.keys():
+        assert key in message_stored_in_s3_attributes
+
+    # Retrieve the message from s3 Bucket and check value
+    assert retrieve_message_from_s3(sns_extended_client_with_s3, json_receive_message_body['s3Key']) == small_message_body
+
+
+    # Delete message from SQS queue
+    receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle']
+    delete_message_helper(sqs_client, queue_url, receipt_handle)
+
+    # Delete message from S3 bucket
+    delete_object_from_s3_helper(sns_extended_client_with_s3, json_receive_message_body['s3Key'])
+
+    # The S3 bucket should be empty
+    assert is_s3_bucket_empty(sns_extended_client_with_s3)
+    logger.info("Completed execution of test_send_receive_small_msg_through_s3")
+
+    return
+
+
+
+def test_send_receive_small_msg_not_through_s3(sns_extended_client, sqs_client, queue, topic, small_message_body):
+    """
+    Responsible for replicating a workflow where an SQS queue subscribes to the sns_extended_client's topic.
+    sns_extended_client sends a message to that topic with the attribute 'always_through_s3' set to false,
+    so the message is delivered directly to the SQS queue and is not stored in S3.
+
+    sns_extended_client: The SNS Extended Client
+    sqs_client: The SQS Client
+    queue: The SQS Queue
+    topic: The SNS Topic
+    small_message_body: The Message
+    """
+
+    logger.info("Initializing execution of test_send_receive_small_msg_not_through_s3")
+
+    queue_url = queue["QueueUrl"]
+    sqs_queue_arn = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])["Attributes"].get("QueueArn")
+
+    kwargs = {
+        'sns_extended_client': sns_extended_client,
+        'topic_arn': topic,
+        'queue_url': queue_url,
+        'message_body': small_message_body,
+    }
+
+    # Adding policy to SQS queue such that SNS topic can send msg to SQS queue
+    policy_json = create_allow_sns_to_write_to_sqs_policy_json(topic, sqs_queue_arn)
+    response = sqs_client.set_queue_attributes(
+        QueueUrl = queue_url,
+        Attributes = {
+            'Policy' : policy_json
+        }
+    )
+
+    # Subscribe SQS queue to SNS topic
+    sns_extended_client.subscribe(TopicArn=topic, Protocol="sqs", Endpoint=sqs_queue_arn, Attributes={"RawMessageDelivery": "true"})
+
+    publish_message_helper(**kwargs)
+
+    receive_message_response = receive_message_helper(sqs_client, queue_url)
+
+    # The body of the response should have the same message body that was sent to the topic by SNS
+    assert receive_message_response['Messages'][0]['Body'] == small_message_body
+
+    # print(receive_message_response)
+
+    # Delete message from SQS queue
+    receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle']
+    delete_message_helper(sqs_client, queue_url, receipt_handle)
+
+    logger.info("Completed execution of test_send_receive_small_msg_not_through_s3")
+
+    return
+
+def test_send_receive_large_msg_which_passes_threshold_through_s3(sns_extended_client_with_s3, sqs_client, queue, topic, large_message_body):
+    """
+    Responsible for replicating a workflow where an SQS queue subscribes to the sns_extended_client's topic.
+    sns_extended_client sends a message to that topic which exceeds the default size threshold,
+    so the body is stored in S3 and a reference to that object is received by the SQS queue, using the helper functions.
+
+    sns_extended_client_with_s3 : The SNS Extended Client with S3 Bucket
+    sqs_client: The SQS Client
+    queue: The SQS Queue
+    topic: The SNS Topic
+    large_message_body: The Message
+    """
+
+    logger.info("Initializing execution of test_send_receive_large_msg_which_passes_threshold_through_s3")
+
+    queue_url = queue["QueueUrl"]
+    sqs_queue_arn = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])["Attributes"].get("QueueArn")
+
+    kwargs = {
+        'sns_extended_client': sns_extended_client_with_s3,
+        'topic_arn': topic,
+        'queue_url': queue_url,
+        'message_body': large_message_body,
+    }
+
+    # Adding policy to SQS queue such that SNS topic can send msg to SQS queue
+    policy_json = create_allow_sns_to_write_to_sqs_policy_json(topic, sqs_queue_arn)
+    response = sqs_client.set_queue_attributes(
+        QueueUrl = queue_url,
+        Attributes = {
+            'Policy' : policy_json
+        }
+    )
+
+    # Subscribe SQS queue to SNS topic
+    sns_extended_client_with_s3.subscribe(TopicArn=topic, Protocol="sqs", Endpoint=sqs_queue_arn, Attributes={"RawMessageDelivery": "true"})
+
+    publish_message_helper(**kwargs)
+
+    # The message should be stored in the S3 bucket after publishing to the SNS topic
+    assert not is_s3_bucket_empty(sns_extended_client_with_s3)
+
+    receive_message_response = receive_message_helper(sqs_client, queue_url)
+
+    # Check the message format - the message stored in S3 is referenced as {"s3BucketName": "<bucket name>", "s3Key": "<key value>"}
+    json_receive_message_body = loads(receive_message_response['Messages'][0]['Body'])[1]
+    message_stored_in_s3_attributes = ['s3BucketName', 's3Key']
+    for key in json_receive_message_body.keys():
+        assert key in message_stored_in_s3_attributes
+
+    # Retrieve the message from s3 Bucket and check value
+    assert retrieve_message_from_s3(sns_extended_client_with_s3, json_receive_message_body['s3Key']) == large_message_body
+
+
+    # Delete message from SQS queue
+    receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle']
+    delete_message_helper(sqs_client, queue_url, receipt_handle)
+
+    # Delete message from S3 bucket
+    delete_object_from_s3_helper(sns_extended_client_with_s3, json_receive_message_body['s3Key'])
+
+    # The S3 bucket should be empty
+    assert is_s3_bucket_empty(sns_extended_client_with_s3)
+    logger.info("Completed execution of test_send_receive_large_msg_which_passes_threshold_through_s3")
+
+    return
+
+def test_send_receive_msg_with_custom_s3_key(sns_extended_client_with_s3, sqs_client, queue, topic, small_message_body, custom_s3_key_attribute):
+    """
+    Responsible for replicating a workflow where an SQS queue subscribes to the sns_extended_client's topic.
+    sns_extended_client sends a message to that topic with a custom attribute used as the S3 key of the stored message,
+    and a reference to that object is received by the SQS queue, using the helper functions.
+
+    sns_extended_client_with_s3: The SNS Extended Client with an existing S3 bucket
+    sqs_client: The SQS Client
+    queue: The SQS Queue
+    topic: The SNS Topic
+    small_message_body: The Message
+    custom_s3_key_attribute: Attribute that sets a custom S3 key for the stored message
+    """
+
+    logger.info("Initializing execution of test_send_receive_msg_with_custom_s3_key")
+
+    initialize_extended_client_attributes_through_s3(sns_extended_client_with_s3)
+
+    # perform_send_receive_delete_workflow(sns_extended_client_with_s3,sqs_client, queue,topic, small_message_body)
+
+    queue_url = queue["QueueUrl"]
+    sqs_queue_arn = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])["Attributes"].get("QueueArn")
+
+    kwargs = {
+        'sns_extended_client': sns_extended_client_with_s3,
+        'topic_arn': topic,
+        'queue_url': queue_url,
+        'message_body': small_message_body,
+        'message_attributes': custom_s3_key_attribute
+    }
+
+    # Adding policy to SQS queue such that SNS topic can send msg to SQS queue
+    policy_json = create_allow_sns_to_write_to_sqs_policy_json(topic, sqs_queue_arn)
+    response = sqs_client.set_queue_attributes(
+        QueueUrl = queue_url,
+        Attributes = {
+            'Policy' : policy_json
+        }
+    )
+
+    # Subscribe SQS queue to SNS topic
+    sns_extended_client_with_s3.subscribe(TopicArn=topic, Protocol="sqs", Endpoint=sqs_queue_arn, Attributes={"RawMessageDelivery": "true"})
+
+    publish_message_helper(**kwargs)
+
+    # The message should be stored in the S3 bucket after publishing to the SNS topic
+    assert not is_s3_bucket_empty(sns_extended_client_with_s3)
+
+    receive_message_response = receive_message_helper(sqs_client, queue_url)
+
+    # Check the message format - the message stored in S3 is referenced as {"s3BucketName": "<bucket name>", "s3Key": "<key value>"}
+    json_receive_message_body = loads(receive_message_response['Messages'][0]['Body'])[1]
+    message_stored_in_s3_attributes = ['s3BucketName', 's3Key']
+    for key in json_receive_message_body.keys():
+        assert key in message_stored_in_s3_attributes
+
+
+    # Stored message key should be the same as the custom_s3_key_attribute StringValue
+    assert custom_s3_key_attribute['S3Key']['StringValue'] == json_receive_message_body['s3Key']
+
+    # Retrieve the message from s3 Bucket and check value
+    assert retrieve_message_from_s3(sns_extended_client_with_s3, json_receive_message_body['s3Key']) == small_message_body
+
+
+    # Delete message from SQS queue
+    receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle']
+    delete_message_helper(sqs_client, queue_url, receipt_handle)
+
+    # Delete message from S3 bucket
+    delete_object_from_s3_helper(sns_extended_client_with_s3, json_receive_message_body['s3Key'])
+
+    # The S3 bucket should be empty
+    assert is_s3_bucket_empty(sns_extended_client_with_s3)
+    logger.info("Completed execution of test_send_receive_msg_with_custom_s3_key")
+
+    return
+
+
+
+
+
+
+
+def test_session(session):
+    assert boto3.session.Session == SNSExtendedClientSession
+
+
From df61e1f46572ab200724fe14fda06d5cfe64f054 Mon Sep 17 00:00:00 2001
From: Mit Suthar
Date: Fri, 14 Feb 2025 14:05:17 -0800
Subject: [PATCH 2/4] Clean up the test_session for integration tests

---
 test_integ/test_session.py | 169 ++++++------------------------------
 1 file changed, 27 insertions(+), 142 deletions(-)

diff --git a/test_integ/test_session.py b/test_integ/test_session.py
index d45a679..25fd8cc 100644
--- a/test_integ/test_session.py
+++ b/test_integ/test_session.py
@@ -9,7 +9,6 @@
 import copy
 import logging
 
 
-
 def initialize_extended_client_attributes_through_s3(sns_extended_client):
     """
@@ -57,7 +56,7 @@ def create_allow_sns_to_write_to_sqs_policy_json(topicarn, queuearn):
 def publish_message_helper(sns_extended_client, topic_arn, message_body, message_attributes = None, message_group_id = None, message_deduplication_id = None, **kwargs):
     """
 
-    Acts as a helper for sending a message via the SNS Extended Client.
+    Acts as a helper for publishing a message via the SNS Extended Client.
 
     sns_extended_client: The SNS Extended Client
     topic_arn: The ARN associated with the SNS Topic
@@ -66,22 +65,21 @@ def publish_message_helper(sns_extended_client, topic_arn, message_body, message_attributes = None, message_group_id = None, message_deduplication_id = None, **kwargs):
 
     """
 
-    send_message_kwargs = {
+    publish_message_kwargs = {
         'TopicArn': topic_arn,
         'Message': message_body
     }
 
     if message_attributes:
-        send_message_kwargs['MessageAttributes'] = message_attributes
+        publish_message_kwargs['MessageAttributes'] = message_attributes
 
     if message_group_id:
-        send_message_kwargs['MessageGroupId'] = message_group_id
-        send_message_kwargs['MessageDeduplicationId'] = message_deduplication_id
-
+        publish_message_kwargs['MessageGroupId'] = message_group_id
+        publish_message_kwargs['MessageDeduplicationId'] = message_deduplication_id
 
-    logger.info("Sending the message via the SNS Extended Client")
+    logger.info("Publishing the message via the SNS Extended Client")
 
-    response = sns_extended_client.publish(**send_message_kwargs)
+    response = sns_extended_client.publish(**publish_message_kwargs)
 
     assert response['ResponseMetadata']['HTTPStatusCode'] == 200
 
@@ -173,9 +171,6 @@ def extract_message_body_from_response(sns_extended_client, receive_message_response):
 
     return target_s3_msg_obj['Body'].read().decode()
 
-
-
-
 def check_receive_message_response(sns_extended_client, receive_message_response, message_body, message_attributes):
     """
 
@@ -189,9 +184,6 @@ def check_receive_message_response(sns_extended_client, receive_message_response, message_body, message_attributes):
     response_msg_body = extract_message_body_from_response(sns_extended_client, receive_message_response['Messages'][0]['Body'])
     assert response_msg_body == message_body
 
-    # if message_attributes:
-    #     assert receive_message_response['Messages'][0]['MessageAttributes'] == message_attributes
-
 def delete_message_helper(sqs_client, queue_url, receipt_handle):
     """
 
@@ -211,8 +203,6 @@ def delete_message_helper(sqs_client, queue_url, receipt_handle):
         QueueUrl=queue_url,
         ReceiptHandle=receipt_handle
     )
 
-    print("Response of delete msg : ")
-    print(response)
 
     assert response['ResponseMetadata']['HTTPStatusCode'] == 200
@@ -227,8 +217,6 @@ def delete_object_from_s3_helper(sns_extended_client, s3Key):
 
     logger.info("Deleting the object from the S3 bucket")
 
-
-
     response = sns_extended_client.s3_client.delete_object(
         Bucket=sns_extended_client.large_payload_support,
         Key=s3Key
@@ -236,95 +224,10 @@ def delete_object_from_s3_helper(sns_extended_client, s3Key):
 
     assert response['ResponseMetadata']['HTTPStatusCode'] == 204
 
-# def test_send_receive_delete_workflow(sns_extended_client,sqs_client, queue,topic, message_body,
-#                                       message_attributes = None):
-#     """
-
-#     """
-
-
-def perform_send_receive_delete_workflow(sns_extended_client, sqs_client, queue, topic, message_body,
-                                         message_attributes = None):
-    """
-    Responsible for replicating a workflow of sending, receiving and deleting a message
-    by calling the helper functions as and when necessary.
-
-    sns_extended_client: The SNS Extended Client
-    queue: The SQS Queue
-    message_body: The message body
-    message_attributes: The message attributes
-    message_group_id: Tag specifying a message belongs to a message group (Required for FIFO Queues)
-    message_deduplication_id: Token used for deduplication of sent message (Required for FIFO Queues)
-
-    """
-
-    queue_url = queue["QueueUrl"]
-    sqs_queue_arn = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])["Attributes"].get("QueueArn")
-
-    kwargs = {
-        'sns_extended_client': sns_extended_client,
-        'topic_arn': topic,
-        'queue_url': queue_url,
-        'message_body': message_body,
-    }
-
-    if message_attributes:
-        kwargs['message_attributes'] = message_attributes
-
-    # Adding policy to SQS queue such that SNS topic can send msg to SQS queue
-    policy_json = create_allow_sns_to_write_to_sqs_policy_json(topic, sqs_queue_arn)
-    response = sqs_client.set_queue_attributes(
-        QueueUrl = queue_url,
-        Attributes = {
-            'Policy' : policy_json
-        }
-    )
-
-    # Subscribe SQS queue to SNS topic
-    sns_extended_client.subscribe(TopicArn=topic, Protocol="sqs", Endpoint=sqs_queue_arn, Attributes={"RawMessageDelivery": "true"})
-
-    publish_message_helper(**kwargs)
-
-    # Verifying that the S3 object was added
-    if sns_extended_client.large_payload_support:
-        assert not is_s3_bucket_empty(sns_extended_client)
-
-    # Verifying whether specific S3 object with key is created while passing in message_attributes
-    if message_attributes and sns_extended_client.always_through_s3:
-        assert sns_extended_client.s3_client.get_object(
-            Bucket=sns_extended_client.large_payload_support,
-            Key=message_attributes['S3Key']['StringValue']
-        )
-
-    receive_message_response = receive_message_helper(sqs_client, queue_url)
-
-    receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle']
-    # new_receipt_handle = old_receipt_handle
-
-    check_receive_message_response(sns_extended_client, receive_message_response, message_body, message_attributes)
-
-    # Change the message visibility here when the parameter (change_message_visibility) is true
-    # if change_message_visibility:
-    #     new_receipt_handle = change_message_visibility_helper(receive_message_response=receive_message_response,
-    #                                                           visibility_timeout=new_message_visibility, **kwargs)
-
-
-    delete_message_helper(sqs_client, queue_url, receipt_handle)
-
-    # Deleting object from s3 bucket
-    if sns_extended_client.large_payload_support:
-        for receive_message in receive_message_response['Messages']:
-            delete_object_from_s3_helper(sns_extended_client, receive_message)
-
-
-
-    return
-
-
-def test_send_receive_small_msg_through_s3(sns_extended_client_with_s3, sqs_client, queue, topic, small_message_body):
+def test_publish_receive_small_msg_through_s3(sns_extended_client_with_s3, sqs_client, queue, topic, small_message_body):
     """
     Responsible for replicating a workflow where an SQS queue subscribes to the sns_extended_client's topic.
-    sns_extended_client sends a message to that topic with the attribute 'always_through_s3' set to true,
+    sns_extended_client publishes a message to that topic with the attribute 'always_through_s3' set to true,
     so the body is stored in S3 and a reference to that object is received by the SQS queue, using the helper functions.
 
     sns_extended_client_with_s3 : The SNS Extended Client with S3 Bucket
@@ -334,12 +237,10 @@ def test_publish_receive_small_msg_through_s3(sns_extended_client_with_s3, sqs_client, queue, topic, small_message_body):
     topic: The SNS Topic
     small_message_body: The Message
     """
 
-    logger.info("Initializing execution of test_send_receive_small_msg_through_s3")
+    logger.info("Initializing execution of test_publish_receive_small_msg_through_s3")
 
     initialize_extended_client_attributes_through_s3(sns_extended_client_with_s3)
 
-    # perform_send_receive_delete_workflow(sns_extended_client_with_s3,sqs_client, queue,topic, small_message_body)
-
     queue_url = queue["QueueUrl"]
     sqs_queue_arn = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])["Attributes"].get("QueueArn")
@@ -350,7 +251,7 @@ def test_publish_receive_small_msg_through_s3(sns_extended_client_with_s3, sqs_client, queue, topic, small_message_body):
         'message_body': small_message_body,
     }
 
-    # Adding policy to SQS queue such that SNS topic can send msg to SQS queue
+    # Adding policy to SQS queue such that SNS topic can publish msg to SQS queue
     policy_json = create_allow_sns_to_write_to_sqs_policy_json(topic, sqs_queue_arn)
     response = sqs_client.set_queue_attributes(
         QueueUrl = queue_url,
@@ -378,7 +279,6 @@ def test_publish_receive_small_msg_through_s3(sns_extended_client_with_s3, sqs_client, queue, topic, small_message_body):
 
     # Retrieve the message from s3 Bucket and check value
     assert retrieve_message_from_s3(sns_extended_client_with_s3, json_receive_message_body['s3Key']) == small_message_body
 
-
     # Delete message from SQS queue
     receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle']
@@ -388,16 +288,14 @@ def test_publish_receive_small_msg_through_s3(sns_extended_client_with_s3, sqs_client, queue, topic, small_message_body):
 
     # The S3 bucket should be empty
     assert is_s3_bucket_empty(sns_extended_client_with_s3)
-    logger.info("Completed execution of test_send_receive_small_msg_through_s3")
+    logger.info("Completed execution of test_publish_receive_small_msg_through_s3")
 
     return
 
-
-
-def test_send_receive_small_msg_not_through_s3(sns_extended_client, sqs_client, queue, topic, small_message_body):
+def test_publish_receive_small_msg_not_through_s3(sns_extended_client, sqs_client, queue, topic, small_message_body):
     """
     Responsible for replicating a workflow where an SQS queue subscribes to the sns_extended_client's topic.
-    sns_extended_client sends a message to that topic with the attribute 'always_through_s3' set to false,
+    sns_extended_client publishes a message to that topic with the attribute 'always_through_s3' set to false,
     so the message is delivered directly to the SQS queue and is not stored in S3.
 
     sns_extended_client: The SNS Extended Client
@@ -407,7 +305,7 @@ def test_publish_receive_small_msg_not_through_s3(sns_extended_client, sqs_client, queue, topic, small_message_body):
     small_message_body: The Message
     """
 
-    logger.info("Initializing execution of test_send_receive_small_msg_not_through_s3")
+    logger.info("Initializing execution of test_publish_receive_small_msg_not_through_s3")
 
     queue_url = queue["QueueUrl"]
     sqs_queue_arn = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])["Attributes"].get("QueueArn")
@@ -419,7 +317,7 @@ def test_publish_receive_small_msg_not_through_s3(sns_extended_client, sqs_client, queue, topic, small_message_body):
         'message_body': small_message_body,
     }
 
-    # Adding policy to SQS queue such that SNS topic can send msg to SQS queue
+    # Adding policy to SQS queue such that SNS topic can publish msg to SQS queue
     policy_json = create_allow_sns_to_write_to_sqs_policy_json(topic, sqs_queue_arn)
     response = sqs_client.set_queue_attributes(
         QueueUrl = queue_url,
@@ -438,20 +336,18 @@ def test_publish_receive_small_msg_not_through_s3(sns_extended_client, sqs_client, queue, topic, small_message_body):
     # The body of the response should have the same message body that was sent to the topic by SNS
     assert receive_message_response['Messages'][0]['Body'] == small_message_body
 
-    # print(receive_message_response)
-
     # Delete message from SQS queue
     receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle']
     delete_message_helper(sqs_client, queue_url, receipt_handle)
 
-    logger.info("Completed execution of test_send_receive_small_msg_not_through_s3")
+    logger.info("Completed execution of test_publish_receive_small_msg_not_through_s3")
 
     return
 
-def test_send_receive_large_msg_which_passes_threshold_through_s3(sns_extended_client_with_s3, sqs_client, queue, topic, large_message_body):
+def test_publish_receive_large_msg_which_passes_threshold_through_s3(sns_extended_client_with_s3, sqs_client, queue, topic, large_message_body):
     """
     Responsible for replicating a workflow where an SQS queue subscribes to the sns_extended_client's topic.
-    sns_extended_client sends a message to that topic which exceeds the default size threshold,
+    sns_extended_client publishes a message to that topic which exceeds the default size threshold,
     so the body is stored in S3 and a reference to that object is received by the SQS queue, using the helper functions.
 
     sns_extended_client_with_s3 : The SNS Extended Client with S3 Bucket
@@ -461,7 +357,7 @@ def test_publish_receive_large_msg_which_passes_threshold_through_s3(sns_extended_client_with_s3, sqs_client, queue, topic, large_message_body):
     large_message_body: The Message
     """
 
-    logger.info("Initializing execution of test_send_receive_large_msg_which_passes_threshold_through_s3")
+    logger.info("Initializing execution of test_publish_receive_large_msg_which_passes_threshold_through_s3")
 
     queue_url = queue["QueueUrl"]
     sqs_queue_arn = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])["Attributes"].get("QueueArn")
@@ -473,7 +369,7 @@ def test_publish_receive_large_msg_which_passes_threshold_through_s3(sns_extended_client_with_s3, sqs_client, queue, topic, large_message_body):
         'message_body': large_message_body,
     }
 
-    # Adding policy to SQS queue such that SNS topic can send msg to SQS queue
+    # Adding policy to SQS queue such that SNS topic can publish msg to SQS queue
     policy_json = create_allow_sns_to_write_to_sqs_policy_json(topic, sqs_queue_arn)
     response = sqs_client.set_queue_attributes(
         QueueUrl = queue_url,
@@ -500,7 +396,6 @@ def test_publish_receive_large_msg_which_passes_threshold_through_s3(sns_extended_client_with_s3, sqs_client, queue, topic, large_message_body):
 
     # Retrieve the message from s3 Bucket and check value
     assert retrieve_message_from_s3(sns_extended_client_with_s3, json_receive_message_body['s3Key']) == large_message_body
 
-
     # Delete message from SQS queue
     receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle']
@@ -511,14 +406,14 @@ def test_publish_receive_large_msg_which_passes_threshold_through_s3(sns_extended_client_with_s3, sqs_client, queue, topic, large_message_body):
 
     # The S3 bucket should be empty
     assert is_s3_bucket_empty(sns_extended_client_with_s3)
-    logger.info("Completed execution of test_send_receive_large_msg_which_passes_threshold_through_s3")
+    logger.info("Completed execution of test_publish_receive_large_msg_which_passes_threshold_through_s3")
 
     return
 
-def test_send_receive_msg_with_custom_s3_key(sns_extended_client_with_s3, sqs_client, queue, topic, small_message_body, custom_s3_key_attribute):
+def test_publish_receive_msg_with_custom_s3_key(sns_extended_client_with_s3, sqs_client, queue, topic, small_message_body, custom_s3_key_attribute):
     """
     Responsible for replicating a workflow where an SQS queue subscribes to the sns_extended_client's topic.
-    sns_extended_client sends a message to that topic with a custom attribute used as the S3 key of the stored message,
+    sns_extended_client publishes a message to that topic with a custom attribute used as the S3 key of the stored message,
     and a reference to that object is received by the SQS queue, using the helper functions.
 
     sns_extended_client_with_s3: The SNS Extended Client with an existing S3 bucket
@@ -529,12 +424,10 @@ def test_publish_receive_msg_with_custom_s3_key(sns_extended_client_with_s3, sqs_client, queue, topic, small_message_body, custom_s3_key_attribute):
     small_message_body: The Message
     custom_s3_key_attribute: Attribute that sets a custom S3 key for the stored message
     """
 
-    logger.info("Initializing execution of test_send_receive_msg_with_custom_s3_key")
+    logger.info("Initializing execution of test_publish_receive_msg_with_custom_s3_key")
 
    initialize_extended_client_attributes_through_s3(sns_extended_client_with_s3)
 
-    # perform_send_receive_delete_workflow(sns_extended_client_with_s3,sqs_client, queue,topic, small_message_body)
-
     queue_url = queue["QueueUrl"]
     sqs_queue_arn = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])["Attributes"].get("QueueArn")
@@ -546,7 +439,7 @@ def test_publish_receive_msg_with_custom_s3_key(sns_extended_client_with_s3, sqs_client, queue, topic, small_message_body, custom_s3_key_attribute):
         'message_body': small_message_body,
         'message_attributes': custom_s3_key_attribute
     }
 
-    # Adding policy to SQS queue such that SNS topic can send msg to SQS queue
+    # Adding policy to SQS queue such that SNS topic can publish msg to SQS queue
     policy_json = create_allow_sns_to_write_to_sqs_policy_json(topic, sqs_queue_arn)
     response = sqs_client.set_queue_attributes(
@@ -570,14 +463,12 @@ def test_publish_receive_msg_with_custom_s3_key(sns_extended_client_with_s3, sqs_client, queue, topic, small_message_body, custom_s3_key_attribute):
     message_stored_in_s3_attributes = ['s3BucketName', 's3Key']
     for key in json_receive_message_body.keys():
         assert key in message_stored_in_s3_attributes
-
 
     # Stored message key should be the same as the custom_s3_key_attribute StringValue
     assert custom_s3_key_attribute['S3Key']['StringValue'] == json_receive_message_body['s3Key']
 
     # Retrieve the message from s3 Bucket and check value
     assert retrieve_message_from_s3(sns_extended_client_with_s3, json_receive_message_body['s3Key']) == small_message_body
-
 
     # Delete message from SQS queue
     receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle']
@@ -588,16 +479,10 @@ def test_publish_receive_msg_with_custom_s3_key(sns_extended_client_with_s3, sqs_client, queue, topic, small_message_body, custom_s3_key_attribute):
 
     # The S3 bucket should be empty
     assert is_s3_bucket_empty(sns_extended_client_with_s3)
-    logger.info("Completed execution of test_send_receive_msg_with_custom_s3_key")
+    logger.info("Completed execution of test_publish_receive_msg_with_custom_s3_key")
 
     return
 
-
-
-
-
-
-
 def test_session(session):
     assert boto3.session.Session == SNSExtendedClientSession
 
From 7b767b5d18aff50ce047a2a6cd494674db29ebd4 Mon Sep 17 00:00:00 2001
From: Mit Suthar
Date: Fri, 14 Feb 2025 14:23:15 -0800
Subject: [PATCH 3/4] Add integration tests to Release workflow and unit tests
 to PullRequest workflow

---
 .github/workflows/pull_requests.yml | 35 +++++++++++++++++++++++++++++
 .github/workflows/release.yml       | 13 ++++++-----
 .gitignore                          |  1 +
 3 files changed, 44 insertions(+), 5 deletions(-)
 create mode 100644 .github/workflows/pull_requests.yml

diff --git a/.github/workflows/pull_requests.yml b/.github/workflows/pull_requests.yml
new file mode 100644
index 0000000..b72f114
--- /dev/null
+++ b/.github/workflows/pull_requests.yml
@@ -0,0 +1,35 @@
+name: PullRequest
+
+on:
+  pull_request:
+    branches:
+      - main
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+    permissions:
+      contents: write
+      id-token: write
+    steps:
+      - name: Check out the repository
+        uses: actions/checkout@v3
+        with:
+          fetch-depth: 2
+      - name: Set up Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.9"
+      - name: Install Poetry
+        run: |
+          pip install poetry
+          poetry --version
+      - name: Build package
+        run: |
+          poetry build
+      - name: Install package
+        run: |
+          poetry install
+      - name: Run pytest
+        run: |
+          poetry run pytest --cov=sns_extended_client test --cov-report term-missing
\ No newline at end of file
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 50f72c2..be90048 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -34,7 +34,7 @@ jobs:
       - name: Detect and tag new version
         id: check-version
         if: steps.check-parent-commit.outputs.sha
-        uses: salsify/action-detect-and-tag-new-version@b1778166f13188a9d478e2d1198f993011ba9864 # v2.0.3
+        uses: salsify/action-detect-and-tag-new-version@v2
         with:
           version-command: |
             bash -o pipefail -c "poetry version | awk '{ print \$2 }'"
@@ -54,11 +54,14 @@ jobs:
         run: |
           poetry run pytest --cov=sns_extended_client test --cov-report term-missing
       - name: configure aws credentials
-        uses: aws-actions/configure-aws-credentials@5fd3084fc36e372ff1fff382a39b10d03659f355 # v2.2.0
+        uses: aws-actions/configure-aws-credentials@v4
         with:
           role-to-assume: ${{ vars.OIDC_ROLE_NAME }}
           role-session-name: publishrolesession
          aws-region: ${{ env.AWS_REGION }}
+      - name: Run Integration Tests
+        run: |
+          poetry run pytest test_integ
       - name: Retrieve TEST PYPI TOKEN from secretsmanager
         id: get-test-pypi-token
         if: "! steps.check-version.outputs.tag"
@@ -71,14 +74,14 @@ jobs:
           echo "token=$(aws secretsmanager get-secret-value --secret-id ${{ vars.PYPI_TOKEN_NAME }} | jq -r '.SecretString')" >> $GITHUB_OUTPUT
       - name: Publish package on TestPyPI
         if: "! steps.check-version.outputs.tag"
-        uses: pypa/gh-action-pypi-publish@f8c70e705ffc13c3b4d1221169b84f12a75d6ca8 # release/v1
+        uses: pypa/gh-action-pypi-publish@release/v1
         with:
           user: __token__
           password: ${{ steps.get-test-pypi-token.outputs.token }}
-          repository_url: https://test.pypi.org/legacy/
+          repository-url: https://test.pypi.org/legacy/
       - name: Publish package on PyPI
         if: steps.check-version.outputs.tag
-        uses: pypa/gh-action-pypi-publish@f8c70e705ffc13c3b4d1221169b84f12a75d6ca8 # release/v1
+        uses: pypa/gh-action-pypi-publish@release/v1
         with:
           user: __token__
           password: ${{ steps.get-pypi-token.outputs.token }}
diff --git a/.gitignore b/.gitignore
index 70a6fb7..ded6461 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 .vscode
+.coverage
 .pytest_cache
 __pycache__
 *.DS_Store

From 94ecf909348206ca25b80ba45dda3538b9989e4a Mon Sep 17 00:00:00 2001
From: Mit Suthar
Date: Fri, 14 Feb 2025 14:28:04 -0800
Subject: [PATCH 4/4] Bump up the version to 1.0.1

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 7f0b865..490bcf9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "amazon-sns-extended-client"
-version = "1.0.0"
+version = "1.0.1"
 description = "Python version of AWS SNS extended client to publish large payload message"
 authors = ["Amazon Web Service - SNS"]
 license = "Apache-2.0"
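For local experimentation, a condensed, standalone sketch of the flow these patches test end to end. It assumes live AWS credentials and the published package; every resource name below is a placeholder:

    import boto3
    from sns_extended_client.session import SNSExtendedClientSession

    boto3.Session = SNSExtendedClientSession
    session = boto3.Session(region_name="us-east-1")

    sns = session.client("sns")
    sns.large_payload_support = "my-offload-bucket"  # must be an existing bucket you own
    sns.always_through_s3 = True

    topic_arn = sns.create_topic(Name="ExtendedClientDemo")["TopicArn"]
    sns.publish(TopicArn=topic_arn, Message="hello through S3")
    # A raw-delivery SQS subscriber now receives the two-element JSON pointer
    # array; element 1 carries the bucket name and key of the offloaded body.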