Skip to content

Commit 80e3e13

Browse files
committed
Proof-of-concept for tracking extractions using new job_id
1 parent 3ae702d commit 80e3e13

File tree

1 file changed

+9
-3
lines changed

1 file changed

+9
-3
lines changed

pyclowder/connectors.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -760,7 +760,9 @@ def on_message(self, channel, method, header, body):
760760
if 'routing_key' not in json_body and method.routing_key:
761761
json_body['routing_key'] = method.routing_key
762762

763-
self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, self.check_message,
763+
job_id = json_body['jobid']
764+
765+
self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, job_id, self.check_message,
764766
self.process_message, self.ssl_verify, self.mounted_paths,
765767
method, header, body)
766768
self.worker.start_thread(json_body)
@@ -835,13 +837,14 @@ class RabbitMQHandler(Connector):
835837
a queue of messages that the super- loop can access and send later.
836838
"""
837839

838-
def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True,
840+
def __init__(self, extractor_name, extractor_info, job_id, check_message=None, process_message=None, ssl_verify=True,
839841
mounted_paths=None, method=None, header=None, body=None):
840842
super(RabbitMQHandler, self).__init__(extractor_name, extractor_info, check_message, process_message,
841843
ssl_verify, mounted_paths)
842844
self.method = method
843845
self.header = header
844846
self.body = body
847+
self.job_id = job_id
845848
self.messages = []
846849
self.thread = None
847850
self.finished = False
@@ -920,6 +923,7 @@ def status_update(self, status, resource, message):
920923
status_report = dict()
921924
# TODO: Update this to check resource["type"] once Clowder better supports dataset events
922925
status_report['file_id'] = resource["id"]
926+
status_report['job_id'] = self.job_id
923927
status_report['extractor_id'] = self.extractor_info['name']
924928
status_report['status'] = "%s: %s" % (status, message)
925929
status_report['start'] = pyclowder.utils.iso8601time()
@@ -952,7 +956,8 @@ class HPCConnector(Connector):
952956
def __init__(self, extractor_name, extractor_info, picklefile,
953957
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None):
954958
super(HPCConnector, self).__init__(extractor_name, extractor_info, check_message, process_message,
955-
ssl_verify, mounted_paths)
959+
ssl_verify, job_id, mounted_paths)
960+
self.job_id = job_id
956961
self.picklefile = picklefile
957962
self.logfile = None
958963

@@ -991,6 +996,7 @@ def status_update(self, status, resource, message):
991996
statusreport = dict()
992997
statusreport['file_id'] = resource["id"]
993998
statusreport['extractor_id'] = self.extractor_info['name']
999+
statusreport['job_id'] = self.job_id
9941000
statusreport['status'] = "%s: %s" % (status, message)
9951001
statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
9961002
log.write(json.dumps(statusreport) + '\n')

0 commit comments

Comments
 (0)