Skip to content

Commit ae5387c

Browse files
committed
add initial version of script (non working)
1 parent c7d83b1 commit ae5387c

File tree

3 files changed

+182
-0
lines changed

3 files changed

+182
-0
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
FROM python:3.5
2+
3+
# docker run --rm --net=host -e EXTRACTOR_QUEUE=ncsa.wordcount -e RABBITMQ_URI="amqp://max:[email protected]:5672/%2f" maxzilla2/rmq-error-shovel
4+
5+
# environemnt variables
6+
ENV EXTRACTOR_QUEUE="ncsa.image.preview" \
7+
RABBITMQ_URI="amqp://guest:guest@rabbitmq/%2F" \
8+
RABBITMQ_MGMT_PORT=15672 \
9+
RABBITMQ_MGMT_PATH="/" \
10+
RABBITMQ_MGMT_URL=""
11+
12+
WORKDIR /src
13+
14+
COPY requirements.txt /src/
15+
RUN pip3 install -r /src/requirements.txt
16+
17+
COPY . /src/
18+
19+
CMD python check_rabbitmq.py
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
#!/usr/bin/env python3
2+
3+
""" RabbitMQ Error Shovel Service
4+
5+
Given a particular RabbitMQ instance and extractor name, this will check the error queue of that extractor for messages.
6+
- If the dataset or file in the message exists, send the message back to the main queue.
7+
- If it does not exist, generate a log entry and delete the message from the error queue permanently.
8+
"""
9+
import os
10+
import logging
11+
import json
12+
import threading
13+
import pika
14+
15+
16+
# Simplified version of pyClowder's RabbitMQConnector class.
17+
class RabbitMQMonitor():
18+
def __init__(self, rabbitmq_uri, extractor_name):
19+
self.connection = None
20+
self.channel = None
21+
self.rabbitmq_uri = rabbitmq_uri
22+
self.queue_basic = extractor_name
23+
self.queue_error = "error."+extractor_name
24+
25+
self.worker = None
26+
self.finished = False
27+
self.lock = threading.Lock()
28+
self.messages = []
29+
30+
def connect(self):
31+
parameters = pika.URLParameters(self.rabbitmq_uri)
32+
self.connection = pika.BlockingConnection(parameters)
33+
self.channel = self.connection.channel()
34+
35+
def listen(self):
36+
if not self.channel:
37+
self.connect()
38+
39+
self.listener = self.channel.basic_consume(queue=self.queue_error,
40+
on_message_callback=self.on_message,
41+
auto_ack=False)
42+
43+
logging.getLogger(__name__).info("Starting to listen for error messages.")
44+
try:
45+
# pylint: disable=protected-access
46+
while self.channel and self.channel.is_open and self.channel._consumer_infos:
47+
self.channel.connection.process_data_events(time_limit=1) # 1 second
48+
if self.worker:
49+
self.process_messages(self.channel)
50+
if self.is_finished():
51+
self.worker = None
52+
except SystemExit:
53+
raise
54+
except KeyboardInterrupt:
55+
raise
56+
except GeneratorExit:
57+
raise
58+
except Exception: # pylint: disable=broad-except
59+
logging.getLogger(__name__).exception("Error while consuming error messages.")
60+
finally:
61+
logging.getLogger(__name__).info("Stopped listening for error messages.")
62+
if self.channel and self.channel.is_open:
63+
try:
64+
self.channel.close()
65+
except Exception:
66+
logging.getLogger(__name__).exception("Error while closing channel.")
67+
self.channel = None
68+
if self.connection and self.connection.is_open:
69+
try:
70+
self.connection.close()
71+
except Exception:
72+
logging.getLogger(__name__).exception("Error while closing connection.")
73+
self.connection = None
74+
75+
@staticmethod
76+
def _decode_body(body, codecs=None):
77+
if not codecs:
78+
codecs = ['utf8', 'iso-8859-1']
79+
# see https://stackoverflow.com/a/15918519
80+
for i in codecs:
81+
try:
82+
return body.decode(i)
83+
except UnicodeDecodeError:
84+
pass
85+
raise ValueError("Cannot decode body")
86+
87+
def on_message(self, channel, method, header, body):
88+
"""When the message is received this will check the message target.
89+
Any message will only be acked if the message is processed,
90+
or there is an exception (except for SystemExit and SystemError exceptions).
91+
"""
92+
93+
try:
94+
json_body = json.loads(self._decode_body(body))
95+
if 'routing_key' not in json_body and method.routing_key:
96+
json_body['routing_key'] = method.routing_key
97+
98+
self.worker = threading.Thread(target=self.evaluate_message, args=(json_body,))
99+
self.worker.start()
100+
101+
except ValueError:
102+
# something went wrong, move message to error queue and give up on this message immediately
103+
logging.getLogger(__name__).exception("Error processing message.")
104+
# TODO: What to do here?
105+
properties = pika.BasicProperties(delivery_mode=2, reply_to=header.reply_to)
106+
channel.basic_publish(exchange='',
107+
routing_key='error.' + self.extractor_name,
108+
properties=properties,
109+
body=body)
110+
channel.basic_ack(method.delivery_tag)
111+
112+
def process_messages(self, channel):
113+
while self.messages:
114+
with self.lock:
115+
msg = self.messages.pop(0)
116+
117+
if msg["type"] == 'missing':
118+
jbody = json.loads(self.body)
119+
logging.error("%s %s no longer exists." % (jbody["resource_type"], jbody["resource_id"]))
120+
channel.basic_ack(self.method.delivery_tag)
121+
with self.lock:
122+
self.finished = True
123+
124+
elif msg["type"] == 'resubmit':
125+
properties = pika.BasicProperties(delivery_mode=2, reply_to=self.header.reply_to)
126+
# Reset retry count to 0
127+
# TODO: If resource exists but retry count > some N, should we stop bouncing it back to main queue?
128+
jbody = json.loads(self.body)
129+
jbody["retry_count"] = 0
130+
logging.error("%s %s goes back to the main queue, precious!" % (jbody["resource_type"], jbody["resource_id"]))
131+
channel.basic_publish(exchange='',
132+
routing_key=self.queue_basic,
133+
properties=properties,
134+
body=json.dumps(jbody))
135+
channel.basic_ack(self.method.delivery_tag)
136+
with self.lock:
137+
self.finished = True
138+
139+
else:
140+
logging.getLogger(__name__).error("Received unknown message type [%s]." % msg["type"])
141+
142+
def evaluate_message(self, jbody):
143+
# TODO: If dataset or file, check existence as necessary.
144+
self.messages.append({
145+
"type": "resubmit",
146+
"resource_type": jbody["type"],
147+
"resource_id": jbody["id"]
148+
})
149+
150+
def is_finished(self):
151+
with self.lock:
152+
return self.worker and not self.worker.isAlive() and self.finished and len(self.messages) == 0
153+
154+
155+
logging.getLogger(__name__).info("Starting to listen")
156+
print("Starting to listen...")
157+
rabbitmq_uri = os.getenv('RABBITMQ_URI', 'amqp://guest:guest@localhost:5672/%2f')
158+
extractor_name = os.getenv('EXTRACTOR_QUEUE', 'ncsa.image.preview')
159+
monitor = RabbitMQMonitor(rabbitmq_uri, extractor_name)
160+
logging.getLogger(__name__).info("Starting to listen to "+extractor_name)
161+
monitor.listen()
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pika==1.0.0
2+
requests==2.21.0

0 commit comments

Comments
 (0)