@@ -620,7 +620,7 @@ def listen(self):
620620 self .channel .connection .process_data_events (time_limit = 1 ) # 1 second
621621 if self .worker :
622622 self .worker .process_messages (self .channel )
623- if self .worker .thread and not self . worker . thread . isAlive ():
623+ if self .worker .is_finished ():
624624 self .worker = None
625625 except SystemExit :
626626 raise
@@ -685,6 +685,8 @@ def __init__(self, extractor_info, check_message=None, process_message=None, ssl
685685 self .body = body
686686 self .messages = []
687687 self .thread = None
688+ self .finished = False
689+ self .lock = threading .Lock ()
688690
689691 def start_thread (self , json_body ):
690692 """Start the separate thread for processing & create a queue for messages.
@@ -701,9 +703,15 @@ def start_thread(self, json_body):
701703 self .thread = threading .Thread (target = self ._process_message , args = (json_body ,))
702704 self .thread .start ()
703705
706+ def is_finished (self ):
707+ with self .lock :
708+ return self .thread and not self .thread .isAlive () and self .finished and len (self .messages ) == 0
709+
704710 def process_messages (self , channel ):
705711 while self .messages :
706- msg = self .messages .pop (0 )
712+ with self .lock :
713+ msg = self .messages .pop (0 )
714+ logging .getLogger (__name__ ).info ("Received %s." % msg ["type" ])
707715
708716 if msg ["type" ] == 'status' :
709717 if self .header .reply_to :
@@ -715,6 +723,8 @@ def process_messages(self, channel):
715723
716724 elif msg ["type" ] == 'ok' :
717725 channel .basic_ack (self .method .delivery_tag )
726+ with self .lock :
727+ self .finished = True
718728
719729 elif msg ["type" ] == 'error' :
720730 properties = pika .BasicProperties (delivery_mode = 2 , reply_to = self .header .reply_to )
@@ -723,6 +733,8 @@ def process_messages(self, channel):
723733 properties = properties ,
724734 body = self .body )
725735 channel .basic_ack (self .method .delivery_tag )
736+ with self .lock :
737+ self .finished = True
726738
727739 elif msg ["type" ] == 'resubmit' :
728740 retry_count = msg ['retry_count' ]
@@ -739,6 +751,11 @@ def process_messages(self, channel):
739751 properties = properties ,
740752 body = json .dumps (jbody ))
741753 channel .basic_ack (self .method .delivery_tag )
754+ with self .lock :
755+ self .finished = True
756+
757+ else :
758+ logging .getLogger (__name__ ).error ("Received unknown message type [%s]." % msg ["type" ])
742759
743760 def status_update (self , status , resource , message ):
744761 super (RabbitMQHandler , self ).status_update (status , resource , message )
@@ -748,22 +765,26 @@ def status_update(self, status, resource, message):
748765 status_report ['extractor_id' ] = self .extractor_info ['name' ]
749766 status_report ['status' ] = "%s: %s" % (status , message )
750767 status_report ['start' ] = pyclowder .utils .iso8601time ()
751- self .messages .append ({"type" : "status" ,
752- "status" : status_report ,
753- "resource" : resource ,
754- "message" : message })
768+ with self .lock :
769+ self .messages .append ({"type" : "status" ,
770+ "status" : status_report ,
771+ "resource" : resource ,
772+ "message" : message })
755773
756774 def message_ok (self , resource ):
757775 super (RabbitMQHandler , self ).message_ok (resource )
758- self .messages .append ({"type" : "ok" })
776+ with self .lock :
777+ self .messages .append ({"type" : "ok" })
759778
760779 def message_error (self , resource ):
761780 super (RabbitMQHandler , self ).message_error (resource )
762- self .messages .append ({"type" : "error" })
781+ with self .lock :
782+ self .messages .append ({"type" : "error" })
763783
764784 def message_resubmit (self , resource , retry_count ):
765785 super (RabbitMQHandler , self ).message_resubmit (resource , retry_count )
766- self .messages .append ({"type" : "resubmit" , "retry_count" : retry_count })
786+ with self .lock :
787+ self .messages .append ({"type" : "resubmit" , "retry_count" : retry_count })
767788
768789
769790class HPCConnector (Connector ):
0 commit comments