Skip to content

Commit cbd383b

Browse files
committed
Send discovery message periodically
1 parent b88afe7 commit cbd383b

File tree

1 file changed

+21
-8
lines changed

1 file changed

+21
-8
lines changed

msgflo/msgflo.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,14 @@ def nack(self, msg):
6363
self._engine.nack_message(msg)
6464

6565

66+
DEFAULT_DISCOVERY_PERIOD=60
67+
6668
# Interface for engine/transport implementations
6769
class Engine(object):
68-
def __init__(self, broker):
70+
def __init__(self, broker, discovery_period=None):
6971
self.broker_url = broker
7072
self.broker_info = urlparse.urlparse(self.broker_url)
73+
self.discovery_period = discovery_period if discovery_period else DEFAULT_DISCOVERY_PERIOD
7174

7275
def done_callback(self, done_cb):
7376
self._done_cb = done_cb
@@ -91,8 +94,8 @@ def __init__(self, raw):
9194

9295
class AmqpEngine(Engine):
9396

94-
def __init__(self, broker):
95-
Engine.__init__(self, broker)
97+
def __init__(self, broker, discovery_period=None):
98+
Engine.__init__(self, broker, discovery_period=discovery_period)
9699

97100
# Connect to AMQP broker with default connection and authentication
98101
# FIXME: respect self.broker_url
@@ -108,14 +111,19 @@ def add_participant(self, participant):
108111
self.participant = participant
109112
self.participant._engine = self
110113

111-
self._send_discovery(self._channel, self.participant.definition)
112-
113114
# Create and configure message exchange and queue
114115
for p in self.participant.definition['inports']:
115116
self._setup_queue(self.participant, self._channel, 'in', p)
116117
for p in self.participant.definition['outports']:
117118
self._setup_queue(self.participant, self._channel, 'out', p)
118119

120+
def send_discovery():
121+
while self.participant:
122+
self._send_discovery(self._channel, self.participant.definition)
123+
delay = self.discovery_period/2.2
124+
gevent.sleep(delay) # yields
125+
gevent.Greenlet.spawn(send_discovery)
126+
119127
def run(self):
120128
# Start message pump
121129
self._message_pump_greenlet = gevent.spawn(self._message_pump_greenthread)
@@ -249,9 +257,7 @@ def _message_pump_greenthread(self):
249257

250258
def _on_connect(self, client, userdata, flags, rc):
251259
print("Connected with result code" + str(rc))
252-
self._send_discovery(self.participant.definition)
253-
254-
260+
255261
# Subscribe to queues for inports
256262
subscriptions = [] # ("topic", QoS)
257263
for port in self.participant.definition['inports']:
@@ -260,6 +266,13 @@ def _on_connect(self, client, userdata, flags, rc):
260266
subscriptions.append((topic, 0))
261267
self._client.subscribe(subscriptions)
262268

269+
def send_discovery():
270+
while self.participant:
271+
delay = self.discovery_period/2.2
272+
self._send_discovery(self.participant.definition)
273+
gevent.sleep(delay) # yields
274+
gevent.Greenlet.spawn(send_discovery)
275+
263276
def _on_subscribe(self, client, userdata, mid, granted_qos):
264277
logging.debug('subscribed %s' % str(mid))
265278

0 commit comments

Comments
 (0)