
Commit 9aa47b3

Merge branch 'develop' into separate-extractor-message-and-type

2 parents: fa7fb11 + c629530

6 files changed: +134 -22 lines

CHANGELOG.md (+5 -0)

@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](http://keepachangelog.com/)
 and this project adheres to [Semantic Versioning](http://semver.org/).
 
+## Unreleased
+
+### Added
+- Simple extractors now support datasets and can also create new datasets.
+
 ## 2.2.3 - 2019-10-14
 
 ### Fixed

docs/requirements.txt (+1 -1)

@@ -1,4 +1,4 @@
 enum34==1.1.6
 Sphinx==1.6.2
 pika==0.10.0
-PyYAML==3.11
+PyYAML==5.1

pyclowder/connectors.py (+20 -3)

@@ -761,7 +761,12 @@ def on_message(self, channel, method, header, body):
         if 'routing_key' not in json_body and method.routing_key:
             json_body['routing_key'] = method.routing_key
 
-        self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, self.check_message,
+        if 'jobid' not in json_body:
+            job_id = None
+        else:
+            job_id = json_body['jobid']
+
+        self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, job_id, self.check_message,
                                       self.process_message, self.ssl_verify, self.mounted_paths,
                                       method, header, body)
         self.worker.start_thread(json_body)
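
The hunk above pulls the job id out of the incoming extraction message. For context, a minimal sketch of such a message body, with every value invented for illustration (only 'jobid' and 'routing_key' are attested by this diff):

# Hypothetical message body after json.loads(body); values are made up.
json_body = {
    "routing_key": "extractors.wordcount",
    "jobid": "5d9f9d8e1b2c3a0001a2b3c4",  # absent on older Clowder servers
}

# A more compact equivalent of the added if/else: dict.get returns None
# when the key is missing.
job_id = json_body.get('jobid')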
@@ -836,13 +841,14 @@ class RabbitMQHandler(Connector):
     a queue of messages that the super-loop can access and send later.
     """
 
-    def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True,
+    def __init__(self, extractor_name, extractor_info, job_id, check_message=None, process_message=None, ssl_verify=True,
                  mounted_paths=None, method=None, header=None, body=None):
         super(RabbitMQHandler, self).__init__(extractor_name, extractor_info, check_message, process_message,
                                               ssl_verify, mounted_paths)
         self.method = method
         self.header = header
         self.body = body
+        self.job_id = job_id
         self.messages = []
         self.thread = None
         self.finished = False
@@ -921,6 +927,15 @@ def process_messages(self, channel, rabbitmq_queue):
 
     def status_update(self, status, resource, message):
         super(RabbitMQHandler, self).status_update(status, resource, message)
+
+        status_report = dict()
+        # TODO: Update this to check resource["type"] once Clowder better supports dataset events
+        status_report['file_id'] = resource["id"]
+        status_report['job_id'] = self.job_id
+        status_report['extractor_id'] = self.extractor_info['name']
+        status_report['status'] = "%s: %s" % (status, message)
+        status_report['start'] = pyclowder.utils.iso8601time()
+
         with self.lock:
             # TODO: Remove 'status' from payload later and read from message_type and message in Clowder 2.0
             self.messages.append({"type": "status",
@@ -959,7 +974,8 @@ class HPCConnector(Connector):
     def __init__(self, extractor_name, extractor_info, picklefile,
                  check_message=None, process_message=None, ssl_verify=True, mounted_paths=None):
         super(HPCConnector, self).__init__(extractor_name, extractor_info, check_message, process_message,
-                                           ssl_verify, mounted_paths)
+                                           ssl_verify, job_id, mounted_paths)
+        self.job_id = job_id
         self.picklefile = picklefile
         self.logfile = None

@@ -998,6 +1014,7 @@ def status_update(self, status, resource, message):
         statusreport = dict()
         statusreport['file_id'] = resource["id"]
         statusreport['extractor_id'] = self.extractor_info['name']
+        statusreport['job_id'] = self.job_id
         statusreport['status'] = "%s: %s" % (status, message)
         statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
         log.write(json.dumps(statusreport) + '\n')
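
Net effect of these connectors.py changes: status reports from both handlers now carry the job id alongside the file and extractor ids. A sketch of the assembled payload, with hypothetical values:

import time

# Illustrative values only; mirrors the statusreport dict built above.
statusreport = {
    'file_id': '5d9f9d8e1b2c3a0001a2b3c5',    # resource["id"]
    'job_id': '5d9f9d8e1b2c3a0001a2b3c4',     # self.job_id; None if the message had no 'jobid'
    'extractor_id': 'wordcount',              # self.extractor_info['name']
    'status': 'StatusMessage.processing: counting words',  # "%s: %s" % (status, message)
    'start': time.strftime('%Y-%m-%dT%H:%M:%S'),
}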

pyclowder/extractors.py (+104 -17)

@@ -308,50 +308,137 @@ def __init__(self):
         self.logger = logging.getLogger('__main__')
         self.logger.setLevel(logging.INFO)
 
+    # TODO: Support check_message() in simple extractors
+
     def process_message(self, connector, host, secret_key, resource, parameters):
         """
-        Process a clowder message. This will download the file to local disk and call the
-        process_file to do the actual processing of the file. The resulting dict is then
+        Process a clowder message. This will download the file(s) to local disk and call
+        process_file or process_dataset to do the actual processing. The resulting dict is then
         parsed and based on the keys in the dict it will upload the results to the right
         location in clowder.
         """
-        input_file = resource["local_paths"][0]
-        file_id = resource['id']
+        if 'files' in resource:
+            type = 'dataset'
+            input_files = resource['local_paths']
+            dataset_id = resource['id']
+
+        elif 'local_paths' in resource:
+            type = 'file'
+            input_file = resource['local_paths'][0]
+            file_id = resource['id']
+            dataset_id = resource['parent']['id']
+        else:
+            # TODO: Eventually support other messages such as metadata.added
+            type = 'unknown'
 
-        # call the actual function that processes the file
-        if file_id and input_file:
+        # call the actual function that processes the message
+        if type == 'file' and file_id and input_file:
             result = self.process_file(input_file)
+        elif type == 'dataset' and dataset_id and input_files:
+            result = self.process_dataset(input_files)
         else:
             result = dict()
 
-        # return information to clowder
         try:
+            # upload metadata to the processed file or dataset
             if 'metadata' in result.keys():
-                metadata = self.get_metadata(result.get('metadata'), 'file', file_id, host)
                 self.logger.info("upload metadata")
-                self.logger.debug(metadata)
-                pyclowder.files.upload_metadata(connector, host, secret_key, file_id, metadata)
+                if type == 'file':
+                    metadata = self.get_metadata(result.get('metadata'), 'file', file_id, host)
+                    self.logger.debug(metadata)
+                    pyclowder.files.upload_metadata(connector, host, secret_key, file_id, metadata)
+                elif type == 'dataset':
+                    metadata = self.get_metadata(result.get('metadata'), 'dataset', dataset_id, host)
+                    self.logger.debug(metadata)
+                    pyclowder.datasets.upload_metadata(connector, host, secret_key, dataset_id, metadata)
+                else:
+                    self.logger.error("unable to attach metadata to resource type: %s" % type)
+
+            # upload previews to the processed file
             if 'previews' in result.keys():
                 self.logger.info("upload previews")
-                for preview in result['previews']:
-                    if os.path.exists(str(preview)):
-                        preview = {'file': preview}
-                    self.logger.info("upload preview")
-                    pyclowder.files.upload_preview(connector, host, secret_key, file_id, str(preview))
+                if type == 'file':
+                    for preview in result['previews']:
+                        if os.path.exists(str(preview)):
+                            preview = {'file': preview}
+                        self.logger.info("upload preview")
+                        pyclowder.files.upload_preview(connector, host, secret_key, file_id, str(preview))
+                else:
+                    # TODO: Add Clowder endpoint (& pyclowder method) to attach previews to datasets
+                    self.logger.error("previews not currently supported for resource type: %s" % type)
+
+            # upload output files to the processed file's parent dataset or the processed dataset
+            if 'outputs' in result.keys():
+                self.logger.info("upload output files")
+                if type == 'file' or type == 'dataset':
+                    for output in result['outputs']:
+                        if os.path.exists(str(output)):
+                            pyclowder.files.upload_to_dataset(connector, host, secret_key, dataset_id, str(output))
+                else:
+                    self.logger.error("unable to upload outputs to resource type: %s" % type)
+
+            if 'new_dataset' in result.keys():
+                if type == 'dataset':
+                    nds = result['new_dataset']
+                    if 'name' not in nds.keys():
+                        self.logger.error("new datasets require a name")
+                    else:
+                        description = nds['description'] if 'description' in nds.keys() else ""
+                        new_dataset_id = pyclowder.datasets.create_empty(connector, host, secret_key, nds['name'],
+                                                                         description)
+                        self.logger.info("created new dataset: %s" % new_dataset_id)
+
+                        if 'metadata' in nds.keys():
+                            self.logger.info("upload metadata to new dataset")
+                            metadata = self.get_metadata(nds.get('metadata'), 'dataset', new_dataset_id, host)
+                            self.logger.debug(metadata)
+                            pyclowder.datasets.upload_metadata(connector, host, secret_key, new_dataset_id, metadata)
+
+                        if 'outputs' in nds.keys():
+                            self.logger.info("upload output files to new dataset")
+                            for output in nds['outputs']:
+                                if os.path.exists(str(output)):
+                                    pyclowder.files.upload_to_dataset(connector, host, secret_key, new_dataset_id,
+                                                                      str(output))
+
+                        if 'previews' in nds.keys():
+                            # TODO: Add Clowder endpoint (& pyclowder method) to attach previews to datasets
+                            self.logger.error("previews not currently supported for resource type: %s" % type)
+
         finally:
             self.cleanup_data(result)
 
     def process_file(self, input_file):
         """
         This function will process the file and return a dict that contains the result. This
         dict can have the following keys:
-        - metadata: the metadata to be associated with the file
-        - previews: files on disk with the preview to be uploaded
+        - metadata: the metadata to be associated with the processed file
+        - previews: images on disk with the preview to be uploaded to the processed file
+        - outputs: files on disk to be added to the processed file's parent dataset
        :param input_file: the file to be processed.
        :return: the specially formatted dict.
        """
        return dict()
 
+    def process_dataset(self, input_files):
+        """
+        This function will process the file list and return a dict that contains the result. This
+        dict can have the following keys:
+        - metadata: the metadata to be associated with the processed dataset
+        - outputs: files on disk to be added to the dataset
+        - previews: images to be associated with the dataset
+        - new_dataset: a dict describing a new dataset to be created for the outputs (the outputs,
+          metadata and previews contained in new_dataset are attached to the newly created dataset),
+          with the following keys:
+            - name: the name of the new dataset to be created
+            - description: description for the new dataset to be created
+            - previews: (see above)
+            - metadata: (see above)
+            - outputs: (see above)
+        :param input_files: the files to be processed.
+        :return: the specially formatted dict.
+        """
+        return dict()
+
     def cleanup_data(self, result):
        """
        Once the information is uploaded to clowder this function is called for cleanup. This
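
As a usage sketch, not taken from this commit: assuming the class modified above is the SimpleExtractor exposed by pyclowder.extractors, a subclass can now override process_dataset and return any of the keys the dispatch code understands. The class name, paths, and metadata below are invented:

import os

from pyclowder.extractors import SimpleExtractor  # assumed import path


class LineCountExtractor(SimpleExtractor):
    """Hypothetical example: count lines across every file in a dataset."""

    def process_dataset(self, input_files):
        total = 0
        for path in input_files:
            with open(path, 'rb') as fh:
                total += sum(1 for _ in fh)

        # Files listed under 'outputs' are uploaded to the processed dataset.
        out_path = os.path.join('/tmp', 'line_counts.txt')
        with open(out_path, 'w') as fh:
            fh.write('total lines: %d\n' % total)

        return {
            'metadata': {'total_lines': total},
            'outputs': [out_path],
            # 'new_dataset': {...} could be returned instead to route outputs
            # into a newly created dataset, per the docstring above.
        }


if __name__ == '__main__':
    LineCountExtractor().start()  # assumes the base Extractor's start() loop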

requirements.txt (+1 -1)

@@ -3,7 +3,7 @@ pika==1.0.0
 PyYAML==5.1
 requests==2.21.0
 wheel==0.33.1
-urllib3==1.24.1
+urllib3==1.24.2
 pytest==4.3.1
 pytest-pep8==1.0.6
 requests-toolbelt==0.9.1

sample-extractors/simple-extractor/simple_extractor.py (+3 -0)

@@ -10,3 +10,6 @@ def __init__(self, extraction):
 
     def process_file(self, input_file):
         return self.extraction(input_file)
+
+    def process_dataset(self, input_files):
+        return self.extraction(input_files)
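
With this three-line addition, the same wrapped function serves both message types: it receives a single path for file events and a list of paths for dataset events, so it should handle both shapes. A hedged wiring sketch (the extraction function is invented; the constructor signature comes from this file):

def extraction(paths):
    # File messages pass one path, dataset messages a list; normalize.
    files = paths if isinstance(paths, list) else [paths]
    return {'metadata': {'file_count': len(files)}}


extractor = SimpleExtractor(extraction)
extractor.start()  # assumes the base Extractor's start() entry point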
