diff --git a/README.md b/README.md index 3c4f89b..9a0ff3d 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ sns-extended-client allows for publishing large messages through SNS via S3. Thi * use_legacy_attribute -- if `True`, then all published messages use the Legacy reserved message attribute (SQSLargePayloadSize) instead of the current reserved message attribute (ExtendedPayloadSize). * message_size_threshold -- the threshold for storing the message in the large messages bucket. Cannot be less than `0` or greater than `262144`. Defaults to `262144`. * always_through_s3 -- if `True`, then all messages will be serialized to S3. Defaults to `False` -* s3 -- the boto3 S3 `resource` object to use to store objects to S3. Use this if you want to control the S3 resource (for example, custom S3 config or credentials). Defaults to `boto3.resource("s3")` on first use if not previously set. +* s3_client -- the boto3 S3 `client` object to use to store objects to S3. Use this if you want to control the S3 client (for example, custom S3 config or credentials). Defaults to `boto3.client("s3")` on first use if not previously set. ## Usage @@ -108,25 +108,27 @@ platform_endpoint = resource.PlatformEndpoint('endpoint-arn') platform_endpoint.large_payload_support = 'my-bucket-name' platform_endpoint.always_through_s3 = True ``` -### Setting a custom S3 resource +### Setting a custom S3 config ```python import boto3 from botocore.config import Config import sns_extended_client -# Low level client -sns = boto3.client('sns') -sns.large_payload_support = 'my-bucket-name' -sns.s3 = boto3.resource( - 's3', - config=Config( - signature_version='s3v4', +# Define Configuration for boto3's S3 Client +# NOTE - The boto3 version from 1.36.0 to 1.36.6 will throw an error if you enable accelerate_endpoint. +s3_client_config = Config( + region_name = 'us-east-1', + signature_version = 's3v4', s3={ - "use_accelerate_endpoint": True + "use_accelerate_endpoint":True } - ) ) +# Low level client +sns = boto3.client('sns') +sns.large_payload_support = 'my-bucket-name' +sns.s3_client = boto3.client("s3", config=s3_client_config) + # boto SNS.Topic resource resource = boto3.resource('sns') topic = resource.Topic('topic-arn') @@ -135,30 +137,14 @@ topic = resource.Topic('topic-arn') topic = resource.topic(Name='topic-name') topic.large_payload_support = 'my-bucket-name' -topic.s3 = boto3.resource( - 's3', - config=Config( - signature_version='s3v4', - s3={ - "use_accelerate_endpoint": True - } - ) -) +topic.s3_client = boto3.client("s3", config=s3_client_config) # boto SNS.PlatformEndpoint resource resource = boto3.resource('sns') platform_endpoint = resource.PlatformEndpoint('endpoint-arn') platform_endpoint.large_payload_support = 'my-bucket-name' -platform_endpoint.s3 = boto3.resource( - 's3', - config=Config( - signature_version='s3v4', - s3={ - "use_accelerate_endpoint": True - } - ) -) +platform_endpoint.s3_client = boto3.client("s3", config=s3_client_config) ``` ### Setting a custom S3 Key @@ -230,50 +216,98 @@ s3_extended_payload_bucket = "extended-client-bucket-store" # S3 bucket with th TOPIC_NAME = "---TOPIC-NAME---" QUEUE_NAME = "---QUEUE-NAME---" -def get_msg_from_s3(body): +def allow_sns_to_write_to_sqs(topicarn, queuearn): + policy_document = """{{ + "Version":"2012-10-17", + "Statement":[ + {{ + "Sid":"MyPolicy", + "Effect":"Allow", + "Principal" : {{"AWS" : "*"}}, + "Action":"SQS:SendMessage", + "Resource": "{}", + "Condition":{{ + "ArnEquals":{{ + "aws:SourceArn": "{}" + }} + }} + }} + ] + }}""".format(queuearn, topicarn) + + return policy_document + +def get_msg_from_s3(body,sns_extended_client): """Handy Helper to fetch message from S3""" json_msg = loads(body) - s3_client = boto3.client("s3") - s3_object = s3_client.get_object( + s3_object = sns_extended_client.s3_client.get_object( Bucket=json_msg[1].get("s3BucketName"), Key=json_msg[1].get("s3Key") ) msg = s3_object.get("Body").read().decode() return msg -def fetch_and_print_from_sqs(sqs, queue_url): - message = sqs.receive_message( - QueueUrl=queue_url, MessageAttributeNames=["All"], MaxNumberOfMessages=1 +def fetch_and_print_from_sqs(sqs, queue_url,sns_extended_client): + sqs_msg = sqs.receive_message( + QueueUrl=queue_url, + AttributeNames=['All'], + MessageAttributeNames=['All'], + VisibilityTimeout=0, + WaitTimeSeconds=0, + MaxNumberOfMessages=1 ).get("Messages")[0] - message_body = message.get("Body") + + message_body = sqs_msg.get("Body") print("Published Message: {}".format(message_body)) - print("Message Stored in S3 Bucket is: {}\n".format(get_msg_from_s3(message_body))) + print("Message Stored in S3 Bucket is: {}\n".format(get_msg_from_s3(message_body,sns_extended_client))) + + # Delete the Processed Message + sqs.delete_message( + QueueUrl=queue_url, + ReceiptHandle=sqs_msg['ReceiptHandle'] + ) sns_extended_client = boto3.client("sns", region_name="us-east-1") create_topic_response = sns_extended_client.create_topic(Name=TOPIC_NAME) -demo_topic_arn = create_topic_response.get("TopicArn") +sns_topic_arn = create_topic_response.get("TopicArn") # create and subscribe an sqs queue to the sns client -sqs = boto3.client("sqs") +sqs = boto3.client("sqs",region_name="us-east-1") demo_queue_url = sqs.create_queue(QueueName=QUEUE_NAME).get("QueueUrl") -demo_queue_arn = sqs.get_queue_attributes( +sqs_queue_arn = sqs.get_queue_attributes( QueueUrl=demo_queue_url, AttributeNames=["QueueArn"] )["Attributes"].get("QueueArn") +# Adding policy to SQS queue such that SNS topic can send msg to SQS queue +policy_json = allow_sns_to_write_to_sqs(sns_topic_arn, sqs_queue_arn) +response = sqs.set_queue_attributes( + QueueUrl = demo_queue_url, + Attributes = { + 'Policy' : policy_json + } +) + # Set the RawMessageDelivery subscription attribute to TRUE if you want to use # SQSExtendedClient to help with retrieving msg from S3 -sns_extended_client.subscribe(TopicArn=demo_topic_arn, Protocol="sqs", -Endpoint=demo_queue_arn, Attributes={"RawMessageDelivery":"true"}) +sns_extended_client.subscribe(TopicArn=sns_topic_arn, Protocol="sqs", +Endpoint=sqs_queue_arn +, Attributes={"RawMessageDelivery":"true"} +) -# Below is the example that all the messages will be sent to the S3 bucket sns_extended_client.large_payload_support = s3_extended_payload_bucket + +# Change default s3_client attribute of sns_extended_client to use 'us-east-1' region +sns_extended_client.s3_client = boto3.client("s3", region_name="us-east-1") + + +# Below is the example that all the messages will be sent to the S3 bucket sns_extended_client.always_through_s3 = True sns_extended_client.publish( - TopicArn=demo_topic_arn, Message="This message should be published to S3" + TopicArn=sns_topic_arn, Message="This message should be published to S3" ) print("\n\nPublished using SNS extended client:") -fetch_and_print_from_sqs(sqs, demo_queue_url) # Prints message stored in s3 +fetch_and_print_from_sqs(sqs, demo_queue_url,sns_extended_client) # Prints message stored in s3 # Below is the example that all the messages larger than 32 bytes will be sent to the S3 bucket print("\nUsing decreased message size threshold:") @@ -281,11 +315,11 @@ print("\nUsing decreased message size threshold:") sns_extended_client.always_through_s3 = False sns_extended_client.message_size_threshold = 32 sns_extended_client.publish( - TopicArn=demo_topic_arn, + TopicArn=sns_topic_arn, Message="This message should be published to S3 as it exceeds the limit of the 32 bytes", ) -fetch_and_print_from_sqs(sqs, demo_queue_url) # Prints message stored in s3 +fetch_and_print_from_sqs(sqs, demo_queue_url,sns_extended_client) # Prints message stored in s3 # Below is the example to publish message using the SNS.Topic resource @@ -293,8 +327,12 @@ sns_extended_client_resource = SNSExtendedClientSession().resource( "sns", region_name="us-east-1" ) -topic = sns_extended_client_resource.Topic(demo_topic_arn) +topic = sns_extended_client_resource.Topic(sns_topic_arn) topic.large_payload_support = s3_extended_payload_bucket + +# Change default s3_client attribute of topic to use 'us-east-1' region +topic.s3_client = boto3.client("s3", region_name="us-east-1") + topic.always_through_s3 = True # Can Set custom S3 Keys to be used to store objects in S3 topic.publish( @@ -307,24 +345,51 @@ topic.publish( }, ) print("\nPublished using Topic Resource:") -fetch_and_print_from_sqs(sqs, demo_queue_url) +fetch_and_print_from_sqs(sqs, demo_queue_url,topic) + +# Below is the example to publish message using the SNS.PlatformEndpoint resource +sns_extended_client_resource = SNSExtendedClientSession().resource( + "sns", region_name="us-east-1" +) + +platform_endpoint = sns_extended_client_resource.PlatformEndpoint(sns_topic_arn) +platform_endpoint.large_payload_support = s3_extended_payload_bucket + +# Change default s3_client attribute of platform_endpoint to use 'us-east-1' region +platform_endpoint.s3_client = boto3.client("s3", region_name="us-east-1") + +platform_endpoint.always_through_s3 = True +# Can Set custom S3 Keys to be used to store objects in S3 +platform_endpoint.publish( + Message="This message should be published to S3 using the PlatformEndpoint resource", + MessageAttributes={ + "S3Key": { + "DataType": "String", + "StringValue": "247c11c4-a22c-42e4-a6a2-9b5af5b76587", + } + }, +) +print("\nPublished using PlatformEndpoint Resource:") +fetch_and_print_from_sqs(sqs, demo_queue_url,platform_endpoint) ``` PRODUCED OUTPUT: ``` Published using SNS extended client: -Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "465d51ea-2c85-4cf8-9ff7-f0a20636ac54"}] +Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "10999f58-c5ae-4d68-9208-f70475e0113d"}] Message Stored in S3 Bucket is: This message should be published to S3 - Using decreased message size threshold: -Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "4e32bc6c-e67e-4e09-982b-66dfbe0c588a"}] +Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "2c5cb2c7-e649-492b-85fb-fa9923cb02bf"}] Message Stored in S3 Bucket is: This message should be published to S3 as it exceeds the limit of the 32 bytes - Published using Topic Resource: Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "347c11c4-a22c-42e4-a6a2-9b5af5b76587"}] Message Stored in S3 Bucket is: This message should be published to S3 using the topic resource + +Published using PlatformEndpoint Resource: +Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "247c11c4-a22c-42e4-a6a2-9b5af5b76587"}] +Message Stored in S3 Bucket is: This message should be published to S3 using the PlatformEndpoint resource ``` ## DEVELOPMENT @@ -342,5 +407,4 @@ See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more inform ## License -This project is licensed under the Apache-2.0 License. - +This project is licensed under the Apache-2.0 License. \ No newline at end of file