Skip to content

Commit 11ec37f

Browse files
authored
Merge pull request #20 from msgflo/iip
Support --iips
2 parents fecf77f + 4823335 commit 11ec37f

File tree

2 files changed

+46
-13
lines changed

2 files changed

+46
-13
lines changed

bin/msgflo-python

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def load_module_file(filepath):
2525
def main():
2626
prog, args = sys.argv[0], sys.argv[1:]
2727
try:
28-
modulepath, role = args
28+
modulepath, role = args[0:2]
2929
except ValueError, e:
3030
sys.stderr.write("Usage: msgflo-python MODULE.py ROLE\n")
3131
return 1

msgflo/msgflo.py

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import sys, os, json, random, urlparse
44
sys.path.append(os.path.abspath("."))
55

6+
from optparse import OptionParser
7+
68
import logging
79
logging.basicConfig()
810
log_level = os.environ.get('MSGFLO_PYTHON_LOGLEVEL')
@@ -75,7 +77,7 @@ def __init__(self, broker, discovery_period=None):
7577
def done_callback(self, done_cb):
7678
self._done_cb = done_cb
7779

78-
def add_participant(self, participant):
80+
def add_participant(self, participant, iips={}):
7981
raise NotImplementedError
8082

8183
def ack_message(self, msg):
@@ -92,6 +94,13 @@ def __init__(self, raw):
9294
self.data = raw
9395
self.json = None
9496

97+
def deliver_iips(participant):
98+
iips = participant._iips
99+
for port, data in iips.items():
100+
msg = Message(data)
101+
msg.json = data
102+
participant.process(port, msg)
103+
95104
class AmqpEngine(Engine):
96105

97106
def __init__(self, broker, discovery_period=None):
@@ -107,16 +116,21 @@ def __init__(self, broker, discovery_period=None):
107116
self._channel = self._conn.channel()
108117
self._channel.add_close_listener(self._channel_closed_cb)
109118

110-
def add_participant(self, participant):
119+
def add_participant(self, participant, iips={}):
111120
self.participant = participant
112121
self.participant._engine = self
122+
self.participant._iips = iips
113123

114124
# Create and configure message exchange and queue
115125
for p in self.participant.definition['inports']:
116126
self._setup_queue(self.participant, self._channel, 'in', p)
117127
for p in self.participant.definition['outports']:
118128
self._setup_queue(self.participant, self._channel, 'out', p)
119129

130+
# Deliver IIPs
131+
deliver_iips(self.participant)
132+
133+
# Send discovery message
120134
def send_discovery():
121135
while self.participant:
122136
self._send_discovery(self._channel, self.participant.definition)
@@ -222,9 +236,10 @@ def __init__(self, broker):
222236
port = 1883
223237
self._client.connect(host, port, 60)
224238

225-
def add_participant(self, participant):
239+
def add_participant(self, participant, iips={}):
226240
self.participant = participant
227241
self.participant._engine = self
242+
self.participant._iips = iips
228243

229244
def run(self):
230245
self._message_pump_greenlet = gevent.spawn(self._message_pump_greenthread)
@@ -266,6 +281,10 @@ def _on_connect(self, client, userdata, flags, rc):
266281
subscriptions.append((topic, 0))
267282
self._client.subscribe(subscriptions)
268283

284+
# Deliver IIPs
285+
deliver_iips(self.participant)
286+
287+
# Send discovery messsage
269288
def send_discovery():
270289
while self.participant:
271290
delay = self.discovery_period/2.2
@@ -304,7 +323,7 @@ def _send_discovery(self, definition):
304323
logger.debug('sent discovery message %s' % msg)
305324
return
306325

307-
def run(participant, broker=None, done_cb=None):
326+
def run(participant, broker=None, done_cb=None, iips={}):
308327
if broker is None:
309328
broker = os.environ.get('MSGFLO_BROKER', 'amqp://localhost')
310329

@@ -319,22 +338,36 @@ def run(participant, broker=None, done_cb=None):
319338

320339
if done_cb:
321340
engine.done_callback(done_cb)
322-
engine.add_participant(participant)
341+
engine.add_participant(participant, iips)
323342
engine.run()
324343

325344
return engine
326345

346+
def parse(argv, defaults={}):
347+
parser = OptionParser(usage="%prog [options] role")
348+
parser.add_option("-i", "--iips", dest="iips", default='{}',
349+
help="Data as initial information packets", metavar='{"inportA": "inA-data", ...}')
350+
351+
(options, args) = parser.parse_args(argv)
352+
for k, v in defaults.items():
353+
if k not in options:
354+
options[k] = v
355+
options.iips = json.loads(options.iips)
356+
options.role = args[0]
357+
358+
return [options, parser]
359+
327360
def main(Participant, role=None):
328-
if not role:
329-
try:
330-
role = sys.argv[1]
331-
except IndexError, e:
332-
role = participant.definition.component.tolower()
361+
[config, parser] = parse(sys.argv)
362+
if role:
363+
config.role = role
364+
if not config.role:
365+
parser.error("role not specified")
333366

334-
participant = Participant(role)
367+
participant = Participant(config.role)
335368
d = participant.definition
336369
waiter = gevent.event.AsyncResult()
337-
engine = run(participant, done_cb=waiter.set)
370+
engine = run(participant, done_cb=waiter.set, iips=config.iips)
338371
anon_url = "%s://%s" % (engine.broker_info.scheme, engine.broker_info.hostname)
339372
print "%s(%s) running on %s" % (d['role'], d['component'], anon_url)
340373
sys.stdout.flush()

0 commit comments

Comments
 (0)