@@ -230,10 +230,31 @@ s3_extended_payload_bucket = "extended-client-bucket-store" # S3 bucket with th
 TOPIC_NAME = "---TOPIC-NAME---"
 QUEUE_NAME = "---QUEUE-NAME---"

+def allow_sns_to_write_to_sqs(topicarn, queuearn):
+    policy_document = """{{
+  "Version":"2012-10-17",
+  "Statement":[
+    {{
+      "Sid":"MyPolicy",
+      "Effect":"Allow",
+      "Principal": {{"AWS": "*"}},
+      "Action":"SQS:SendMessage",
+      "Resource": "{}",
+      "Condition":{{
+        "ArnEquals":{{
+          "aws:SourceArn": "{}"
+        }}
+      }}
+    }}
+  ]
+}}""".format(queuearn, topicarn)
+
+    return policy_document
+
 def get_msg_from_s3(body):
     """Handy Helper to fetch message from S3"""
     json_msg = loads(body)
-    s3_client = boto3.client("s3")
+    s3_client = boto3.client("s3", region_name="us-east-1")
     s3_object = s3_client.get_object(
         Bucket=json_msg[1].get("s3BucketName"), Key=json_msg[1].get("s3Key")
     )
@@ -242,35 +263,60 @@ def get_msg_from_s3(body):


 def fetch_and_print_from_sqs(sqs, queue_url):
-    message = sqs.receive_message(
-        QueueUrl=queue_url, MessageAttributeNames=["All"], MaxNumberOfMessages=1
+    sqs_msg = sqs.receive_message(
+        QueueUrl=queue_url,
+        AttributeNames=['All'],
+        MessageAttributeNames=['All'],
+        VisibilityTimeout=0,
+        WaitTimeSeconds=0,
+        MaxNumberOfMessages=1
     ).get("Messages")[0]
-    message_body = message.get("Body")
+
+    message_body = sqs_msg.get("Body")
     print("Published Message: {}".format(message_body))
     print("Message Stored in S3 Bucket is: {}\n".format(get_msg_from_s3(message_body)))

+    # Delete the processed message
+    sqs.delete_message(
+        QueueUrl=queue_url,
+        ReceiptHandle=sqs_msg['ReceiptHandle']
+    )
+

 sns_extended_client = boto3.client("sns", region_name="us-east-1")
 create_topic_response = sns_extended_client.create_topic(Name=TOPIC_NAME)
-demo_topic_arn = create_topic_response.get("TopicArn")
+sns_topic_arn = create_topic_response.get("TopicArn")

 # create and subscribe an sqs queue to the sns client
-sqs = boto3.client("sqs")
+sqs = boto3.client("sqs", region_name="us-east-1")
 demo_queue_url = sqs.create_queue(QueueName=QUEUE_NAME).get("QueueUrl")
-demo_queue_arn = sqs.get_queue_attributes(
+sqs_queue_arn = sqs.get_queue_attributes(
     QueueUrl=demo_queue_url, AttributeNames=["QueueArn"]
 )["Attributes"].get("QueueArn")

+# Add a policy to the SQS queue so that the SNS topic is allowed to send messages to it
+policy_json = allow_sns_to_write_to_sqs(sns_topic_arn, sqs_queue_arn)
+response = sqs.set_queue_attributes(
+    QueueUrl=demo_queue_url,
+    Attributes={
+        'Policy': policy_json
+    }
+)
+
 # Set the RawMessageDelivery subscription attribute to TRUE if you want to use
 # SQSExtendedClient to help with retrieving msg from S3
-sns_extended_client.subscribe(TopicArn=demo_topic_arn, Protocol="sqs",
-                              Endpoint=demo_queue_arn, Attributes={"RawMessageDelivery": "true"})
+sns_extended_client.subscribe(
+    TopicArn=sns_topic_arn, Protocol="sqs", Endpoint=sqs_queue_arn,
+    Attributes={"RawMessageDelivery": "true"},
+)

-# Below is an example where all messages are sent to the S3 bucket
 sns_extended_client.large_payload_support = s3_extended_payload_bucket
+
+
+# Below is an example where all messages are sent to the S3 bucket
 sns_extended_client.always_through_s3 = True
 sns_extended_client.publish(
-    TopicArn=demo_topic_arn, Message="This message should be published to S3"
+    TopicArn=sns_topic_arn, Message="This message should be published to S3"
 )
 print("\n\nPublished using SNS extended client:")
 fetch_and_print_from_sqs(sqs, demo_queue_url)  # Prints message stored in s3
@@ -281,19 +327,19 @@ print("\nUsing decreased message size threshold:")
 sns_extended_client.always_through_s3 = False
 sns_extended_client.message_size_threshold = 32
 sns_extended_client.publish(
-    TopicArn=demo_topic_arn,
+    TopicArn=sns_topic_arn,
     Message="This message should be published to S3 as it exceeds the limit of the 32 bytes",
 )

 fetch_and_print_from_sqs(sqs, demo_queue_url)  # Prints message stored in s3


-# Below is the example to publish message using the SNS.Topic resource
+# Below is an example of publishing a message using the SNS.Topic resource
 sns_extended_client_resource = SNSExtendedClientSession().resource(
     "sns", region_name="us-east-1"
 )

-topic = sns_extended_client_resource.Topic(demo_topic_arn)
+topic = sns_extended_client_resource.Topic(sns_topic_arn)
 topic.large_payload_support = s3_extended_payload_bucket
 topic.always_through_s3 = True
 # Can set custom S3 keys to be used to store objects in S3