33import sys , os , json , random , urlparse
44sys .path .append (os .path .abspath ("." ))
55
6+ from optparse import OptionParser
7+
68import logging
79logging .basicConfig ()
810log_level = os .environ .get ('MSGFLO_PYTHON_LOGLEVEL' )
@@ -75,7 +77,7 @@ def __init__(self, broker, discovery_period=None):
7577 def done_callback (self , done_cb ):
7678 self ._done_cb = done_cb
7779
78- def add_participant (self , participant ):
80+ def add_participant (self , participant , iips = {} ):
7981 raise NotImplementedError
8082
8183 def ack_message (self , msg ):
@@ -92,6 +94,13 @@ def __init__(self, raw):
9294 self .data = raw
9395 self .json = None
9496
97+ def deliver_iips (participant ):
98+ iips = participant ._iips
99+ for port , data in iips .items ():
100+ msg = Message (data )
101+ msg .json = data
102+ participant .process (port , msg )
103+
95104class AmqpEngine (Engine ):
96105
97106 def __init__ (self , broker , discovery_period = None ):
@@ -107,16 +116,21 @@ def __init__(self, broker, discovery_period=None):
107116 self ._channel = self ._conn .channel ()
108117 self ._channel .add_close_listener (self ._channel_closed_cb )
109118
110- def add_participant (self , participant ):
119+ def add_participant (self , participant , iips = {} ):
111120 self .participant = participant
112121 self .participant ._engine = self
122+ self .participant ._iips = iips
113123
114124 # Create and configure message exchange and queue
115125 for p in self .participant .definition ['inports' ]:
116126 self ._setup_queue (self .participant , self ._channel , 'in' , p )
117127 for p in self .participant .definition ['outports' ]:
118128 self ._setup_queue (self .participant , self ._channel , 'out' , p )
119129
130+ # Deliver IIPs
131+ deliver_iips (self .participant )
132+
133+ # Send discovery message
120134 def send_discovery ():
121135 while self .participant :
122136 self ._send_discovery (self ._channel , self .participant .definition )
@@ -222,9 +236,10 @@ def __init__(self, broker):
222236 port = 1883
223237 self ._client .connect (host , port , 60 )
224238
225- def add_participant (self , participant ):
239+ def add_participant (self , participant , iips = {} ):
226240 self .participant = participant
227241 self .participant ._engine = self
242+ self .participant ._iips = iips
228243
229244 def run (self ):
230245 self ._message_pump_greenlet = gevent .spawn (self ._message_pump_greenthread )
@@ -266,6 +281,10 @@ def _on_connect(self, client, userdata, flags, rc):
266281 subscriptions .append ((topic , 0 ))
267282 self ._client .subscribe (subscriptions )
268283
284+ # Deliver IIPs
285+ deliver_iips (self .participant )
286+
287+ # Send discovery messsage
269288 def send_discovery ():
270289 while self .participant :
271290 delay = self .discovery_period / 2.2
@@ -304,7 +323,7 @@ def _send_discovery(self, definition):
304323 logger .debug ('sent discovery message %s' % msg )
305324 return
306325
307- def run (participant , broker = None , done_cb = None ):
326+ def run (participant , broker = None , done_cb = None , iips = {} ):
308327 if broker is None :
309328 broker = os .environ .get ('MSGFLO_BROKER' , 'amqp://localhost' )
310329
@@ -319,22 +338,36 @@ def run(participant, broker=None, done_cb=None):
319338
320339 if done_cb :
321340 engine .done_callback (done_cb )
322- engine .add_participant (participant )
341+ engine .add_participant (participant , iips )
323342 engine .run ()
324343
325344 return engine
326345
346+ def parse (argv , defaults = {}):
347+ parser = OptionParser (usage = "%prog [options] role" )
348+ parser .add_option ("-i" , "--iips" , dest = "iips" , default = '{}' ,
349+ help = "Data as initial information packets" , metavar = '{"inportA": "inA-data", ...}' )
350+
351+ (options , args ) = parser .parse_args (argv )
352+ for k , v in defaults .items ():
353+ if k not in options :
354+ options [k ] = v
355+ options .iips = json .loads (options .iips )
356+ options .role = args [0 ]
357+
358+ return [options , parser ]
359+
327360def main (Participant , role = None ):
328- if not role :
329- try :
330- role = sys . argv [ 1 ]
331- except IndexError , e :
332- role = participant . definition . component . tolower ( )
361+ [ config , parser ] = parse ( sys . argv )
362+ if role :
363+ config . role = role
364+ if not config . role :
365+ parser . error ( " role not specified" )
333366
334- participant = Participant (role )
367+ participant = Participant (config . role )
335368 d = participant .definition
336369 waiter = gevent .event .AsyncResult ()
337- engine = run (participant , done_cb = waiter .set )
370+ engine = run (participant , done_cb = waiter .set , iips = config . iips )
338371 anon_url = "%s://%s" % (engine .broker_info .scheme , engine .broker_info .hostname )
339372 print "%s(%s) running on %s" % (d ['role' ], d ['component' ], anon_url )
340373 sys .stdout .flush ()
0 commit comments