@@ -733,19 +733,41 @@ def stop(self):
733733 def alive (self ):
734734 return self .connection is not None
735735
736+ @staticmethod
737+ def _decode_body (body , codecs = ['utf8' , 'iso-8859-1' ]):
738+ # see https://stackoverflow.com/a/15918519
739+ for i in codecs :
740+ try :
741+ return body .decode (i )
742+ except UnicodeDecodeError :
743+ pass
744+ raise ValueError ("Cannot decode body" )
745+
736746 def on_message (self , channel , method , header , body ):
737747 """When the message is received this will call the generic _process_message in
738748 the connector class. Any message will only be acked if the message is processed,
739749 or there is an exception (except for SystemExit and SystemError exceptions).
740750 """
741751
742- json_body = json .loads (body )
743- if 'routing_key' not in json_body and method .routing_key :
744- json_body ['routing_key' ] = method .routing_key
745-
746- self .worker = RabbitMQHandler (self .extractor_name , self .extractor_info , self .check_message ,
747- self .process_message , self .ssl_verify , self .mounted_paths , method , header , body )
748- self .worker .start_thread (json_body )
752+ try :
753+ json_body = json .loads (self ._decode_body (body ))
754+ if 'routing_key' not in json_body and method .routing_key :
755+ json_body ['routing_key' ] = method .routing_key
756+
757+ self .worker = RabbitMQHandler (self .extractor_name , self .extractor_info , self .check_message ,
758+ self .process_message , self .ssl_verify , self .mounted_paths ,
759+ method , header , body )
760+ self .worker .start_thread (json_body )
761+
762+ except ValueError :
763+ # something went wrong, move message to error queue and give up on this message immediately
764+ logging .exception ("Error processing message, message moved to error queue" )
765+ properties = pika .BasicProperties (delivery_mode = 2 , reply_to = header .reply_to )
766+ channel .basic_publish (exchange = '' ,
767+ routing_key = 'error.' + self .extractor_name ,
768+ properties = properties ,
769+ body = body )
770+ channel .basic_ack (method .delivery_tag )
749771
750772
751773class RabbitMQBroadcast :
0 commit comments