|
22 | 22 | from pyclowder.connectors import RabbitMQConnector, HPCConnector, LocalConnector |
23 | 23 | from pyclowder.utils import CheckMessage, setup_logging |
24 | 24 | import pyclowder.files |
| 25 | +import pyclowder.datasets |
25 | 26 |
|
26 | 27 |
|
27 | 28 | class Extractor(object): |
@@ -308,55 +309,145 @@ def __init__(self): |
308 | 309 | self.logger = logging.getLogger('__main__') |
309 | 310 | self.logger.setLevel(logging.INFO) |
310 | 311 |
|
| 312 | + # TODO: Support check_message() in simple extractors |
| 313 | + |
311 | 314 | def process_message(self, connector, host, secret_key, resource, parameters): |
312 | 315 | """ |
313 | | - Process a clowder message. This will download the file to local disk and call the |
314 | | - process_file to do the actual processing of the file. The resulting dict is then |
| 316 | + Process a clowder message. This will download the file(s) to local disk and call |
| 317 | + process_file or process_dataset to do the actual processing. The resulting dict is then |
 315 | 318 |         parsed and, based on the keys in the dict, the results are uploaded to the right
316 | 319 | location in clowder. |
317 | 320 | """ |
318 | | - input_file = resource["local_paths"][0] |
319 | | - file_id = resource['id'] |
| 321 | + if 'files' in resource: |
| 322 | + type = 'dataset' |
| 323 | + input_files = resource['local_paths'] |
| 324 | + dataset_id = resource['id'] |
| 325 | + |
| 326 | + elif 'local_paths' in resource: |
| 327 | + type = 'file' |
| 328 | + input_file = resource['local_paths'][0] |
| 329 | + file_id = resource['id'] |
| 330 | + dataset_id = resource['parent']['id'] |
| 331 | + else: |
| 332 | + # TODO: Eventually support other messages such as metadata.added |
| 333 | + type = 'unknown' |
320 | 334 |
|
321 | | - # call the actual function that processes the file |
322 | | - if file_id and input_file: |
| 335 | + # call the actual function that processes the message |
| 336 | + if type == 'file' and file_id and input_file: |
323 | 337 | result = self.process_file(input_file) |
| 338 | + elif type == 'dataset' and dataset_id and input_files: |
| 339 | + result = self.process_dataset(input_files) |
324 | 340 | else: |
325 | 341 | result = dict() |
326 | 342 |
|
327 | | - # return information to clowder |
328 | 343 | try: |
| 344 | + # upload metadata to the processed file or dataset |
329 | 345 | if 'metadata' in result.keys(): |
330 | | - metadata = self.get_metadata(result.get('metadata'), 'file', file_id, host) |
331 | 346 | self.logger.info("upload metadata") |
332 | | - self.logger.debug(metadata) |
333 | | - pyclowder.files.upload_metadata(connector, host, secret_key, file_id, metadata) |
| 347 | + if type == 'file': |
| 348 | + metadata = self.get_metadata(result.get('metadata'), 'file', file_id, host) |
| 349 | + self.logger.debug(metadata) |
| 350 | + pyclowder.files.upload_metadata(connector, host, secret_key, file_id, metadata) |
| 351 | + elif type == 'dataset': |
| 352 | + metadata = self.get_metadata(result.get('metadata'), 'dataset', dataset_id, host) |
| 353 | + self.logger.debug(metadata) |
| 354 | + pyclowder.datasets.upload_metadata(connector, host, secret_key, dataset_id, metadata) |
| 355 | + else: |
| 356 | + self.logger.error("unable to attach metadata to resource type: %s" % type) |
| 357 | + |
| 358 | + # upload previews to the processed file |
334 | 359 | if 'previews' in result.keys(): |
335 | 360 | self.logger.info("upload previews") |
336 | | - for preview in result['previews']: |
337 | | - if os.path.exists(str(preview)): |
338 | | - preview = {'file': preview} |
339 | | - self.logger.info("upload preview") |
340 | | - pyclowder.files.upload_preview(connector, host, secret_key, file_id, str(preview)) |
 | 361 | +            if type == 'file':
 | 362 | +                for preview in result['previews']:
 | 363 | +                    preview = str(preview)  # path on disk; skip if missing
 | 364 | +                    if os.path.exists(preview):
 | 365 | +                        self.logger.info("upload preview")
 | 366 | +                        pyclowder.files.upload_preview(connector, host, secret_key, file_id, preview)
| 367 | + else: |
| 368 | + # TODO: Add Clowder endpoint (& pyclowder method) to attach previews to datasets |
| 369 | + self.logger.error("previews not currently supported for resource type: %s" % type) |
| 370 | + |
341 | 371 | if 'tags' in result.keys(): |
342 | | - tags = {"tags": result["tags"]} |
343 | 372 | self.logger.info("upload tags") |
344 | | - self.logger.debug(tags) |
345 | | - pyclowder.files.upload_tags(connector, host, secret_key, file_id, tags) |
| 373 | + tags = {"tags": result["tags"]} |
| 374 | + if type == 'file': |
| 375 | + pyclowder.files.upload_tags(connector, host, secret_key, file_id, tags) |
 | 376 | +                elif type == 'dataset':
| 377 | + pyclowder.datasets.upload_tags(connector, host, secret_key, dataset_id, tags) |
| 378 | + |
| 379 | + # upload output files to the processed file's parent dataset or processed dataset |
| 380 | + if 'outputs' in result.keys(): |
| 381 | + self.logger.info("upload output files") |
| 382 | + if type == 'file' or type == 'dataset': |
| 383 | + for output in result['outputs']: |
| 384 | + if os.path.exists(str(output)): |
| 385 | + pyclowder.files.upload_to_dataset(connector, host, secret_key, dataset_id, str(output)) |
| 386 | + else: |
| 387 | + self.logger.error("unable to upload outputs to resource type: %s" % type) |
| 388 | + |
| 389 | + if 'new_dataset' in result.keys(): |
| 390 | + if type == 'dataset': |
| 391 | + nds = result['new_dataset'] |
| 392 | + if 'name' not in nds.keys(): |
| 393 | + self.logger.error("new datasets require a name") |
| 394 | + else: |
| 395 | + description = nds['description'] if 'description' in nds.keys() else "" |
| 396 | + new_dataset_id = pyclowder.datasets.create_empty(connector, host, secret_key, nds['name'], |
| 397 | + description) |
| 398 | + self.logger.info("created new dataset: %s" % new_dataset_id) |
| 399 | + |
| 400 | + if 'metadata' in nds.keys(): |
| 401 | + self.logger.info("upload metadata to new dataset") |
| 402 | + metadata = self.get_metadata(nds.get('metadata'), 'dataset', new_dataset_id, host) |
| 403 | + self.logger.debug(metadata) |
| 404 | + pyclowder.datasets.upload_metadata(connector, host, secret_key, new_dataset_id, metadata) |
| 405 | + |
| 406 | + if 'outputs' in nds.keys(): |
| 407 | + self.logger.info("upload output files to new dataset") |
| 408 | + for output in nds['outputs']: |
| 409 | + if os.path.exists(str(output)): |
| 410 | + pyclowder.files.upload_to_dataset(connector, host, secret_key, new_dataset_id, |
| 411 | + str(output)) |
| 412 | + |
| 413 | + if 'previews' in nds.keys(): |
| 414 | + # TODO: Add Clowder endpoint (& pyclowder method) to attach previews to datasets |
| 415 | + self.logger.error("previews not currently supported for resource type: %s" % type) |
| 416 | + |
346 | 417 | finally: |
347 | 418 | self.cleanup_data(result) |
348 | 419 |
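For orientation, the two resource shapes that the dispatch at the top of `process_message` distinguishes look roughly like this. This is a simplified sketch rather than actual pyclowder output; the ids and paths are invented, and real messages carry additional fields:

```python
# Hypothetical file message: one downloaded file plus its parent dataset.
file_resource = {
    'id': 'some-file-id',                         # invented Clowder file id
    'local_paths': ['/tmp/extractor/input.csv'],  # where the file was downloaded
    'parent': {'id': 'some-dataset-id'},          # parent dataset, target for outputs
}

# Hypothetical dataset message: every file in the dataset is downloaded.
dataset_resource = {
    'id': 'some-dataset-id',                      # invented Clowder dataset id
    'files': [],                                  # file records for the dataset
    'local_paths': ['/tmp/extractor/a.csv', '/tmp/extractor/b.csv'],
}
```

Because a dataset message also carries `local_paths`, the code checks for the `files` key first.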
|
349 | 420 | def process_file(self, input_file): |
350 | 421 | """ |
351 | 422 | This function will process the file and return a dict that contains the result. This |
352 | 423 | dict can have the following keys: |
353 | | - - metadata: the metadata to be associated with the file |
354 | | - - previews: files on disk with the preview to be uploaded |
| 424 | + - metadata: the metadata to be associated with the processed file |
 | 425 | +        - previews: preview images on disk to be uploaded and attached to the processed file
 | 426 | +        - outputs: files on disk to be uploaded to the processed file's parent dataset
355 | 427 | :param input_file: the file to be processed. |
356 | 428 | :return: the specially formatted dict. |
357 | 429 | """ |
358 | 430 | return dict() |
359 | 431 |
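As a concrete illustration of this contract, here is a minimal sketch of a subclass. It assumes only the `Extractor` base class above; the `WordCountExtractor` name and the metadata fields are invented for this example:

```python
class WordCountExtractor(Extractor):
    """Toy extractor that attaches line and word counts to a text file."""

    def process_file(self, input_file):
        # read the downloaded file and summarize it
        with open(input_file, 'r') as fh:
            text = fh.read()
        return {
            # attached to the processed file (wrapped by get_metadata() before upload)
            'metadata': {'lines': len(text.splitlines()),
                         'words': len(text.split())},
            # tags are also handled by process_message above
            'tags': ['wordcount'],
        }
```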
|
| 432 | + def process_dataset(self, input_files): |
| 433 | + """ |
| 434 | + This function will process the file list and return a dict that contains the result. This |
| 435 | + dict can have the following keys: |
| 436 | + - metadata: the metadata to be associated with the processed dataset |
| 437 | + - outputs: files on disk to be added to the dataset |
 | 438 | +        - previews: preview images to be associated with the dataset (not yet supported)
 | 439 | +        - new_dataset: a dict describing a new dataset to be created to hold the outputs,
 | 440 | +          with the following keys:
 | 441 | +          - name: the name of the new dataset (required)
 | 442 | +          - description: an optional description for the new dataset
 | 443 | +          - metadata: (as above, attached to the new dataset)
 | 444 | +          - outputs: (as above, uploaded to the new dataset)
 | 445 | +          - previews: (as above; not yet supported for datasets)
| 446 | + :param input_files: the files to be processed. |
| 447 | + :return: the specially formatted dict. |
| 448 | + """ |
| 449 | + return dict() |
| 450 | + |
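A corresponding sketch for the dataset path, again with invented names and fields, showing the `new_dataset` key handled above:

```python
import os
import tempfile

class CombineFilesExtractor(Extractor):
    """Toy extractor that concatenates every input file into one output file."""

    def process_dataset(self, input_files):
        combined = os.path.join(tempfile.mkdtemp(), 'combined.txt')
        with open(combined, 'w') as out:
            for path in input_files:
                with open(path, 'r') as fh:
                    out.write(fh.read())
        return {
            'metadata': {'inputs': len(input_files)},   # attached to the processed dataset
            'new_dataset': {
                'name': 'combined output',              # required, checked above
                'description': 'concatenation of all input files',
                'outputs': [combined],                  # uploaded into the new dataset
            },
        }
```

The temporary output can then be removed by overriding `cleanup_data`, which `process_message` calls in its `finally` block.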
360 | 451 | def cleanup_data(self, result): |
361 | 452 | """ |
362 | 453 | Once the information is uploaded to clowder this function is called for cleanup. This |
|