Skip to content

Commit 66fc770

Browse files
committed
Merge pull request #69 in CATS/pyclowder2 from feature/CATS-799-fire-extractors-registered-with-space to develop
* commit 'e7f8caf15e25901ccc0df28cb1e34852c9436141':
  - send heartbeat
  - monitor heartbeats
  - remove extractor_id
  - default rabbitmq_queue to be extractor_info.name
  - make sure there is a queue name
  - monitor now groups by extractor_info['name']/extractor_info['version']
  - only remove file in case of exception
  - update extractor message counts every 60 seconds
  - move logic from StandardError to Exception
  - python3 docker containers
2 parents b77a282 + e7f8caf commit 66fc770

File tree

11 files changed

+297
-37
lines changed

11 files changed

+297
-37
lines changed

contrib/monitor/Dockerfile

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
FROM python:3.5
2+
3+
ENV RABBITMQ_URI="amqp://guest:guest@rabbitmq/%2F"
4+
EXPOSE 9999
5+
6+
RUN pip install pika==0.11.2 requests==2.18.4
7+
8+
WORKDIR /src
9+
COPY monitor.py /src/
10+
11+
CMD python monitor.py

contrib/monitor/monitor.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
#!/usr/bin/env python
2+
3+
import datetime
4+
import http.server
5+
import json
6+
import logging
7+
import os
8+
import threading
9+
import time
10+
import urllib.parse
11+
12+
import pika
13+
import requests
14+
15+
rabbitmq_uri = os.getenv('RABBITMQ_URI', 'amqp://guest:guest@localhost/%2F')
16+
rabbitmq_mgmt_port = os.getenv('RABBITMQ_MGMT_PORT', '15672')
17+
rabbitmq_mgmt_url = ''
18+
19+
extractors = {}
20+
21+
update_frequency = 60
22+
23+
hostName = ""
24+
hostPort = 9999
25+
26+
27+
# ----------------------------------------------------------------------
28+
# WEB SERVER
29+
# ----------------------------------------------------------------------
30+
class MyServer(http.server.BaseHTTPRequestHandler):
31+
def do_GET(self):
32+
self.send_response(200)
33+
self.send_header('Content-type', 'application/json')
34+
self.end_headers()
35+
self.wfile.write(bytes(json.dumps(extractors), 'utf-8'))
36+
37+
38+
def http_server():
39+
server = http.server.HTTPServer((hostName, hostPort), MyServer)
40+
try:
41+
server.serve_forever()
42+
finally:
43+
server.server_close()
44+
45+
46+
# ----------------------------------------------------------------------
47+
# MESSAGES IN QUEUES
48+
# ----------------------------------------------------------------------
49+
def get_mgmt_queue_messages(queue):
50+
global rabbitmq_username, rabbitmq_password
51+
try:
52+
response = requests.get(rabbitmq_mgmt_url + queue, auth=(rabbitmq_username, rabbitmq_password), timeout=5)
53+
response.raise_for_status()
54+
return response.json()['messages']
55+
except:
56+
logging.exception("Error getting list of messages in %s" % queue)
57+
return 0
58+
59+
60+
def update_counts():
61+
global extractors, update_frequency
62+
63+
while True:
64+
for versions in extractors.values():
65+
for extractor in versions.values():
66+
# use management api to get counts
67+
old_waiting = get_mgmt_queue_messages(extractor['queue'])
68+
new_waiting = get_mgmt_queue_messages('extractors.' + extractor['queue'])
69+
errors = get_mgmt_queue_messages('error.' + extractor['queue'])
70+
71+
extractor['messages'] = {
72+
'queues': {
73+
'total': old_waiting + new_waiting,
74+
'direct': new_waiting,
75+
'topic': old_waiting
76+
},
77+
'error': errors
78+
}
79+
80+
time.sleep(update_frequency)
81+
82+
83+
# ----------------------------------------------------------------------
84+
# EXTRACTOR HEARTBEATS
85+
# ----------------------------------------------------------------------
86+
def callback(ch, method, properties, body):
87+
global extractors
88+
89+
data = json.loads(body.decode('utf-8'))
90+
data['updated'] = datetime.datetime.now().isoformat()
91+
if 'id' not in data and 'extractor_info' not in data and 'queue' not in data:
92+
logging.error("missing fields in json : %r " % body)
93+
return
94+
95+
extractor_info = data['extractor_info']
96+
97+
if extractor_info['name'] not in extractors:
98+
extractors[extractor_info['name']] = {}
99+
100+
if extractor_info['version'] not in extractors[extractor_info['name']]:
101+
extractors[extractor_info['name']][extractor_info['version']] = {
102+
'extractor_info': extractor_info,
103+
'queue': data['queue'],
104+
'extractors': {}
105+
}
106+
extractor = extractors[extractor_info['name']][extractor_info['version']]
107+
108+
extractor['extractors'][data['id']] = {
109+
'last_seen': datetime.datetime.now().isoformat(),
110+
}
111+
112+
if extractor['queue'] != data['queue']:
113+
logging.error("mismatched queue names %s != %s." % (data['queue'], extractor['queue']))
114+
extractor['queue'] = data['queue']
115+
116+
117+
def extractors_monitor():
118+
global rabbitmq_mgmt_url, rabbitmq_mgmt_port, rabbitmq_username, rabbitmq_password
119+
120+
params = pika.URLParameters(rabbitmq_uri)
121+
connection = pika.BlockingConnection(params)
122+
123+
# create management url
124+
rabbitmq_url = ''
125+
if rabbitmq_mgmt_port != '':
126+
if params.ssl:
127+
rabbitmq_mgmt_url = 'https://'
128+
else:
129+
rabbitmq_mgmt_url = 'http://'
130+
rabbitmq_mgmt_url = "%s%s:%s/api/queues/%s/" % (rabbitmq_mgmt_url, params.host, rabbitmq_mgmt_port,
131+
urllib.parse.quote_plus(params.virtual_host))
132+
rabbitmq_username = params.credentials.username
133+
rabbitmq_password = params.credentials.password
134+
135+
# connect to channel
136+
channel = connection.channel()
137+
138+
# create extractors exchange for fanout
139+
channel.exchange_declare(exchange='extractors', exchange_type='fanout', durable=True)
140+
141+
# create anonymous queue
142+
result = channel.queue_declare(exclusive=True)
143+
channel.queue_bind(exchange='extractors', queue=result.method.queue)
144+
145+
# listen for messages
146+
channel.basic_consume(callback, queue=result.method.queue, no_ack=True)
147+
148+
channel.start_consuming()
149+
150+
151+
# ----------------------------------------------------------------------
152+
# MAIN
153+
# ----------------------------------------------------------------------
154+
if __name__ == "__main__":
155+
logging.basicConfig(format='%(asctime)-15s [%(threadName)-15s] %(levelname)-7s :'
156+
' %(name)s - %(message)s',
157+
level=logging.INFO)
158+
logging.getLogger('requests.packages.urllib3.connectionpool').setLevel(logging.WARN)
159+
160+
thread = threading.Thread(target=http_server)
161+
thread.setDaemon(True)
162+
thread.start()
163+
164+
thread = threading.Thread(target=update_counts)
165+
thread.setDaemon(True)
166+
thread.start()
167+
168+
extractors_monitor()

docker.sh

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,16 @@ ${DEBUG} docker build --tag clowder/extractors-binary-preview:onbuild sample-ext
1313
${DEBUG} docker build --tag clowder/extractors-simple-extractor:onbuild sample-extractors/simple-extractor
1414
${DEBUG} docker build --tag clowder/extractors-simple-r-extractor:onbuild sample-extractors/simple-r-extractor
1515

16+
# build docker container based on python 3
17+
${DEBUG} docker build --build-arg PYTHON_VERSION=3 --tag clowder/pyclowder-python3:latest .
18+
${DEBUG} docker build --build-arg PYTHON_VERSION=3 --tag clowder/pyclowder-python3:onbuild --file Dockerfile.onbuild .
19+
${DEBUG} docker build --build-arg PYTHON_VERSION=3 --tag clowder/extractors-simple-extractor-python3:onbuild sample-extractors/simple-extractor
20+
${DEBUG} docker build --build-arg PYTHON_VERSION=3 --tag clowder/extractors-simple-r-extractor-python3:onbuild sample-extractors/simple-r-extractor
21+
1622
# build sample extractors
1723
${DEBUG} docker build --tag clowder/extractors-wordcount:latest sample-extractors/wordcount
1824
${DEBUG} docker build --tag clowder/extractors-wordcount-simple-extractor:latest sample-extractors/wordcount-simple-extractor
1925
${DEBUG} docker build --tag clowder/extractors-wordcount-simple-r-extractor:latest sample-extractors/wordcount-simple-r-extractor
26+
27+
# build contrib
28+
${DEBUG} docker build --tag clowder/extractors-monitor:latest contrib/monitor

pyclowder/collections.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def create_empty(connector, host, key, collectionname, description, parentid=Non
2727
logger = logging.getLogger(__name__)
2828

2929
if parentid:
30-
if (spaceid):
30+
if spaceid:
3131
url = '%sapi/collections/newCollectionWithParent?key=%s' % (host, key)
3232
result = requests.post(url, headers={"Content-Type": "application/json"},
3333
data=json.dumps({"name": collectionname, "description": description,
@@ -40,7 +40,7 @@ def create_empty(connector, host, key, collectionname, description, parentid=Non
4040
"parentId": [parentid]}),
4141
verify=connector.ssl_verify if connector else True)
4242
else:
43-
if (spaceid):
43+
if spaceid:
4444
url = '%sapi/collections?key=%s' % (host, key)
4545
result = requests.post(url, headers={"Content-Type": "application/json"},
4646
data=json.dumps({"name": collectionname, "description": description,

0 commit comments

Comments
 (0)