Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 60 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,31 @@ s3_extended_payload_bucket = "extended-client-bucket-store" # S3 bucket with th
TOPIC_NAME = "---TOPIC-NAME---"
QUEUE_NAME = "---QUEUE-NAME---"

def allow_sns_to_write_to_sqs(topicarn, queuearn):
policy_document = """{{
"Version":"2012-10-17",
"Statement":[
{{
"Sid":"MyPolicy",
"Effect":"Allow",
"Principal" : {{"AWS" : "*"}},
"Action":"SQS:SendMessage",
"Resource": "{}",
"Condition":{{
"ArnEquals":{{
"aws:SourceArn": "{}"
}}
}}
}}
]
}}""".format(queuearn, topicarn)

return policy_document

def get_msg_from_s3(body):
"""Handy Helper to fetch message from S3"""
json_msg = loads(body)
s3_client = boto3.client("s3")
s3_client = boto3.client("s3",region_name="us-east-1")
s3_object = s3_client.get_object(
Bucket=json_msg[1].get("s3BucketName"), Key=json_msg[1].get("s3Key")
)
Expand All @@ -242,35 +263,60 @@ def get_msg_from_s3(body):


def fetch_and_print_from_sqs(sqs, queue_url):
message = sqs.receive_message(
QueueUrl=queue_url, MessageAttributeNames=["All"], MaxNumberOfMessages=1
sqs_msg = sqs.receive_message(
QueueUrl=queue_url,
AttributeNames=['All'],
MessageAttributeNames=['All'],
VisibilityTimeout=0,
WaitTimeSeconds=0,
MaxNumberOfMessages=1
).get("Messages")[0]
message_body = message.get("Body")

message_body = sqs_msg.get("Body")
print("Published Message: {}".format(message_body))
print("Message Stored in S3 Bucket is: {}\n".format(get_msg_from_s3(message_body)))

# Delete the Processed Message
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=sqs_msg['ReceiptHandle']
)


sns_extended_client = boto3.client("sns", region_name="us-east-1")
create_topic_response = sns_extended_client.create_topic(Name=TOPIC_NAME)
demo_topic_arn = create_topic_response.get("TopicArn")
sns_topic_arn = create_topic_response.get("TopicArn")

# create and subscribe an sqs queue to the sns client
sqs = boto3.client("sqs")
sqs = boto3.client("sqs",region_name="us-east-1")
demo_queue_url = sqs.create_queue(QueueName=QUEUE_NAME).get("QueueUrl")
demo_queue_arn = sqs.get_queue_attributes(
sqs_queue_arn = sqs.get_queue_attributes(
QueueUrl=demo_queue_url, AttributeNames=["QueueArn"]
)["Attributes"].get("QueueArn")

# Adding policy to SQS queue such that SNS topic can send msg to SQS queue
policy_json = allow_sns_to_write_to_sqs(sns_topic_arn, sqs_queue_arn)
response = sqs.set_queue_attributes(
QueueUrl = demo_queue_url,
Attributes = {
'Policy' : policy_json
}
)

# Set the RawMessageDelivery subscription attribute to TRUE if you want to use
# SQSExtendedClient to help with retrieving msg from S3
sns_extended_client.subscribe(TopicArn=demo_topic_arn, Protocol="sqs",
Endpoint=demo_queue_arn, Attributes={"RawMessageDelivery":"true"})
sns_extended_client.subscribe(TopicArn=sns_topic_arn, Protocol="sqs",
Endpoint=sqs_queue_arn
, Attributes={"RawMessageDelivery":"true"}
)

# Below is the example that all the messages will be sent to the S3 bucket
sns_extended_client.large_payload_support = s3_extended_payload_bucket


# Below is the example that all the messages will be sent to the S3 bucket
sns_extended_client.always_through_s3 = True
sns_extended_client.publish(
TopicArn=demo_topic_arn, Message="This message should be published to S3"
TopicArn=sns_topic_arn, Message="This message should be published to S3"
)
print("\n\nPublished using SNS extended client:")
fetch_and_print_from_sqs(sqs, demo_queue_url) # Prints message stored in s3
Expand All @@ -281,19 +327,19 @@ print("\nUsing decreased message size threshold:")
sns_extended_client.always_through_s3 = False
sns_extended_client.message_size_threshold = 32
sns_extended_client.publish(
TopicArn=demo_topic_arn,
TopicArn=sns_topic_arn,
Message="This message should be published to S3 as it exceeds the limit of the 32 bytes",
)

fetch_and_print_from_sqs(sqs, demo_queue_url) # Prints message stored in s3


# Below is the example to publish message using the SNS.Topic resource
# # Below is the example to publish message using the SNS.Topic resource
sns_extended_client_resource = SNSExtendedClientSession().resource(
"sns", region_name="us-east-1"
)

topic = sns_extended_client_resource.Topic(demo_topic_arn)
topic = sns_extended_client_resource.Topic(sns_topic_arn)
topic.large_payload_support = s3_extended_payload_bucket
topic.always_through_s3 = True
# Can Set custom S3 Keys to be used to store objects in S3
Expand Down