
Commit b36580a
Author: Max Burnette
1 parent: 95d80d1

begin adding support for datasets in simple extractor

File tree: 3 files changed (+103, -19 lines)


pyclowder/extractors.py
Lines changed: 98 additions & 17 deletions

@@ -308,50 +308,131 @@ def __init__(self):
         self.logger = logging.getLogger('__main__')
         self.logger.setLevel(logging.INFO)

+    # TODO: Support check_message() in simple extractors
+
     def process_message(self, connector, host, secret_key, resource, parameters):
         """
-        Process a clowder message. This will download the file to local disk and call the
-        process_file to do the actual processing of the file. The resulting dict is then
+        Process a clowder message. This will download the file(s) to local disk and call
+        process_file or process_dataset to do the actual processing. The resulting dict is then
         parsed and based on the keys in the dict it will upload the results to the right
         location in clowder.
         """
-        input_file = resource["local_paths"][0]
-        file_id = resource['id']
+        if 'files' in resource:
+            type = 'dataset'
+            input_files = resource['local_paths']
+            dataset_id = resource['id']
+
+        elif 'local_paths' in resource:
+            type = 'file'
+            input_file = resource['local_paths'][0]
+            file_id = resource['id']
+            dataset_id = resource['parent']['id']
+        else:
+            # TODO: Eventually support other messages such as metadata.added
+            type = 'unknown'

-        # call the actual function that processes the file
-        if file_id and input_file:
+        # call the actual function that processes the message
+        if type == 'file' and file_id and input_file:
             result = self.process_file(input_file)
+        elif type == 'dataset' and dataset_id and input_files:
+            result = self.process_dataset(input_files)
         else:
             result = dict()

-        # return information to clowder
         try:
+            # upload metadata to the processed file or dataset
             if 'metadata' in result.keys():
-                metadata = self.get_metadata(result.get('metadata'), 'file', file_id, host)
                 self.logger.info("upload metadata")
-                self.logger.debug(metadata)
-                pyclowder.files.upload_metadata(connector, host, secret_key, file_id, metadata)
+                if type == 'file':
+                    metadata = self.get_metadata(result.get('metadata'), 'file', file_id, host)
+                    self.logger.debug(metadata)
+                    pyclowder.files.upload_metadata(connector, host, secret_key, file_id, metadata)
+                elif type == 'dataset':
+                    metadata = self.get_metadata(result.get('metadata'), 'dataset', dataset_id, host)
+                    self.logger.debug(metadata)
+                    pyclowder.datasets.upload_metadata(connector, host, secret_key, dataset_id, metadata)
+                else:
+                    self.logger.error("unable to attach metadata to resource type: %s" % type)
+
+            # upload previews to the processed file
             if 'previews' in result.keys():
                 self.logger.info("upload previews")
-                for preview in result['previews']:
-                    if os.path.exists(str(preview)):
-                        preview = {'file': preview}
-                    self.logger.info("upload preview")
-                    pyclowder.files.upload_preview(connector, host, secret_key, file_id, str(preview))
+                if type == 'file':
+                    for preview in result['previews']:
+                        if os.path.exists(str(preview)):
+                            preview = {'file': preview}
+                        self.logger.info("upload preview")
+                        pyclowder.files.upload_preview(connector, host, secret_key, file_id, str(preview))
+                else:
+                    # TODO: Add Clowder endpoint & pyclowder method to attach previews to datasets
+                    self.logger.error("previews not currently supported for resource type: %s" % type)
+
+            # upload output files to the processed file's parent dataset or processed dataset
+            if 'outputs' in result.keys():
+                self.logger.info("upload output files")
+                if type == 'file' or type == 'dataset':
+                    for output in result['outputs']:
+                        if os.path.exists(str(output)):
+                            pyclowder.files.upload_to_dataset(connector, host, secret_key, dataset_id, str(output))
+                else:
+                    self.logger.error("unable to upload outputs to resource type: %s" % type)
+
+            if 'new_dataset' in result.keys():
+                if type == 'dataset':
+                    nds = result['new_dataset']
+                    if 'name' not in nds.keys():
+                        self.logger.error("new datasets require a name")
+                    else:
+                        description = nds['description'] if 'description' in nds.keys() else ""
+                        new_dataset_id = pyclowder.datasets.create_empty(connector, host, secret_key, nds['name'],
+                                                                         description)
+                        self.logger.info("created new dataset: %s" % new_dataset_id)
+
+                        if 'metadata' in nds.keys():
+                            self.logger.info("upload metadata to new dataset")
+                            metadata = self.get_metadata(nds.get('metadata'), 'dataset', new_dataset_id, host)
+                            self.logger.debug(metadata)
+                            pyclowder.datasets.upload_metadata(connector, host, secret_key, new_dataset_id, metadata)
+
+                        if 'outputs' in nds.keys():
+                            self.logger.info("upload output files to new dataset")
+                            for output in nds['outputs']:
+                                if os.path.exists(str(output)):
+                                    pyclowder.files.upload_to_dataset(connector, host, secret_key, new_dataset_id,
+                                                                      str(output))
+
         finally:
             self.cleanup_data(result)

     def process_file(self, input_file):
         """
         This function will process the file and return a dict that contains the result. This
         dict can have the following keys:
-        - metadata: the metadata to be associated with the file
-        - previews: files on disk with the preview to be uploaded
+        - metadata: the metadata to be associated with the processed file
+        - previews: files on disk with the preview to be uploaded to the processed file
+        - outputs: files on disk to be added to processed file's parent
         :param input_file: the file to be processed.
         :return: the specially formatted dict.
         """
         return dict()

+    def process_dataset(self, input_files):
+        """
+        This function will process the file list and return a dict that contains the result. This
+        dict can have the following keys:
+        - metadata: the metadata to be associated with the processed dataset
+        - outputs: files on disk to be added to the dataset
+        - new_dataset: a dict describing a new dataset to be created for the outputs, with the following keys:
+          - name: the name of the new dataset to be created (including adding the outputs,
+            metadata and previews contained in new_dataset)
+          - description: description for the new dataset to be created
+          - metadata: (see above)
+          - outputs: (see above)
+        :param input_files: the files to be processed.
+        :return: the specially formatted dict.
+        """
+        return dict()
+
     def cleanup_data(self, result):
         """
         Once the information is uploaded to clowder this function is called for cleanup. This
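The dispatch logic above gives the new process_dataset hook the same result-dict contract as process_file. As a minimal sketch of how a subclass might use it (the file paths, metadata values, and dataset name below are invented for illustration; only the dict keys come from this commit):

    # Hypothetical override of the new process_dataset hook. Only the result
    # keys ('metadata', 'outputs', and 'new_dataset' with 'name', 'description',
    # 'metadata', 'outputs') come from the commit; all values are made up.
    def process_dataset(self, input_files):
        file_count = len(input_files)
        return {
            # attached to the processed dataset via pyclowder.datasets.upload_metadata
            'metadata': {'file_count': file_count},
            # uploaded into the processed dataset via pyclowder.files.upload_to_dataset
            'outputs': ['/tmp/summary.csv'],
            # routed into a freshly created dataset; 'name' is required,
            # 'description' falls back to ""
            'new_dataset': {
                'name': 'derived-results',
                'description': 'files derived from the processed dataset',
                'metadata': {'source_file_count': file_count},
                'outputs': ['/tmp/summary.csv'],
            },
        }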

sample-extractors/simple-extractor/Dockerfile
Lines changed: 2 additions & 2 deletions

@@ -1,11 +1,11 @@
-ARG PYCLOWDER_PYTHON=""
-FROM clowder/pyclowder${PYCLOWDER_PYTHON}:latest

 ENV EXTRACTION_FUNC=""
 ENV EXTRACTION_MODULE=""

 COPY simple_extractor.py .

+RUN apt-get update && apt-get install -y build-essential python3-dev
+
 # install any packages
 ONBUILD COPY packages.* Dockerfile /home/clowder/
 ONBUILD RUN if [ -e packages.apt ]; then \
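The new RUN line installs build-essential and python3-dev so that pip packages with C extensions can compile during the ONBUILD steps. For orientation, a derived image built on this base could look roughly like the sketch below; the base image tag and the wordcount module/function names are assumptions, not taken from this commit:

    # Hypothetical downstream Dockerfile; tag and names are illustrative only,
    # and the build context is assumed to contain the packages.* files that
    # the ONBUILD COPY in the base image expects.
    FROM clowder/simple-extractor:latest

    # simple_extractor.py presumably resolves the extraction callable from these
    ENV EXTRACTION_MODULE="wordcount"
    ENV EXTRACTION_FUNC="wordcount"

    # only the extraction module itself needs an explicit COPY;
    # packages.* and the Dockerfile are copied by the ONBUILD hooks above
    COPY wordcount.py /home/clowder/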

sample-extractors/simple-extractor/simple_extractor.py
Lines changed: 3 additions & 0 deletions

@@ -10,3 +10,6 @@ def __init__(self, extraction):
 
     def process_file(self, input_file):
         return self.extraction(input_file)
+
+    def process_dataset(self, input_files):
+        return self.extraction(input_files)
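Both hooks funnel into the same user-supplied extraction callable, so it receives a single local path for file messages and a list of local paths for dataset messages. A function written against this wrapper could therefore branch on the argument type; the wordcount example below is hypothetical and not part of the commit:

    # Hypothetical extraction callable for the simple extractor wrapper.
    def wordcount(input_path):
        if isinstance(input_path, list):
            # dataset message: the wrapper passes a list of downloaded paths
            total = sum(_count_words(p) for p in input_path)
            return {'metadata': {'total_words': total}}
        # file message: the wrapper passes a single path
        return {'metadata': {'words': _count_words(input_path)}}

    def _count_words(path):
        with open(path) as handle:
            return len(handle.read().split())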
