@@ -83,8 +83,10 @@ def run(self):
8383 raise NotImplementedError
8484
8585class Message (object ):
86- def __init__ (self , data ):
87- self .data = data
86+ def __init__ (self , raw ):
87+ self .buffer = raw
88+ self .data = raw
89+ self .json = None
8890
8991class AmqpEngine (Engine ):
9092
@@ -174,7 +176,14 @@ def _setup_queue(self, part, channel, direction, port):
174176 def handle_input (msg ):
175177 logger .debug ("Received message: %s" % (msg ,))
176178
177- msg .data = json .loads (msg .body .decode ("utf-8" ))
179+ msg .buffer = msg .body
180+ try :
181+ msg .json = json .loads (msg .body .decode ("utf-8" ))
182+ msg .data = msg .json # compat
183+ except ValueError as e :
184+ # Not JSON, assume binary
185+ msg .data = msg .buffer
186+
178187 part .process (port , msg )
179188 return
180189
@@ -252,9 +261,16 @@ def _on_subscribe(self, client, userdata, mid, granted_qos):
252261 def _on_message (self , client , userdata , mqtt_msg ):
253262 logging .debug ('got message on %s' % mqtt_msg .topic )
254263 port = "" # FIXME: map from topic back to port
264+
255265 def notify ():
256- data = json .loads (str (mqtt_msg .payload ))
257- msg = Message (data )
266+ msg = Message (mqtt_msg .payload )
267+ try :
268+ msg .json = json .loads (str (mqtt_msg .payload ))
269+ msg .data = msg .json # compat
270+ except ValueError as e :
271+ # Not JSON, assume binary
272+ msg .data = msg .buffer
273+
258274 self .participant .process (port , msg )
259275
260276 gevent .spawn (notify )
0 commit comments