Skip to content

Commit f8a0871

Browse files
authored
Merge pull request #11 from msgflo/periodic-discovery
Periodic discovery messages
2 parents 5d3273d + cbd383b commit f8a0871

File tree

4 files changed

+26
-12
lines changed

4 files changed

+26
-12
lines changed

bin/msgflo-python

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#!/usr/bin/env python
1+
#!/usr/bin/env python2
22

33
import sys, os, importlib
44
sys.path.append(os.path.abspath(".."))

examples/repeat.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#!/usr/bin/env python
1+
#!/usr/bin/env python2
22

33
import sys, os, json, logging
44
sys.path.append(os.path.abspath("."))

msgflo/msgflo.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,14 @@ def nack(self, msg):
6363
self._engine.nack_message(msg)
6464

6565

66+
DEFAULT_DISCOVERY_PERIOD=60
67+
6668
# Interface for engine/transport implementations
6769
class Engine(object):
68-
def __init__(self, broker):
70+
def __init__(self, broker, discovery_period=None):
6971
self.broker_url = broker
7072
self.broker_info = urlparse.urlparse(self.broker_url)
73+
self.discovery_period = discovery_period if discovery_period else DEFAULT_DISCOVERY_PERIOD
7174

7275
def done_callback(self, done_cb):
7376
self._done_cb = done_cb
@@ -91,8 +94,8 @@ def __init__(self, raw):
9194

9295
class AmqpEngine(Engine):
9396

94-
def __init__(self, broker):
95-
Engine.__init__(self, broker)
97+
def __init__(self, broker, discovery_period=None):
98+
Engine.__init__(self, broker, discovery_period=discovery_period)
9699

97100
# Connect to AMQP broker with default connection and authentication
98101
# FIXME: respect self.broker_url
@@ -108,14 +111,19 @@ def add_participant(self, participant):
108111
self.participant = participant
109112
self.participant._engine = self
110113

111-
self._send_discovery(self._channel, self.participant.definition)
112-
113114
# Create and configure message exchange and queue
114115
for p in self.participant.definition['inports']:
115116
self._setup_queue(self.participant, self._channel, 'in', p)
116117
for p in self.participant.definition['outports']:
117118
self._setup_queue(self.participant, self._channel, 'out', p)
118119

120+
def send_discovery():
121+
while self.participant:
122+
self._send_discovery(self._channel, self.participant.definition)
123+
delay = self.discovery_period/2.2
124+
gevent.sleep(delay) # yields
125+
gevent.Greenlet.spawn(send_discovery)
126+
119127
def run(self):
120128
# Start message pump
121129
self._message_pump_greenlet = gevent.spawn(self._message_pump_greenthread)
@@ -249,9 +257,7 @@ def _message_pump_greenthread(self):
249257

250258
def _on_connect(self, client, userdata, flags, rc):
251259
print("Connected with result code" + str(rc))
252-
self._send_discovery(self.participant.definition)
253-
254-
260+
255261
# Subscribe to queues for inports
256262
subscriptions = [] # ("topic", QoS)
257263
for port in self.participant.definition['inports']:
@@ -260,6 +266,13 @@ def _on_connect(self, client, userdata, flags, rc):
260266
subscriptions.append((topic, 0))
261267
self._client.subscribe(subscriptions)
262268

269+
def send_discovery():
270+
while self.participant:
271+
delay = self.discovery_period/2.2
272+
self._send_discovery(self.participant.definition)
273+
gevent.sleep(delay) # yields
274+
gevent.Greenlet.spawn(send_discovery)
275+
263276
def _on_subscribe(self, client, userdata, mid, granted_qos):
264277
logging.debug('subscribed %s' % str(mid))
265278

@@ -314,7 +327,7 @@ def run(participant, broker=None, done_cb=None):
314327
def main(Participant, role=None):
315328
if not role:
316329
try:
317-
role = sys.argv[0]
330+
role = sys.argv[1]
318331
except IndexError, e:
319332
role = participant.definition.component.tolower()
320333

spec/participant.coffee

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ chai = require 'chai' unless chai
44
heterogenous = require '../node_modules/msgflo/spec/heterogenous.coffee'
55

66
python = process.env.PYTHON or 'python'
7+
repeatPy = path.join __dirname, '..', 'examples', 'repeat.py'
78
participants =
8-
'PythonRepeat': [python, path.join __dirname, '..', 'examples', 'repeat.py']
9+
'PythonRepeat': [python, repeatPy, 'repeater']
910

1011
# Note: most require running an external broker service
1112
transports =

0 commit comments

Comments
 (0)