Skip to content

Commit fb1cba0

Browse files
author
Greg Poirier
committed
Add SQSServiceSensor for non-polling SQS sensor
This adds a SQS Sensor with its own polling loop so that we can consume messages from one or more SQS queues as quickly as possible without relying on StackStorm to trigger a poll interval.
1 parent 777aa4a commit fb1cba0

File tree

2 files changed

+169
-0
lines changed

2 files changed

+169
-0
lines changed

sensors/sqs_service_sensor.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
"""
2+
This is generic SQS Sensor using boto3 api to fetch messages from sqs queue.
3+
After receiving a message it's content is passed as payload to a trigger 'aws.sqs_new_message'
4+
This sensor can be configured either by using config.yaml within a pack or by creating
5+
following values in datastore:
6+
- aws.input_queues (list queues as comma separated string: first_queue,second_queue)
7+
- aws.aws_access_key_id
8+
- aws.aws_secret_access_key
9+
- aws.region
10+
- aws.max_number_of_messages (must be between 1 - 10)
11+
For configuration in config.yaml with config like this
12+
setup:
13+
aws_access_key_id:
14+
aws_access_key_id:
15+
region:
16+
sqs_sensor:
17+
input_queues:
18+
- first_queue
19+
- second_queue
20+
sqs_other:
21+
max_number_of_messages: 1
22+
If any value exist in datastore it will be taken instead of any value in config.yaml
23+
"""
24+
25+
import six
26+
import json
27+
from boto3.session import Session
28+
from botocore.exceptions import ClientError
29+
from botocore.exceptions import NoRegionError
30+
from botocore.exceptions import NoCredentialsError
31+
from botocore.exceptions import EndpointConnectionError
32+
33+
from st2reactor.sensor.base import PollingSensor
34+
35+
36+
class AWSSQSServiceSensor(Sensor):
37+
def __init__(self, sensor_service, config=None):
38+
super(AWSSQSServiceSensor, self).__init__(sensor_service=sensor_service, config=config)
39+
40+
def setup(self):
41+
self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)
42+
43+
self.session = None
44+
self.sqs_res = None
45+
46+
def run(self):
47+
# setting SQS ServiceResource object from the parameter of datastore or configuration file
48+
self._may_setup_sqs()
49+
50+
while True:
51+
for queue in self.input_queues:
52+
msgs = self._receive_messages(queue=self._get_queue_by_name(queue),
53+
num_messages=self.max_number_of_messages)
54+
for msg in msgs:
55+
if msg:
56+
payload = {"queue": queue, "body": json.loads(msg.body)}
57+
self._sensor_service.dispatch(trigger="aws.sqs_new_message", payload=payload)
58+
msg.delete()
59+
60+
def cleanup(self):
61+
pass
62+
63+
def add_trigger(self, trigger):
64+
# This method is called when trigger is created
65+
pass
66+
67+
def update_trigger(self, trigger):
68+
# This method is called when trigger is updated
69+
pass
70+
71+
def remove_trigger(self, trigger):
72+
pass
73+
74+
def _get_config_entry(self, key, prefix=None):
75+
''' Get configuration values either from Datastore or config file. '''
76+
config = self.config
77+
if prefix:
78+
config = self._config.get(prefix, {})
79+
80+
value = self._sensor_service.get_value('aws.%s' % (key), local=False)
81+
if not value:
82+
value = config.get(key, None)
83+
84+
if not value and config.get('setup', None):
85+
value = config['setup'].get(key, None)
86+
87+
return value
88+
89+
def _may_setup_sqs(self):
90+
queues = self._get_config_entry(key='input_queues', prefix='sqs_sensor')
91+
92+
# XXX: This is a hack as from datastore we can only receive a string while
93+
# from config.yaml we can receive a list
94+
if isinstance(queues, six.string_types):
95+
self.input_queues = [x.strip() for x in queues.split(',')]
96+
elif isinstance(queues, list):
97+
self.input_queues = queues
98+
else:
99+
self.input_queues = []
100+
101+
self.aws_access_key = self._get_config_entry('aws_access_key_id')
102+
self.aws_secret_key = self._get_config_entry('aws_secret_access_key')
103+
self.aws_region = self._get_config_entry('region')
104+
105+
self.max_number_of_messages = self._get_config_entry('max_number_of_messages',
106+
prefix='sqs_other')
107+
108+
# checker configuration is update, or not
109+
def _is_same_credentials():
110+
c = self.session.get_credentials()
111+
return c is not None and \
112+
c.access_key == self.aws_access_key and \
113+
c.secret_key == self.aws_secret_key and \
114+
self.session.region_name == self.aws_region
115+
116+
if self.session is None or not _is_same_credentials():
117+
self._setup_sqs()
118+
119+
def _setup_sqs(self):
120+
''' Setup Boto3 structures '''
121+
self._logger.debug('Setting up SQS resources')
122+
self.session = Session(aws_access_key_id=self.aws_access_key,
123+
aws_secret_access_key=self.aws_secret_key,
124+
region_name=self.aws_region)
125+
126+
try:
127+
self.sqs_res = self.session.resource('sqs')
128+
except NoRegionError:
129+
self._logger.warning("The specified region '%s' is invalid", self.aws_region)
130+
131+
def _get_queue_by_name(self, queueName):
132+
''' Fetch QUEUE by it's name create new one if queue doesn't exist '''
133+
try:
134+
return self.sqs_res.get_queue_by_name(QueueName=queueName)
135+
except ClientError as e:
136+
if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
137+
self._logger.warning("SQS Queue: %s doesn't exist, creating it.", queueName)
138+
return self.sqs_res.create_queue(QueueName=queueName)
139+
elif e.response['Error']['Code'] == 'InvalidClientTokenId':
140+
self._logger.warning("Cloudn't operate sqs because of invalid credential config")
141+
else:
142+
raise
143+
except NoCredentialsError as e:
144+
self._logger.warning("Cloudn't operate sqs because of invalid credential config")
145+
except EndpointConnectionError as e:
146+
self._logger.warning(e)
147+
148+
def _receive_messages(self, queue, num_messages, wait_time=2):
149+
''' Receive a message from queue and return it. '''
150+
if queue:
151+
return queue.receive_messages(WaitTimeSeconds=wait_time,
152+
MaxNumberOfMessages=num_messages)
153+
else:
154+
return []

sensors/sqs_service_sensor.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
---
2+
class_name: "AWSSQSServiceSensor"
3+
entry_point: "sqs_service_sensor.py"
4+
description: "Service Sensor which monitors a SQS queue for new messages"
5+
trigger_types:
6+
-
7+
name: "sqs_new_message"
8+
description: "Trigger which indicates that a new message has arrived"
9+
payload_schema:
10+
type: "object"
11+
properties:
12+
queue:
13+
type: "string"
14+
body:
15+
type: "object"

0 commit comments

Comments
 (0)