@@ -658,18 +658,24 @@ def stop(self):
658658 def alive (self ):
659659 return self .connection is not None
660660
661+ @staticmethod
662+ def _decode_body (body , codecs = ['utf8' , 'iso-8859-1' ]):
663+ # see https://stackoverflow.com/a/15918519
664+ for i in codecs :
665+ try :
666+ return body .decode (i )
667+ except UnicodeDecodeError :
668+ pass
669+ raise ValueError ("Cannot decode body" )
670+
661671 def on_message (self , channel , method , header , body ):
662672 """When the message is received this will call the generic _process_message in
663673 the connector class. Any message will only be acked if the message is processed,
664674 or there is an exception (except for SystemExit and SystemError exceptions).
665675 """
666676
667677 try :
668- try :
669- json_body = json .loads (body )
670- except ValueError :
671- json_body = json .loads (body , encoding = 'ISO-8859-1' )
672-
678+ json_body = json .loads (self ._decode_body (body ))
673679 if 'routing_key' not in json_body and method .routing_key :
674680 json_body ['routing_key' ] = method .routing_key
675681
@@ -678,7 +684,7 @@ def on_message(self, channel, method, header, body):
678684 method , header , body )
679685 self .worker .start_thread (json_body )
680686
681- except : # pylint: disable=broad-except
687+ except ValueError :
682688 # something went wrong, move message to error queue and give up on this message immediately
683689 logging .exception ("Error processing message, message moved to error queue" )
684690 properties = pika .BasicProperties (delivery_mode = 2 , reply_to = header .reply_to )
0 commit comments