-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathclient.py
More file actions
154 lines (123 loc) · 4.98 KB
/
client.py
File metadata and controls
154 lines (123 loc) · 4.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# -*- coding: utf-8 -*-
import uuid
import zmq
from zmq.utils.jsonapi import jsonmod as json
from zmq.eventloop.zmqstream import ZMQStream
from circus.exc import CallError
from circus.py3compat import string_types, b
from circus.util import get_connection
from circus.client import CircusClient, make_message
from collections import defaultdict
from datetime import timedelta
from tornado import gen
class AsynchronousCircusClient(CircusClient):
    """Asynchronous circus client designed to work with a tornado IOLoop.

    Every command is sent over its own short-lived DEALER socket. When a
    callback is supplied the reply is delivered through the IOLoop;
    otherwise the call waits for the reply and returns it directly.
    """

    # Prefix of endpoints whose host part can replace a wildcard bind address.
    _TCP_PREFIX = 'tcp://'
    # Wildcard address advertised when binding on every interface,
    # i.e. tcp://0.0.0.0:5557
    _ANY_ADDR = '0.0.0.0'

    def __init__(self, loop, endpoint, context=None, timeout=5.0,
                 ssh_server=None, ssh_keyfile=None):
        """Store the connection settings; no socket is opened here.

        :param loop: tornado IOLoop used for asynchronous calls.
        :param endpoint: ZMQ endpoint of the circus controller.
        :param context: optional ZMQ context; defaults to the shared
            ``zmq.Context.instance()``.
        :param timeout: reply timeout, in seconds.
        :param ssh_server: forwarded to ``get_connection``.
        :param ssh_keyfile: forwarded to ``get_connection``.
        """
        # NOTE(review): CircusClient.__init__ is not called here --
        # presumably on purpose since sockets are created per call; confirm.
        self.context = context or zmq.Context.instance()
        self.ssh_server = ssh_server
        self.ssh_keyfile = ssh_keyfile
        self._timeout = timeout          # seconds (IOLoop timeouts)
        self.timeout = timeout * 1000    # milliseconds (ZMQ polling)
        self.loop = loop
        self.endpoint = endpoint

        # Infos
        self.stats_endpoint = None
        self.pubsub_endpoint = None
        self.check_delay = None
        self.connected = False
        self.watchers = []
        self.plugins = []
        self.stats = defaultdict(list)
        self.dstats = []
        self.sockets = None
        self.use_sockets = False
        self.embed_httpd = False

        # Connection counter
        self.count = 0

    def send_message(self, command, callback=None, **props):
        """Build a message for ``command`` with ``props`` and send it.

        Returns whatever :meth:`call` returns for the given ``callback``.
        """
        return self.call(make_message(command, **props), callback)

    def call(self, cmd, callback):
        """Send ``cmd`` to the endpoint over a fresh DEALER socket.

        :param cmd: a string, or an object that is serialized to JSON.
        :param callback: if truthy, called from the IOLoop with the decoded
            reply and ``call`` returns ``None``; if falsy, ``call`` waits
            for the reply and returns it decoded.
        :raises CallError: if ``cmd`` cannot be serialized, if sending
            fails, or if no reply arrives in time when no callback is used.

        With a callback, a timeout raises ``CallError`` from inside the
        IOLoop timeout handler: it is not propagated to the caller of
        ``call`` and ``callback`` is never invoked.
        """
        if not isinstance(cmd, string_types):
            try:
                cmd = json.dumps(cmd)
            except (TypeError, ValueError) as e:
                # json raises TypeError for objects it cannot serialize
                raise CallError(str(e))

        socket = self.context.socket(zmq.DEALER)
        try:
            socket.setsockopt(zmq.IDENTITY, b(uuid.uuid4().hex))
            socket.setsockopt(zmq.LINGER, 0)
            get_connection(socket, self.endpoint, self.ssh_server,
                           self.ssh_keyfile)
        except Exception:
            # Do not leak the socket when the connection cannot be set up.
            socket.close()
            raise

        if callback:
            return self._call_async(socket, cmd, callback)
        return self._call_sync(socket, cmd)

    def _call_async(self, socket, cmd, callback):
        """Send ``cmd`` and hand the decoded reply to ``callback`` later."""
        stream = ZMQStream(socket, self.loop)

        def release():
            stream.stop_on_recv()
            stream.close()

        def timeout_callback():
            release()
            raise CallError('Call timeout for cmd', cmd)

        # Honour the timeout given to the constructor instead of a
        # hard-coded value.
        timeout = self.loop.add_timeout(timedelta(seconds=self._timeout),
                                        timeout_callback)

        def recv_callback(msg):
            self.loop.remove_timeout(timeout)
            release()
            callback(json.loads(msg[0]))

        stream.on_recv(recv_callback)

        try:
            socket.send(b(cmd))
        except zmq.ZMQError as e:
            # Nothing was sent: drop the pending timeout and the stream so
            # they do not fire or linger after the error is reported.
            self.loop.remove_timeout(timeout)
            release()
            raise CallError(str(e))

    def _call_sync(self, socket, cmd):
        """Send ``cmd``, wait up to ``self.timeout`` ms, return the reply."""
        try:
            socket.send(b(cmd))
            # Bound the wait so an unreachable endpoint cannot block forever.
            if not socket.poll(int(self.timeout)):
                raise CallError('Timed out.')
            raw = socket.recv()
        except zmq.ZMQError as e:
            raise CallError(str(e))
        finally:
            # The socket is single-use; always release it.
            socket.close()
        return json.loads(raw)

    def _resolve_endpoint(self, endpoint):
        """Replace a wildcard bind address in ``endpoint`` with our host.

        Only applies when ``self.endpoint`` is a ``tcp://`` endpoint;
        otherwise ``endpoint`` is returned unchanged.
        """
        prefix = self._TCP_PREFIX
        if not self.endpoint.startswith(prefix):
            return endpoint
        # Slice the prefix off: str.lstrip() strips a *set of characters*
        # and would also eat leading 't', 'c' or 'p' of a host name.
        host = self.endpoint[len(prefix):].split(':')[0]
        return endpoint.replace(self._ANY_ADDR, host)

    @gen.coroutine
    def update_watchers(self):
        """Calls circus and initialize the list of watchers.

        Also refreshes the plugin list, the ``use_sockets`` and
        ``embed_httpd`` flags, ``check_delay`` and the stats / pubsub
        endpoints from the global options.

        If circus is not connected raises an error (``CallError``), after
        setting ``self.connected`` to ``False``.
        """
        self.watchers = []
        self.plugins = []

        # trying to list the watchers
        try:
            self.connected = True
            reply = yield gen.Task(self.send_message, 'list')

            for watcher in reply['watchers']:
                if watcher in ('circusd-stats', 'circushttpd'):
                    # Built-in watchers are not listed; only remember
                    # whether the embedded httpd is running.
                    if watcher == 'circushttpd':
                        self.embed_httpd = True
                    continue

                options = yield gen.Task(self.send_message, 'options',
                                         name=watcher)
                options = options['options']
                self.watchers.append((watcher, options))

                if watcher.startswith('plugin:'):
                    self.plugins.append(watcher)

                if not self.use_sockets and options.get('use_sockets',
                                                        False):
                    self.use_sockets = True

            self.watchers.sort()

            global_options = yield gen.Task(self.get_global_options)
            self.check_delay = global_options['check_delay']

            # Stats endpoints
            self.stats_endpoint = self._resolve_endpoint(
                global_options['stats_endpoint'])

            # Pub Sub endpoints
            self.pubsub_endpoint = self._resolve_endpoint(
                global_options['pubsub_endpoint'])
        except CallError:
            self.connected = False
            raise

    @gen.coroutine
    def get_global_options(self):
        """Fetch the ``options`` entry of the 'globaloptions' reply."""
        res = yield gen.Task(self.send_message, 'globaloptions')
        raise gen.Return(res['options'])