Skip to content

Commit c629530

Browse files
authored
Merge pull request #7 from clowder-framework/extractor-job-tracking-improvements
Pass job_id back during status updates
2 parents 3ae702d + 6e762ad commit c629530

File tree

1 file changed

+12
-3
lines changed

1 file changed

+12
-3
lines changed

pyclowder/connectors.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -760,7 +760,12 @@ def on_message(self, channel, method, header, body):
760760
if 'routing_key' not in json_body and method.routing_key:
761761
json_body['routing_key'] = method.routing_key
762762

763-
self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, self.check_message,
763+
if 'jobid' not in json_body:
764+
job_id = None
765+
else:
766+
job_id = json_body['jobid']
767+
768+
self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, job_id, self.check_message,
764769
self.process_message, self.ssl_verify, self.mounted_paths,
765770
method, header, body)
766771
self.worker.start_thread(json_body)
@@ -835,13 +840,14 @@ class RabbitMQHandler(Connector):
835840
a queue of messages that the super- loop can access and send later.
836841
"""
837842

838-
def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True,
843+
def __init__(self, extractor_name, extractor_info, job_id, check_message=None, process_message=None, ssl_verify=True,
839844
mounted_paths=None, method=None, header=None, body=None):
840845
super(RabbitMQHandler, self).__init__(extractor_name, extractor_info, check_message, process_message,
841846
ssl_verify, mounted_paths)
842847
self.method = method
843848
self.header = header
844849
self.body = body
850+
self.job_id = job_id
845851
self.messages = []
846852
self.thread = None
847853
self.finished = False
@@ -920,6 +926,7 @@ def status_update(self, status, resource, message):
920926
status_report = dict()
921927
# TODO: Update this to check resource["type"] once Clowder better supports dataset events
922928
status_report['file_id'] = resource["id"]
929+
status_report['job_id'] = self.job_id
923930
status_report['extractor_id'] = self.extractor_info['name']
924931
status_report['status'] = "%s: %s" % (status, message)
925932
status_report['start'] = pyclowder.utils.iso8601time()
@@ -952,7 +959,8 @@ class HPCConnector(Connector):
952959
def __init__(self, extractor_name, extractor_info, picklefile,
953960
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None):
954961
super(HPCConnector, self).__init__(extractor_name, extractor_info, check_message, process_message,
955-
ssl_verify, mounted_paths)
962+
ssl_verify, job_id, mounted_paths)
963+
self.job_id = job_id
956964
self.picklefile = picklefile
957965
self.logfile = None
958966

@@ -991,6 +999,7 @@ def status_update(self, status, resource, message):
991999
statusreport = dict()
9921000
statusreport['file_id'] = resource["id"]
9931001
statusreport['extractor_id'] = self.extractor_info['name']
1002+
statusreport['job_id'] = self.job_id
9941003
statusreport['status'] = "%s: %s" % (status, message)
9951004
statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
9961005
log.write(json.dumps(statusreport) + '\n')

0 commit comments

Comments
 (0)