@@ -761,7 +761,12 @@ def on_message(self, channel, method, header, body):
761761 if 'routing_key' not in json_body and method .routing_key :
762762 json_body ['routing_key' ] = method .routing_key
763763
764- self .worker = RabbitMQHandler (self .extractor_name , self .extractor_info , self .check_message ,
764+ if 'jobid' not in json_body :
765+ job_id = None
766+ else :
767+ job_id = json_body ['jobid' ]
768+
769+ self .worker = RabbitMQHandler (self .extractor_name , self .extractor_info , job_id , self .check_message ,
765770 self .process_message , self .ssl_verify , self .mounted_paths , self .clowder_url ,
766771 method , header , body )
767772 self .worker .start_thread (json_body )
@@ -836,13 +841,15 @@ class RabbitMQHandler(Connector):
836841 a queue of messages that the super- loop can access and send later.
837842 """
838843
839- def __init__ (self , extractor_name , extractor_info , check_message = None , process_message = None , ssl_verify = True ,
844+ def __init__ (self , extractor_name , extractor_info , job_id , check_message = None , process_message = None , ssl_verify = True ,
840845 mounted_paths = None , clowder_url = None , method = None , header = None , body = None ):
846+
841847 super (RabbitMQHandler , self ).__init__ (extractor_name , extractor_info , check_message , process_message ,
842848 ssl_verify , mounted_paths , clowder_url )
843849 self .method = method
844850 self .header = header
845851 self .body = body
852+ self .job_id = job_id
846853 self .messages = []
847854 self .thread = None
848855 self .finished = False
@@ -921,6 +928,7 @@ def status_update(self, status, resource, message):
921928 status_report = dict ()
922929 # TODO: Update this to check resource["type"] once Clowder better supports dataset events
923930 status_report ['file_id' ] = resource ["id" ]
931+ status_report ['job_id' ] = self .job_id
924932 status_report ['extractor_id' ] = self .extractor_info ['name' ]
925933 status_report ['status' ] = "%s: %s" % (status , message )
926934 status_report ['start' ] = pyclowder .utils .iso8601time ()
@@ -953,7 +961,8 @@ class HPCConnector(Connector):
953961 def __init__ (self , extractor_name , extractor_info , picklefile ,
954962 check_message = None , process_message = None , ssl_verify = True , mounted_paths = None ):
955963 super (HPCConnector , self ).__init__ (extractor_name , extractor_info , check_message , process_message ,
956- ssl_verify , mounted_paths )
964+ ssl_verify , job_id , mounted_paths )
965+ self .job_id = job_id
957966 self .picklefile = picklefile
958967 self .logfile = None
959968
@@ -992,6 +1001,7 @@ def status_update(self, status, resource, message):
9921001 statusreport = dict ()
9931002 statusreport ['file_id' ] = resource ["id" ]
9941003 statusreport ['extractor_id' ] = self .extractor_info ['name' ]
1004+ statusreport ['job_id' ] = self .job_id
9951005 statusreport ['status' ] = "%s: %s" % (status , message )
9961006 statusreport ['start' ] = time .strftime ('%Y-%m-%dT%H:%M:%S' )
9971007 log .write (json .dumps (statusreport ) + '\n ' )
0 commit comments