@@ -21,7 +21,7 @@ sns-extended-client allows for publishing large messages through SNS via S3. Thi
 * use_legacy_attribute -- if `True`, then all published messages use the Legacy reserved message attribute (SQSLargePayloadSize) instead of the current reserved message attribute (ExtendedPayloadSize).
 * message_size_threshold -- the threshold for storing the message in the large messages bucket. Cannot be less than `0` or greater than `262144`. Defaults to `262144`.
 * always_through_s3 -- if `True`, then all messages will be serialized to S3. Defaults to `False`
-* s3 -- the boto3 S3 `resource` object to use to store objects to S3. Use this if you want to control the S3 resource (for example, custom S3 config or credentials). Defaults to `boto3.resource("s3")` on first use if not previously set.
+* s3_client -- the boto3 S3 `client` object to use to store objects to S3. Use this if you want to control the S3 client (for example, custom S3 config or credentials). Defaults to `boto3.client("s3")` on first use if not previously set.
 
 ## Usage
 
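Editor's note (not part of the change): a minimal sketch of how the attributes listed above are set directly on a publisher, using the post-change `s3_client` attribute. The bucket name, region, and topic ARN are placeholders, and `use_legacy_attribute` is assumed to be settable the same way as the other attributes.

```python
import boto3
import sns_extended_client  # importing registers the extended attributes on boto3 SNS clients/resources

sns = boto3.client("sns", region_name="us-east-1")  # placeholder region

# Bucket that receives offloaded payloads (placeholder name)
sns.large_payload_support = "my-extended-client-bucket"

# Optional tuning of the attributes documented above
sns.message_size_threshold = 262144   # offload payloads larger than this many bytes
sns.always_through_s3 = False         # set True to offload every message
sns.use_legacy_attribute = False      # set True to emit SQSLargePayloadSize instead of ExtendedPayloadSize

# Optional: bring your own S3 client (otherwise boto3.client("s3") is created on first use)
sns.s3_client = boto3.client("s3", region_name="us-east-1")

sns.publish(
    TopicArn="arn:aws:sns:us-east-1:123456789012:my-topic",  # placeholder ARN
    Message="A payload that may be offloaded to S3",
)
```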
@@ -108,25 +108,27 @@ platform_endpoint = resource.PlatformEndpoint('endpoint-arn')
 platform_endpoint.large_payload_support = 'my-bucket-name'
 platform_endpoint.always_through_s3 = True
 ```
-### Setting a custom S3 resource
+### Setting a custom S3 config
 ```python
 import boto3
 from botocore.config import Config
 import sns_extended_client
 
-# Low level client
-sns = boto3.client('sns')
-sns.large_payload_support = 'my-bucket-name'
-sns.s3 = boto3.resource(
-    's3',
-    config=Config(
-        signature_version='s3v4',
+# Define Configuration for boto3's S3 Client
+# NOTE - The boto3 version from 1.36.0 to 1.36.6 will throw an error if you enable accelerate_endpoint.
+s3_client_config = Config(
+    region_name='us-east-1',
+    signature_version='s3v4',
     s3={
-        "use_accelerate_endpoint": True
+        "use_accelerate_endpoint": True
     }
-    )
 )
 
+# Low level client
+sns = boto3.client('sns')
+sns.large_payload_support = 'my-bucket-name'
+sns.s3_client = boto3.client("s3", config=s3_client_config)
+
 # boto SNS.Topic resource
 resource = boto3.resource('sns')
 topic = resource.Topic('topic-arn')
@@ -135,30 +137,14 @@ topic = resource.Topic('topic-arn')
 topic = resource.topic(Name='topic-name')
 
 topic.large_payload_support = 'my-bucket-name'
-topic.s3 = boto3.resource(
-    's3',
-    config=Config(
-        signature_version='s3v4',
-        s3={
-            "use_accelerate_endpoint": True
-        }
-    )
-)
+topic.s3_client = boto3.client("s3", config=s3_client_config)
 
 # boto SNS.PlatformEndpoint resource
 resource = boto3.resource('sns')
 platform_endpoint = resource.PlatformEndpoint('endpoint-arn')
 
 platform_endpoint.large_payload_support = 'my-bucket-name'
-platform_endpoint.s3 = boto3.resource(
-    's3',
-    config=Config(
-        signature_version='s3v4',
-        s3={
-            "use_accelerate_endpoint": True
-        }
-    )
-)
+platform_endpoint.s3_client = boto3.client("s3", config=s3_client_config)
 ```
 
 ### Setting a custom S3 Key
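Editor's note (not part of the change): the custom-key mechanism this section covers is the `S3Key` message attribute, which the larger example further down in this diff also uses. A minimal sketch with a placeholder key, assuming `topic` is already configured with `large_payload_support` as above:

```python
# Publish with a caller-chosen S3 object key instead of an auto-generated UUID
topic.publish(
    Message="A large message stored under a caller-chosen key",
    MessageAttributes={
        "S3Key": {
            "DataType": "String",
            "StringValue": "my-custom-object-key",  # placeholder; any valid S3 object key
        }
    },
)
```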
@@ -230,71 +216,123 @@ s3_extended_payload_bucket = "extended-client-bucket-store" # S3 bucket with th
 TOPIC_NAME = "---TOPIC-NAME---"
 QUEUE_NAME = "---QUEUE-NAME---"
 
-def get_msg_from_s3(body):
+def allow_sns_to_write_to_sqs(topicarn, queuearn):
+    policy_document = """{{
+  "Version":"2012-10-17",
+  "Statement":[
+    {{
+      "Sid":"MyPolicy",
+      "Effect":"Allow",
+      "Principal" : {{"AWS" : "*"}},
+      "Action":"SQS:SendMessage",
+      "Resource": "{}",
+      "Condition":{{
+        "ArnEquals":{{
+          "aws:SourceArn": "{}"
+        }}
+      }}
+    }}
+  ]
+}}""".format(queuearn, topicarn)
+
+    return policy_document
+
+def get_msg_from_s3(body, sns_extended_client):
     """Handy Helper to fetch message from S3"""
     json_msg = loads(body)
-    s3_client = boto3.client("s3")
-    s3_object = s3_client.get_object(
+    s3_object = sns_extended_client.s3_client.get_object(
         Bucket=json_msg[1].get("s3BucketName"), Key=json_msg[1].get("s3Key")
     )
     msg = s3_object.get("Body").read().decode()
     return msg
 
 
-def fetch_and_print_from_sqs(sqs, queue_url):
-    message = sqs.receive_message(
-        QueueUrl=queue_url, MessageAttributeNames=["All"], MaxNumberOfMessages=1
+def fetch_and_print_from_sqs(sqs, queue_url, sns_extended_client):
+    sqs_msg = sqs.receive_message(
+        QueueUrl=queue_url,
+        AttributeNames=['All'],
+        MessageAttributeNames=['All'],
+        VisibilityTimeout=0,
+        WaitTimeSeconds=0,
+        MaxNumberOfMessages=1
     ).get("Messages")[0]
-    message_body = message.get("Body")
+
+    message_body = sqs_msg.get("Body")
     print("Published Message: {}".format(message_body))
-    print("Message Stored in S3 Bucket is: {}\n".format(get_msg_from_s3(message_body)))
+    print("Message Stored in S3 Bucket is: {}\n".format(get_msg_from_s3(message_body, sns_extended_client)))
+
+    # Delete the Processed Message
+    sqs.delete_message(
+        QueueUrl=queue_url,
+        ReceiptHandle=sqs_msg['ReceiptHandle']
+    )
 
 
 sns_extended_client = boto3.client("sns", region_name="us-east-1")
 create_topic_response = sns_extended_client.create_topic(Name=TOPIC_NAME)
-demo_topic_arn = create_topic_response.get("TopicArn")
+sns_topic_arn = create_topic_response.get("TopicArn")
 
 # create and subscribe an sqs queue to the sns client
-sqs = boto3.client("sqs")
+sqs = boto3.client("sqs", region_name="us-east-1")
 demo_queue_url = sqs.create_queue(QueueName=QUEUE_NAME).get("QueueUrl")
-demo_queue_arn = sqs.get_queue_attributes(
+sqs_queue_arn = sqs.get_queue_attributes(
     QueueUrl=demo_queue_url, AttributeNames=["QueueArn"]
 )["Attributes"].get("QueueArn")
 
+# Adding policy to SQS queue such that SNS topic can send msg to SQS queue
+policy_json = allow_sns_to_write_to_sqs(sns_topic_arn, sqs_queue_arn)
+response = sqs.set_queue_attributes(
+    QueueUrl=demo_queue_url,
+    Attributes={
+        'Policy': policy_json
+    }
+)
+
 # Set the RawMessageDelivery subscription attribute to TRUE if you want to use
 # SQSExtendedClient to help with retrieving msg from S3
-sns_extended_client.subscribe(TopicArn=demo_topic_arn, Protocol="sqs",
-    Endpoint=demo_queue_arn, Attributes={"RawMessageDelivery": "true"})
+sns_extended_client.subscribe(TopicArn=sns_topic_arn, Protocol="sqs",
+    Endpoint=sqs_queue_arn,
+    Attributes={"RawMessageDelivery": "true"}
+)
 
-# Below is the example that all the messages will be sent to the S3 bucket
 sns_extended_client.large_payload_support = s3_extended_payload_bucket
+
+# Change default s3_client attribute of sns_extended_client to use 'us-east-1' region
+sns_extended_client.s3_client = boto3.client("s3", region_name="us-east-1")
+
+
+# Below is the example that all the messages will be sent to the S3 bucket
 sns_extended_client.always_through_s3 = True
 sns_extended_client.publish(
-    TopicArn=demo_topic_arn, Message="This message should be published to S3"
+    TopicArn=sns_topic_arn, Message="This message should be published to S3"
 )
 print("\n\nPublished using SNS extended client:")
-fetch_and_print_from_sqs(sqs, demo_queue_url)  # Prints message stored in s3
+fetch_and_print_from_sqs(sqs, demo_queue_url, sns_extended_client)  # Prints message stored in s3
 
 # Below is the example that all the messages larger than 32 bytes will be sent to the S3 bucket
 print("\nUsing decreased message size threshold:")
 
 sns_extended_client.always_through_s3 = False
 sns_extended_client.message_size_threshold = 32
 sns_extended_client.publish(
-    TopicArn=demo_topic_arn,
+    TopicArn=sns_topic_arn,
     Message="This message should be published to S3 as it exceeds the limit of the 32 bytes",
 )
 
-fetch_and_print_from_sqs(sqs, demo_queue_url)  # Prints message stored in s3
+fetch_and_print_from_sqs(sqs, demo_queue_url, sns_extended_client)  # Prints message stored in s3
 
 
 # Below is the example to publish message using the SNS.Topic resource
 sns_extended_client_resource = SNSExtendedClientSession().resource(
     "sns", region_name="us-east-1"
 )
 
-topic = sns_extended_client_resource.Topic(demo_topic_arn)
+topic = sns_extended_client_resource.Topic(sns_topic_arn)
 topic.large_payload_support = s3_extended_payload_bucket
+
+# Change default s3_client attribute of topic to use 'us-east-1' region
+topic.s3_client = boto3.client("s3", region_name="us-east-1")
+
 topic.always_through_s3 = True
 # Can Set custom S3 Keys to be used to store objects in S3
 topic.publish(
@@ -307,24 +345,51 @@ topic.publish(
     },
 )
 print("\nPublished using Topic Resource:")
-fetch_and_print_from_sqs(sqs, demo_queue_url)
+fetch_and_print_from_sqs(sqs, demo_queue_url, topic)
+
+# Below is the example to publish message using the SNS.PlatformEndpoint resource
+sns_extended_client_resource = SNSExtendedClientSession().resource(
+    "sns", region_name="us-east-1"
+)
+
+platform_endpoint = sns_extended_client_resource.PlatformEndpoint(sns_topic_arn)
+platform_endpoint.large_payload_support = s3_extended_payload_bucket
+
+# Change default s3_client attribute of platform_endpoint to use 'us-east-1' region
+platform_endpoint.s3_client = boto3.client("s3", region_name="us-east-1")
+
+platform_endpoint.always_through_s3 = True
+# Can Set custom S3 Keys to be used to store objects in S3
+platform_endpoint.publish(
+    Message="This message should be published to S3 using the PlatformEndpoint resource",
+    MessageAttributes={
+        "S3Key": {
+            "DataType": "String",
+            "StringValue": "247c11c4-a22c-42e4-a6a2-9b5af5b76587",
+        }
+    },
+)
+print("\nPublished using PlatformEndpoint Resource:")
+fetch_and_print_from_sqs(sqs, demo_queue_url, platform_endpoint)
 ```
 
 PRODUCED OUTPUT:
 ```
 Published using SNS extended client:
-Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "465d51ea-2c85-4cf8-9ff7-f0a20636ac54"}]
+Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "10999f58-c5ae-4d68-9208-f70475e0113d"}]
 Message Stored in S3 Bucket is: This message should be published to S3
 
-
 Using decreased message size threshold:
-Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "4e32bc6c-e67e-4e09-982b-66dfbe0c588a"}]
+Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "2c5cb2c7-e649-492b-85fb-fa9923cb02bf"}]
 Message Stored in S3 Bucket is: This message should be published to S3 as it exceeds the limit of the 32 bytes
 
-
 Published using Topic Resource:
 Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "347c11c4-a22c-42e4-a6a2-9b5af5b76587"}]
 Message Stored in S3 Bucket is: This message should be published to S3 using the topic resource
+
+Published using PlatformEndpoint Resource:
+Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "247c11c4-a22c-42e4-a6a2-9b5af5b76587"}]
+Message Stored in S3 Bucket is: This message should be published to S3 using the PlatformEndpoint resource
 ```
 
 ## DEVELOPMENT
@@ -342,5 +407,4 @@ See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more inform
 
 ## License
 
-This project is licensed under the Apache-2.0 License.
-
+This project is licensed under the Apache-2.0 License.