11#!/usr/bin/env python
22
3- import sys , os , json , random , urlparse
3+ import sys , os , json , random
44sys .path .append (os .path .abspath ("." ))
55
6+ try :
7+ from urllib .parse import urlparse
8+ except ImportError :
9+ from urlparse import urlparse
10+
611from optparse import OptionParser
712
813import logging
1722import gevent .event
1823
1924# AMQP
20- from haigha .connection import Connection as haigha_Connection
21- from haigha .message import Message as haigha_Message
25+ haigha = None
26+ try :
27+ import haigha
28+ from haigha .connection import Connection as haigha_Connection
29+ from haigha .message import Message as haigha_Message
30+ except (ImportError , SyntaxError ) as e :
31+ haigha = e
2232
2333# MQTT
2434import paho .mqtt .client as mqtt
@@ -71,7 +81,7 @@ def nack(self, msg):
7181class Engine (object ):
7282 def __init__ (self , broker , discovery_period = None ):
7383 self .broker_url = broker
74- self .broker_info = urlparse . urlparse (self .broker_url )
84+ self .broker_info = urlparse (self .broker_url )
7585 self .discovery_period = discovery_period if discovery_period else DEFAULT_DISCOVERY_PERIOD
7686
7787 def done_callback (self , done_cb ):
@@ -106,6 +116,9 @@ class AmqpEngine(Engine):
106116 def __init__ (self , broker , discovery_period = None ):
107117 Engine .__init__ (self , broker , discovery_period = discovery_period )
108118
119+ if isinstance (haigha , Exception ):
120+ raise haigha
121+
109122 # Prepare connection to AMQP broker
110123 vhost = '/'
111124 if self .broker_info .path :
@@ -234,9 +247,11 @@ def __init__(self, broker):
234247 if self .broker_info .username :
235248 self ._client .username_pw_set (self .broker_info .username , self .broker_info .password )
236249
237- self ._client .on_connect = self ._on_connect
238- self ._client .on_message = self ._on_message
239- self ._client .on_subscribe = self ._on_subscribe
250+ #self._client.on_connect = _on_connect
251+ self ._client .on_connect = lambda c , u , f , rc : self ._on_connect (c , u , f , rc )
252+ self ._client .on_message = lambda c , u , m : self ._on_message (c , u , m )
253+ self ._client .on_subscribe = lambda c , u , m , q : self ._on_subscribe (c , u , m , q )
254+
240255 host = self .broker_info .hostname
241256 port = self .broker_info .port
242257 if port is None :
@@ -252,6 +267,7 @@ def run(self):
252267 self ._message_pump_greenlet = gevent .spawn (self ._message_pump_greenthread )
253268
254269 def _send (self , outport , data ):
270+ logger .debug ('Participant sent on %s' % outport )
255271 ports = self .participant .definition ['outports' ]
256272 serialized = json .dumps (data )
257273 port = [p for p in ports if outport == p ['id' ]][0 ]
@@ -278,13 +294,13 @@ def _message_pump_greenthread(self):
278294 return
279295
280296 def _on_connect (self , client , userdata , flags , rc ):
281- logging .debug ("Connected with result code" + str (rc ))
297+ logger .debug ("Connected with result code" + str (rc ))
282298
283299 # Subscribe to queues for inports
284300 subscriptions = [] # ("topic", QoS)
285301 for port in self .participant .definition ['inports' ]:
286302 topic = port ['queue' ]
287- logging .debug ('subscribing to %s' % topic )
303+ logger .debug ('subscribing to %s' % topic )
288304 subscriptions .append ((topic , 0 ))
289305 self ._client .subscribe (subscriptions )
290306
@@ -300,25 +316,29 @@ def send_discovery():
300316 gevent .Greenlet .spawn (send_discovery )
301317
302318 def _on_subscribe (self , client , userdata , mid , granted_qos ):
303- logging .debug ('subscribed %s' % str (mid ))
319+ logger .debug ('subscribed %s' % str (mid ))
304320
305321 def _on_message (self , client , userdata , mqtt_msg ):
306- logging .debug ('got message on %s' % mqtt_msg .topic )
322+ logger .debug ('got message on %s' % mqtt_msg .topic )
307323 port = ""
308324 for inport in self .participant .definition ['inports' ]:
309325 if inport ['queue' ] == mqtt_msg .topic :
310326 port = inport ['id' ]
311327
312328 def notify ():
313- msg = Message (mqtt_msg .payload )
314- try :
315- msg .json = json .loads (str (mqtt_msg .payload ))
316- msg .data = msg .json # compat
317- except ValueError as e :
318- # Not JSON, assume binary
319- msg .data = msg .buffer
320-
321- self .participant .process (port , msg )
329+ msg = Message (mqtt_msg .payload )
330+ try :
331+ msg .json = json .loads (mqtt_msg .payload .decode ('utf8' ))
332+ msg .data = msg .json # compat
333+ except ValueError as e :
334+ # Not JSON, assume binary
335+ msg .json = e
336+ msg .data = msg .buffer
337+ except Exception as e :
338+ logger .debug ('unknown error %s' % str (e ))
339+
340+ logger .debug ('Delivering message to %s' % port )
341+ self .participant .process (port , msg )
322342
323343 gevent .spawn (notify )
324344
@@ -338,7 +358,7 @@ def run(participant, broker=None, done_cb=None, iips={}):
338358 broker = os .environ .get ('MSGFLO_BROKER' , 'amqp://localhost' )
339359
340360 engine = None
341- broker_info = urlparse . urlparse (broker )
361+ broker_info = urlparse (broker )
342362 if broker_info .scheme == 'amqp' :
343363 engine = AmqpEngine (broker )
344364 elif broker_info .scheme == 'mqtt' :
@@ -379,6 +399,6 @@ def main(Participant, role=None):
379399 waiter = gevent .event .AsyncResult ()
380400 engine = run (participant , done_cb = waiter .set , iips = config .iips )
381401 anon_url = "%s://%s" % (engine .broker_info .scheme , engine .broker_info .hostname )
382- print "%s(%s) running on %s" % (d ['role' ], d ['component' ], anon_url )
402+ print ( "%s(%s) running on %s" % (d ['role' ], d ['component' ], anon_url ) )
383403 sys .stdout .flush ()
384404 waiter .wait ()
0 commit comments