@@ -664,13 +664,30 @@ def on_message(self, channel, method, header, body):
664664 or there is an exception (except for SystemExit and SystemError exceptions).
665665 """
666666
667- json_body = json .loads (body )
668- if 'routing_key' not in json_body and method .routing_key :
669- json_body ['routing_key' ] = method .routing_key
667+ try :
668+ try :
669+ json_body = json .loads (body )
670+ except ValueError :
671+ json_body = json .loads (body , encoding = 'ISO-8859-1' )
672+
673+ if 'routing_key' not in json_body and method .routing_key :
674+ json_body ['routing_key' ] = method .routing_key
675+
676+ self .worker = RabbitMQHandler (self .extractor_name , self .extractor_info , self .check_message ,
677+
678+ self .process_message , self .ssl_verify , self .mounted_paths , method , header , body )
679+
680+ self .worker .start_thread (json_body )
670681
671- self .worker = RabbitMQHandler (self .extractor_name , self .extractor_info , self .check_message ,
672- self .process_message , self .ssl_verify , self .mounted_paths , method , header , body )
673- self .worker .start_thread (json_body )
682+ except : # pylint: disable=broad-except
683+ # something went wrong, move message to error queue and give up on this message immediately
684+ logging .exception ("Error processing message, message moved to error queue" )
685+ properties = pika .BasicProperties (delivery_mode = 2 , reply_to = header .reply_to )
686+ channel .basic_publish (exchange = '' ,
687+ routing_key = 'error.' + self .extractor_name ,
688+ properties = properties ,
689+ body = body )
690+ channel .basic_ack (method .delivery_tag )
674691
675692
676693class RabbitMQHandler (Connector ):
0 commit comments