|
9 | 9 | import shutil |
10 | 10 | import platform |
11 | 11 | import traceback |
| 12 | +import yaml |
12 | 13 | from typing import Dict, List, Optional |
13 | 14 | from datetime import datetime |
| 15 | +import sophios.post_compile as pc |
| 16 | +from sophios.wic_types import Json |
14 | 17 |
|
15 | 18 | try: |
16 | 19 | import cwltool.main |
@@ -349,6 +352,141 @@ def copy_output_files(yaml_stem: str) -> None: |
349 | 352 | sub.run(cmd, check=True) |
350 | 353 |
|
351 | 354 |
|
| 355 | +def build_cmd(workflow_name: str, basepath: str, cwl_runner: str, container_cmd: str) -> List[str]: |
| 356 | + """Build the command to run the workflow in an environment |
| 357 | +
|
| 358 | + Args: |
| 359 | + workflow_name (str): Name of the .cwl workflow file to be executed |
| 360 | + basepath (str): The path at which the workflow to be executed |
| 361 | + cwl_runner (str): The CWL runner used to execute the workflow |
| 362 | + container_cmd (str): The container engine command |
| 363 | + Returns: |
| 364 | + cmd (List[str]): The command to run the workflow |
| 365 | + """ |
| 366 | + quiet = ['--quiet'] |
| 367 | + skip_schemas = ['--skip-schemas'] |
| 368 | + provenance = ['--provenance', f'provenance/{workflow_name}'] |
| 369 | + container_cmd_: List[str] = [] |
| 370 | + if container_cmd == 'docker': |
| 371 | + container_cmd_ = [] |
| 372 | + elif container_cmd == 'singularity': |
| 373 | + container_cmd_ = ['--singularity'] |
| 374 | + else: |
| 375 | + container_cmd_ = ['--user-space-docker-cmd', container_cmd] |
| 376 | + write_summary = ['--write-summary', f'output_{workflow_name}.json'] |
| 377 | + path_check = ['--relax-path-checks'] |
| 378 | + # See https://github.com/common-workflow-language/cwltool/blob/5a645dfd4b00e0a704b928cc0bae135b0591cc1a/cwltool/command_line_tool.py#L94 |
| 379 | + # NOTE: Using --leave-outputs to disable --outdir |
| 380 | + # See https://github.com/dnanexus/dx-cwl/issues/20 |
| 381 | + # --outdir has one or more bugs which will cause workflows to fail!!! |
| 382 | + container_pull = ['--disable-pull'] # Use cwl-docker-extract to pull images |
| 383 | + script = 'cwltool_filterlog' if cwl_runner == 'cwltool' else cwl_runner |
| 384 | + cmd = [script] + container_pull + quiet + provenance + \ |
| 385 | + container_cmd_ + write_summary + skip_schemas + path_check |
| 386 | + if cwl_runner == 'cwltool': |
| 387 | + cmd += ['--leave-outputs', |
| 388 | + f'{basepath}/{workflow_name}.cwl', f'{basepath}/{workflow_name}_inputs.yml'] |
| 389 | + elif cwl_runner == 'toil-cwl-runner': |
| 390 | + container_pull = [] |
| 391 | + now = datetime.now() |
| 392 | + date_time = now.strftime("%Y%m%d%H%M%S") |
| 393 | + cmd = [script] + container_pull + provenance + container_cmd_ + path_check |
| 394 | + cmd += ['--outdir', f'outdir_toil_{workflow_name}_{date_time}', |
| 395 | + '--jobStore', f'file:./jobStore_{workflow_name}', # NOTE: This is the equivalent of --cachedir |
| 396 | + '--clean', 'always', # This effectively disables caching, but is reproducible |
| 397 | + '--disableProgress', # disable the progress bar in the terminal, saves UI cycle |
| 398 | + '--workDir', '/data1', |
| 399 | + '--coordinationDir', '/data1', |
| 400 | + '--logLevel', 'INFO', |
| 401 | + f'{basepath}/{workflow_name}.cwl', f'{basepath}/{workflow_name}_inputs.yml'] |
| 402 | + else: |
| 403 | + pass |
| 404 | + return cmd |
| 405 | + |
| 406 | + |
| 407 | +def run_cwl_workflow(workflow_name: str, basepath: str, cwl_runner: str, container_cmd: str, use_subprocess: bool, env_commands: List[str] = []) -> int: |
| 408 | + """Run the CWL workflow in an environment |
| 409 | +
|
| 410 | + Args: |
| 411 | + workflow_name (str): Name of the .cwl workflow file to be executed |
| 412 | + basepath (str): The path at which the workflow to be executed |
| 413 | + cwl_runner (str): The CWL runner used to execute the workflow |
| 414 | + container_cmd (str): The container engine command |
| 415 | + use_subprocess (bool): When using cwltool, determines whether to use subprocess.run(...) |
| 416 | + or use the cwltool python api. |
| 417 | + env_commands (List[str]): environment variables and commands needed to be run before running the workflow |
| 418 | + Returns: |
| 419 | + retval: The return value |
| 420 | + """ |
| 421 | + cmd = build_cmd(workflow_name, basepath, cwl_runner, container_cmd) |
| 422 | + cmdline = ' '.join(cmd) |
| 423 | + |
| 424 | + retval = 1 # overwrite on success |
| 425 | + print('Running ' + cmdline) |
| 426 | + if use_subprocess: |
| 427 | + # To run in parallel (i.e. pytest ... --workers 8 ...), we need to |
| 428 | + # use separate processes. Otherwise: |
| 429 | + # "signal only works in main thread or with __pypy__.thread.enable_signals()" |
| 430 | + proc = sub.run(cmd, check=False) |
| 431 | + retval = proc.returncode |
| 432 | + else: |
| 433 | + print('via cwltool.main.main python API') |
| 434 | + try: |
| 435 | + if cwl_runner == 'cwltool': |
| 436 | + retval = cwltool.main.main(cmd[1:]) |
| 437 | + elif cwl_runner == 'toil-cwl-runner': |
| 438 | + _ = sub.run(env_commands, shell=True, check=False, executable="/bin/bash") |
| 439 | + retval = toil.cwl.cwltoil.main(cmd[1:]) |
| 440 | + else: |
| 441 | + raise Exception("Invalid cwl_runner!") |
| 442 | + |
| 443 | + print(f'Final output json metadata blob is in output_{workflow_name}.json') |
| 444 | + except Exception as e: |
| 445 | + print('Failed to execute', workflow_name) |
| 446 | + print(f'See error_{workflow_name}.txt for detailed technical information.') |
| 447 | + # Do not display a nasty stack trace to the user; hide it in a file. |
| 448 | + with open(f'error_{workflow_name}.txt', mode='w', encoding='utf-8') as f: |
| 449 | + # https://mypy.readthedocs.io/en/stable/common_issues.html#python-version-and-system-platform-checks |
| 450 | + if sys.version_info >= (3, 10): |
| 451 | + traceback.print_exception(type(e), value=e, tb=None, file=f) |
| 452 | + print(e) # we are always running this on CI |
| 453 | + # only copy output files if using cwltool |
| 454 | + if cwl_runner == 'cwltool': |
| 455 | + copy_output_files(workflow_name) |
| 456 | + return retval |
| 457 | + |
| 458 | + |
| 459 | +async def run_cwl_serialized_async(workflow: Json, basepath: str, |
| 460 | + cwl_runner: str, container_cmd: str, |
| 461 | + env_commands: List[str] = []) -> None: |
| 462 | + """Prepare and run compiled and serialized CWL workflow asynchronously |
| 463 | +
|
| 464 | + Args: |
| 465 | + workflow_json (Json): Compiled and serialized CWL workflow |
| 466 | + basepath (str): The path at which the workflow to be executed |
| 467 | + cwl_runner (str): The CWL runner used to execute the workflow |
| 468 | + container_cmd (str): The container engine command |
| 469 | + env_commands (List[str]): environment variables and commands needed to be run before running the workflow |
| 470 | + """ |
| 471 | + workflow_name = workflow['name'] |
| 472 | + output_dirs = pc.find_output_dirs(workflow) |
| 473 | + pc.create_output_dirs(output_dirs, basepath) |
| 474 | + compiled_cwl = workflow_name + '.cwl' |
| 475 | + inputs_yml = workflow_name + '_inputs.yml' |
| 476 | + # write _input.yml file |
| 477 | + with open(Path.cwd() / basepath / inputs_yml, 'w', encoding='utf-8') as f: |
| 478 | + yaml.dump(workflow['yaml_inputs'], f) |
| 479 | + workflow.pop('retval', None) |
| 480 | + workflow.pop('yaml_inputs', None) |
| 481 | + workflow.pop('name', None) |
| 482 | + # write compiled .cwl file |
| 483 | + with open(Path.cwd() / basepath / compiled_cwl, 'w', encoding='utf-8') as f: |
| 484 | + yaml.dump(workflow, f) |
| 485 | + retval = run_cwl_workflow(workflow_name, basepath, |
| 486 | + cwl_runner, container_cmd, False, env_commands=env_commands) |
| 487 | + assert retval == 0 |
| 488 | + |
| 489 | + |
352 | 490 | def stage_input_files(yml_inputs: Yaml, root_yml_dir_abs: Path, |
353 | 491 | relative_run_path: bool = True, throw: bool = True) -> None: |
354 | 492 | """Copies the input files in yml_inputs to the working directory. |
|
0 commit comments