Skip to content

Commit 1f8b25b

Browse files
authored
Merge branch 'develop' into clean_extractors_tmpfiles
2 parents 26a4879 + e945148 commit 1f8b25b

File tree

5 files changed

+290
-1
lines changed

5 files changed

+290
-1
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
77
## Unreleased
88

99
### Added
10-
- script to clean extractors' tmp files.
10+
- Script to clean extractors' tmp files.
11+
- Script for RabbitMQ error queue cleanup.
1112

1213
## 1.10.1 - 2020-07-16
1314

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
FROM python:3.5

# EXAMPLE USAGE:
# docker run --rm --net=host \
#     -e EXTRACTOR_QUEUE=ncsa.wordcount \
#     -e CLOWDER_HOST=http://host.docker.internal:9000 -e CLOWDER_KEY=r1ek3rs \
#     -e RABBITMQ_URI="amqp://guest:guest@localhost:5672/%2f" \
#     clowder/rmq-error-shovel

# environment variables (defaults match check_rabbitmq.py)
ENV EXTRACTOR_QUEUE="ncsa.image.preview" \
    CLOWDER_HOST="" \
    CLOWDER_KEY="" \
    RABBITMQ_URI="amqp://guest:guest@rabbitmq/%2F" \
    RABBITMQ_MGMT_PORT=15672 \
    RABBITMQ_MGMT_PATH="/" \
    RABBITMQ_MGMT_URL=""

WORKDIR /src

# Install dependencies first so source edits don't bust the pip cache layer.
COPY requirements.txt /src/
RUN pip3 install -r /src/requirements.txt

COPY . /src/

CMD python check_rabbitmq.py

scripts/rmq-error-shovel/README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Error queue cleanup script
2+
=============================
3+
4+
Script to remove files from an error queue if a message refers to a file that doesn't exist anymore. Otherwise moves the
5+
message to the appropriate queue for reprocessing.
6+
7+
This should be executed when needed and not on a timer in case there are other types of error that would result in an
8+
endless loop of messages going from execution queue to error queue and back to execution queue.
9+
10+
To build the container run `docker build -t clowder/rmq-error-shovel .`
11+
12+
To run the container on a specific queue, for example `ncsa.image.preview`, run this command (use your API key and
13+
change other parameters as needed):
14+
15+
```
16+
docker run -t --rm --net=host -e EXTRACTOR_QUEUE=ncsa.image.preview -e CLOWDER_HOST=http://host.docker.internal:9000
17+
-e CLOWDER_KEY=your_api_key -e RABBITMQ_URI="amqp://guest:guest@localhost:5672/%2f"
18+
clowder/rmq-error-shovel
19+
```
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
#!/usr/bin/env python3
2+
3+
""" RabbitMQ Error Shovel Service
4+
5+
Given a particular RabbitMQ instance and extractor name, this will check the error queue of that extractor for messages.
6+
- If the dataset or file in the message exists, send the message back to the main queue.
7+
- If it does not exist, generate a log entry and delete the message from the error queue permanently.
8+
9+
If the CLOWDER_HOST and CLOWDER_KEY environment variables are set, they will be used to check the existence of
10+
files and datasets instead of the host and key found in the RabbitMQ message.
11+
"""
12+
import os
13+
import logging
14+
import json
15+
import threading
16+
import requests
17+
import pika
18+
19+
20+
# Simplified version of pyClowder's RabbitMQConnector class.
class RabbitMQMonitor():
    """Shovel messages out of an extractor's RabbitMQ error queue.

    For each message on ``error.<extractor>``: if the file or dataset it
    refers to still exists in Clowder, the message is re-published to the
    main extractor queue for reprocessing; otherwise it is logged and the
    message is acked away permanently.
    """

    def __init__(self, rabbitmq_uri, extractor_name, clowder_host, clowder_key):
        self.connection = None
        self.channel = None

        self.rabbitmq_uri = rabbitmq_uri
        self.queue_basic = extractor_name
        self.queue_error = "error." + extractor_name
        # Optional overrides for the host/key embedded in each message
        # (populated from CLOWDER_HOST / CLOWDER_KEY; '' means "use message").
        self.clowder_host = clowder_host
        self.clowder_key = clowder_key

        self.worker = None            # background thread evaluating one message
        self.finished = False         # set once the current message was handled
        self.lock = threading.Lock()  # guards self.messages and self.finished
        self.messages = []            # verdict dicts queued by the worker

    def connect(self):
        """Open a blocking connection and channel to RabbitMQ."""
        parameters = pika.URLParameters(self.rabbitmq_uri)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

    def listen(self):
        """Consume the error queue until interrupted, then close cleanly.

        Runs pika's I/O loop in 1-second slices so that verdicts produced by
        the worker thread can be acted on from the consumer thread (pika
        channels are not thread-safe).
        """
        if not self.channel:
            self.connect()

        self.listener = self.channel.basic_consume(queue=self.queue_error,
                                                   on_message_callback=self.on_message,
                                                   auto_ack=False)

        logging.getLogger(__name__).info("Starting to listen for error messages on %s" % self.queue_error)
        try:
            # pylint: disable=protected-access
            while self.channel and self.channel.is_open and self.channel._consumer_infos:
                self.channel.connection.process_data_events(time_limit=1)  # 1 second
                if self.worker:
                    self.process_messages(self.channel)
                    if self.is_finished():
                        self.worker = None
        except (SystemExit, KeyboardInterrupt, GeneratorExit):
            # Deliberate shutdown paths must propagate.
            raise
        except Exception:  # pylint: disable=broad-except
            logging.getLogger(__name__).exception("Error while consuming error messages.")
        finally:
            logging.getLogger(__name__).info("Stopped listening for error messages.")
            if self.channel and self.channel.is_open:
                try:
                    self.channel.close()
                except Exception:
                    logging.getLogger(__name__).exception("Error while closing channel.")
            self.channel = None
            if self.connection and self.connection.is_open:
                try:
                    self.connection.close()
                except Exception:
                    logging.getLogger(__name__).exception("Error while closing connection.")
            self.connection = None

    @staticmethod
    def _decode_body(body, codecs=None):
        """Decode a raw message body, trying utf8 then iso-8859-1 by default.

        Raises ValueError if none of the codecs can decode the bytes.
        """
        if not codecs:
            codecs = ['utf8', 'iso-8859-1']
        # see https://stackoverflow.com/a/15918519
        for i in codecs:
            try:
                return body.decode(i)
            except UnicodeDecodeError:
                pass
        raise ValueError("Cannot decode body")

    def on_message(self, channel, method, header, body):
        """When the message is received this will check the message target.

        Any message will only be acked if the message is processed,
        or there is an exception (except for SystemExit and SystemError exceptions).
        """
        try:
            json_body = json.loads(self._decode_body(body))
            if 'routing_key' not in json_body and method.routing_key:
                json_body['routing_key'] = method.routing_key
            # Smuggle the pika delivery info along so process_messages() can
            # ack/republish on the consumer thread later.
            json_body["header"] = header
            json_body["method"] = method

            self.worker = threading.Thread(target=self.evaluate_message, args=(json_body,))
            self.worker.start()

        except ValueError:
            # something went wrong, move message to error queue and give up on this message immediately
            logging.getLogger(__name__).exception("Error processing message.")
            # TODO: What to do here?
            properties = pika.BasicProperties(delivery_mode=2, reply_to=header.reply_to)
            # BUG FIX: the original referenced self.extractor_name, which is
            # never set in __init__; self.queue_error is that same
            # "error.<extractor>" queue name.
            channel.basic_publish(exchange='',
                                  routing_key=self.queue_error,
                                  properties=properties,
                                  body=body)
            channel.basic_ack(method.delivery_tag)

    def process_messages(self, channel):
        """Drain queued verdicts, acking/republishing on the consumer channel.

        'missing'  -> resource is gone; ack the message away permanently.
        'resubmit' -> resource exists; publish body back to the main queue
                      with retry_count reset, then ack the error copy.
        """
        while True:
            # Check-and-pop under the lock so the worker thread's appends
            # cannot race with us.
            with self.lock:
                if not self.messages:
                    return
                msg = self.messages.pop(0)
            method = msg["body"]["method"]
            header = msg["body"]["header"]
            del msg["body"]["method"]
            del msg["body"]["header"]

            if msg["type"] == 'missing':
                logging.getLogger(__name__).info("%s [%s] removed." % (msg["resource_type"], msg["resource_id"]))
                channel.basic_ack(method.delivery_tag)
                with self.lock:
                    self.finished = True

            elif msg["type"] == 'resubmit':
                properties = pika.BasicProperties(delivery_mode=2, reply_to=header.reply_to)
                # Reset retry count to 0
                # TODO: If resource exists but retry count > some N, should we stop bouncing it back to main queue?
                msg["body"]["retry_count"] = 0
                logging.getLogger(__name__).info("%s [%s] resubmitted." % (msg["resource_type"], msg["resource_id"]))
                channel.basic_publish(exchange='',
                                      routing_key=self.queue_basic,
                                      properties=properties,
                                      body=json.dumps(msg["body"]))
                channel.basic_ack(method.delivery_tag)
                with self.lock:
                    self.finished = True

            else:
                logging.getLogger(__name__).error("Received unknown message type [%s]." % msg["type"])

    def evaluate_message(self, body):
        """Worker entry point: decide the fate of one error-queue message.

        Appends a 'missing' or 'resubmit' verdict to self.messages for
        process_messages() to act on. Messages without a host are ignored.
        """
        host = body.get('host', '')
        if host == '':
            return
        elif not host.endswith('/'):
            host += '/'
        key = body.get("secretKey", '')

        fileid = body.get('id', '')
        datasetid = body.get('datasetId', '')

        # determine resource type; defaults to file
        resource_type = "file"
        # Use .get: routing_key may be absent if pika supplied none either.
        message_type = body.get('routing_key', '')
        if message_type.find(".dataset.") > -1:
            resource_type = "dataset"
        elif message_type.find(".file.") > -1:
            resource_type = "file"
        elif message_type.find("metadata.added") > -1:
            resource_type = "metadata"
        elif message_type == "extractors." + self.queue_basic:
            # This was a manually submitted extraction
            if datasetid == fileid:
                resource_type = "dataset"
            else:
                resource_type = "file"

        if resource_type == "dataset":
            resource_id = datasetid
        else:
            resource_id = fileid

        r = self.check_existence(host, key, resource_type, resource_id)
        resubmit = False

        if r.status_code == 200:
            # The erroneous resource exists, so resubmit to main queue
            logging.getLogger(__name__).error("%s [%s]: Resubmitting." % (resource_type, resource_id))
            resubmit = True
        elif r.status_code == 401:
            # Unauthorized to view resource, but it exists so resubmit (extractor might use other creds)
            logging.getLogger(__name__).error("%s [%s]: Credentials not authorized. Resubmitting." % (resource_type, resource_id))
            resubmit = True
        else:
            logging.getLogger(__name__).error("%s [%s]: %s. Removing." % (resource_type, resource_id, r))
            with self.lock:
                self.messages.append({
                    "type": "missing",
                    "resource_type": resource_type,
                    "resource_id": resource_id,
                    "body": body
                })

        if resubmit:
            with self.lock:
                self.messages.append({
                    "type": "resubmit",
                    "resource_type": resource_type,
                    "resource_id": resource_id,
                    "body": body
                })

    # Return response of request for the resource from Clowder
    def check_existence(self, host, key, resource_type, resource_id):
        """GET the resource's metadata from Clowder and return the response.

        The CLOWDER_HOST / CLOWDER_KEY overrides stored on self take
        precedence over the host and key found in the message.
        """
        # Perform replacements from environment variables if needed
        host_url = self.clowder_host if self.clowder_host != '' else host
        if not host_url.endswith('/'):
            host_url += '/'
        secret_key = self.clowder_key if self.clowder_key != '' else key
        # TODO: Is there a better exists URL to use?
        clowder_url = "%sapi/%ss/%s/metadata?key=%s" % (host_url, resource_type, resource_id, secret_key)
        r = requests.get(clowder_url)
        return r

    def is_finished(self):
        """True once the worker has exited and every verdict was handled."""
        with self.lock:
            # BUG FIX: Thread.isAlive() was removed in Python 3.9;
            # is_alive() is the long-standing portable spelling.
            return bool(self.worker and not self.worker.is_alive()
                        and self.finished and len(self.messages) == 0)
228+
229+
230+
if __name__ == "__main__":
    # Configure root logging and quiet the chatty urllib3 connection pool.
    logging.basicConfig(format='%(asctime)-15s [%(threadName)-15s] %(levelname)-7s :'
                               ' %(name)s - %(message)s',
                        level=logging.INFO)
    logging.getLogger('requests.packages.urllib3.connectionpool').setLevel(logging.WARN)

    # All runtime configuration comes from the environment, with the same
    # defaults the Docker image ships.
    uri = os.getenv('RABBITMQ_URI', 'amqp://guest:guest@localhost:5672/%2f')
    queue = os.getenv('EXTRACTOR_QUEUE', 'ncsa.image.preview')
    host = os.getenv('CLOWDER_HOST', '')
    key = os.getenv('CLOWDER_KEY', '')

    RabbitMQMonitor(uri, queue, host, key).listen()
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pika==1.0.0
2+
requests==2.21.0

0 commit comments

Comments
 (0)