-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdynamodb_to_sqs_lambda.py
More file actions
50 lines (40 loc) · 1.25 KB
/
dynamodb_to_sqs_lambda.py
File metadata and controls
50 lines (40 loc) · 1.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
"""
Dynamo to SQS: scan a DynamoDB table and send each item to an SQS queue as a JSON message.
"""
import json
from decimal import Decimal

import boto3
# DynamoDB service resource, created once when the module is imported.
DYNAMODB = boto3.resource('dynamodb')
# Name of the DynamoDB table that lambda_handler scans.
TABLE = "amazon-review"
# Name of the SQS queue that lambda_handler sends the scanned items to.
QUEUE = "dynamo-queue"
# SQS client, created once when the module is imported.
SQS = boto3.client("sqs")
def scan_table(table):
    """Scan a DynamoDB table and return all of its items.

    A single ``scan`` call returns at most one page of results. When more
    data remains, the response carries a ``LastEvaluatedKey`` that must be
    passed back as ``ExclusiveStartKey`` to fetch the next page. This
    function follows that chain until the table is exhausted.

    Args:
        table: Name of the DynamoDB table to scan.

    Returns:
        A list containing the items from every page of the scan.
    """
    producer_table = DYNAMODB.Table(table)
    items = []
    scan_kwargs = {}
    while True:
        response = producer_table.scan(**scan_kwargs)
        items.extend(response.get('Items', []))
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            # No continuation key: this was the final page.
            break
        scan_kwargs['ExclusiveStartKey'] = last_key
    return items
def _json_default(value):
    """Convert the non-JSON types found in DynamoDB items for json.dumps."""
    if isinstance(value, Decimal):
        # Integral numbers stay ints; anything else becomes a float, which
        # may lose precision beyond what a float can represent.
        if value.is_finite() and value == value.to_integral_value():
            return int(value)
        return float(value)
    if isinstance(value, (set, frozenset)):
        # JSON has no set type; emit the members as an array.
        return list(value)
    raise TypeError(
        "Object of type %s is not JSON serializable" % type(value).__name__
    )


def send_sqs_msg(msg, queue_name, delay=0):
    """Send one message to an SQS queue.

    The message is serialized to JSON before sending. ``decimal.Decimal``
    values and sets (as returned by the boto3 DynamoDB resource) are
    converted to JSON numbers and arrays instead of raising ``TypeError``.

    Args:
        msg: The message payload, in a dictionary format.
        queue_name: Name of the SQS queue; its URL is looked up per call.
        delay: Value passed to SQS as ``DelaySeconds`` (default 0).

    Returns:
        The response dictionary returned by ``SQS.send_message``.

    Raises:
        TypeError: If ``msg`` contains a value that cannot be serialized.
    """
    queue_url = SQS.get_queue_url(QueueName=queue_name)["QueueUrl"]
    json_msg = json.dumps(msg, default=_json_default)
    response = SQS.send_message(
        QueueUrl=queue_url,
        MessageBody=json_msg,
        DelaySeconds=delay)
    return response
def send_emissions(table, queue_name):
    """Forward every scanned item of a table to an SQS queue.

    Scans ``table`` with ``scan_table`` and hands each returned item, one
    at a time, to ``send_sqs_msg`` for ``queue_name``. Returns None.
    """
    for record in scan_table(table=table):
        send_sqs_msg(record, queue_name=queue_name)
def lambda_handler(event, context):
    """Lambda entry point.

    Neither ``event`` nor ``context`` is read; every invocation sends the
    items of the table named by TABLE to the queue named by QUEUE.
    """
    send_emissions(TABLE, QUEUE)