Skip to content

Commit 75271e2

Browse files
authored
Add MPIRequirement (#1276)
1 parent 520acbf commit 75271e2

17 files changed

+759
-19
lines changed

README.rst

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,80 @@ For this example, grab the test.json (and input file) from https://github.com/Ca
558558

559559
.. _`GA4GH Tool Registry API`: https://github.com/ga4gh/tool-registry-schemas
560560

561+
Running MPI-based tools that need to be launched
562+
------------------------------------------------
563+
564+
Cwltool supports an extension to the CWL spec
565+
``http://commonwl.org/cwltool#MPIRequirement``. When the tool
566+
definition has this in its ``requirements``/``hints`` section, and
567+
cwltool has been run with ``--enable-ext``, then the tool's command
568+
line will be extended with the commands needed to launch it with
569+
``mpirun`` or similar. You can specify the number of processes to
570+
start as either a literal integer or an expression (that will result
571+
in an integer). For example::
572+
573+
#!/usr/bin/env cwl-runner
574+
cwlVersion: v1.1
575+
class: CommandLineTool
576+
$namespaces:
577+
cwltool: "http://commonwl.org/cwltool#"
578+
requirements:
579+
cwltool:MPIRequirement:
580+
processes: $(inputs.nproc)
581+
inputs:
582+
nproc:
583+
type: int
584+
585+
Interaction with containers: the MPIRequirement currently prepends its
586+
commands to the front of the command line that is constructed. If you
587+
wish to run a containerised application in parallel, for simple use
588+
cases this does work with Singularity, depending upon the platform
589+
setup. However this combination should be considered "alpha" -- please
590+
do report any issues you have! This does not work with Docker at the
591+
moment. (More precisely, you get `n` copies of the same single process
592+
image run at the same time that cannot communicate with each other.)
593+
594+
The host-specific parameters are configured in a simple YAML file
595+
(specified with the ``--mpi-config-file`` flag). The allowed keys are
596+
given in the following table; all are optional.
597+
598+
+----------------+------------------+----------+------------------------------+
599+
| Key | Type | Default | Description |
600+
+================+==================+==========+==============================+
601+
| runner | str | "mpirun" | The primary command to use. |
602+
+----------------+------------------+----------+------------------------------+
603+
| nproc_flag | str | "-n" | Flag to set number of |
604+
| | | | processes to start. |
605+
+----------------+------------------+----------+------------------------------+
606+
| default_nproc | int | 1 | Default number of processes. |
607+
+----------------+------------------+----------+------------------------------+
608+
| extra_flags | List[str] | [] | A list of any other flags to |
609+
| | | | be added to the runner's |
610+
| | | | command line before |
611+
| | | | the ``baseCommand``. |
612+
+----------------+------------------+----------+------------------------------+
613+
| env_pass | List[str] | [] | A list of environment |
614+
| | | | variables that should be |
615+
| | | | passed from the host |
616+
| | | | environment through to the |
617+
| | | | tool (e.g. giving the |
618+
| | | | nodelist as set by your |
619+
| | | | scheduler). |
620+
+----------------+------------------+----------+------------------------------+
621+
| env_pass_regex | List[str] | [] | A list of python regular |
622+
| | | | expressions that will be |
623+
| | | | matched against the host's |
624+
| | | | environment. Those that match|
625+
| | | | will be passed through. |
626+
+----------------+------------------+----------+------------------------------+
627+
| env_set | Mapping[str,str] | {} | A dictionary whose keys are |
628+
| | | | the environment variables set|
629+
| | | | and the values being the |
630+
| | | | values. |
631+
+----------------+------------------+----------+------------------------------+
632+
633+
634+
561635
===========
562636
Development
563637
===========

cwltool/argparser.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,15 @@ def arg_parser() -> argparse.ArgumentParser:
615615
"listed targets (can provide more than once).",
616616
)
617617

618+
parser.add_argument(
619+
"--mpi-config-file",
620+
type=str,
621+
default=None,
622+
help="Platform specific configuration for MPI (parallel "
623+
"launcher, its flag etc). See README section 'Running MPI-"
624+
"based tools' for details of the format.",
625+
)
626+
618627
parser.add_argument(
619628
"workflow",
620629
type=str,

cwltool/command_line_tool.py

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
from .flatten import flatten
4646
from .job import CommandLineJob, JobBase
4747
from .loghandler import _logger
48+
from .mpi import MPIRequirementName
4849
from .mutation import MutationManager
4950
from .pathmapper import PathMapper
5051
from .process import (
@@ -360,16 +361,21 @@ def __init__(
360361
self.prov_obj = loadingContext.prov_obj
361362

362363
def make_job_runner(self, runtimeContext: RuntimeContext) -> Type[JobBase]:
363-
dockerReq, _ = self.get_requirement("DockerRequirement")
364+
dockerReq, dockerRequired = self.get_requirement("DockerRequirement")
365+
mpiReq, mpiRequired = self.get_requirement(MPIRequirementName)
366+
364367
if not dockerReq and runtimeContext.use_container:
365368
if runtimeContext.find_default_container is not None:
366369
default_container = runtimeContext.find_default_container(self)
367370
if default_container is not None:
368-
self.requirements.insert(
369-
0,
370-
{"class": "DockerRequirement", "dockerPull": default_container},
371-
)
372-
dockerReq = self.requirements[0]
371+
dockerReq = {"class": "DockerRequirement", "dockerPull": default_container}
372+
if mpiRequired:
373+
self.hints.insert(0, dockerReq)
374+
dockerRequired = False
375+
else:
376+
self.requirements.insert(0, dockerReq)
377+
dockerRequired = True
378+
373379
if (
374380
default_container == windows_default_container_id
375381
and runtimeContext.use_container
@@ -382,17 +388,40 @@ def make_job_runner(self, runtimeContext: RuntimeContext) -> Type[JobBase]:
382388
)
383389

384390
if dockerReq is not None and runtimeContext.use_container:
391+
if mpiReq is not None:
392+
_logger.warning(
393+
"MPIRequirement with containers is a beta feature"
394+
)
385395
if runtimeContext.singularity:
386396
return SingularityCommandLineJob
387397
elif runtimeContext.user_space_docker_cmd:
388398
return UDockerCommandLineJob
399+
if mpiReq is not None:
400+
if mpiRequired:
401+
if dockerRequired:
402+
raise UnsupportedRequirement("No support for Docker and MPIRequirement both being required")
403+
else:
404+
_logger.warning(
405+
"MPI has been required while Docker is hinted, discarding Docker hint(s)"
406+
)
407+
self.hints = [h for h in self.hints if h["class"] != "DockerRequirement"]
408+
return CommandLineJob
409+
else:
410+
if dockerRequired:
411+
_logger.warning(
412+
"Docker has been required while MPI is hinted, discarding MPI hint(s)"
413+
)
414+
self.hints = [h for h in self.hints if h["class"] != MPIRequirementName]
415+
else:
416+
raise UnsupportedRequirement(
417+
"Both Docker and MPI have been hinted - don't know what to do"
418+
)
389419
return DockerCommandLineJob
390-
for t in reversed(self.requirements):
391-
if t["class"] == "DockerRequirement":
392-
raise UnsupportedRequirement(
393-
"--no-container, but this CommandLineTool has "
394-
"DockerRequirement under 'requirements'."
395-
)
420+
if dockerRequired:
421+
raise UnsupportedRequirement(
422+
"--no-container, but this CommandLineTool has "
423+
"DockerRequirement under 'requirements'."
424+
)
396425
return CommandLineJob
397426

398427
def make_path_mapper(
@@ -839,6 +868,19 @@ def register_reader(f: CWLObjectType) -> None:
839868
)
840869
j.output_callback = output_callbacks
841870

871+
mpi, _ = self.get_requirement(MPIRequirementName)
872+
873+
if mpi is not None:
874+
np = cast( # From the schema for MPIRequirement.processes
875+
Union[int, str],
876+
mpi.get('processes', runtimeContext.mpi_config.default_nproc)
877+
)
878+
if isinstance(np, str):
879+
tmp = builder.do_eval(np)
880+
if not isinstance(tmp, int):
881+
raise TypeError("{} needs 'processes' to evaluate to an int, got {}".format(MPIRequirementName, type(np)))
882+
np = tmp
883+
j.mpi_procs = np
842884
yield j
843885

844886
def collect_output_ports(

cwltool/context.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from typing_extensions import TYPE_CHECKING
1111

1212
from .builder import Builder, HasReqsHints
13+
from .mpi import MpiConfig
1314
from .mutation import MutationManager
1415
from .pathmapper import PathMapper
1516
from .secrets import SecretStore
@@ -78,7 +79,7 @@ def copy(self):
7879

7980
class RuntimeContext(ContextBase):
8081
def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
81-
"""Initializet the RuntimeContext from the kwargs."""
82+
"""Initialize the RuntimeContext from the kwargs."""
8283
select_resources_callable = Callable[ # pylint: disable=unused-variable
8384
[Dict[str, Union[int, float]], RuntimeContext], Dict[str, Union[int, float]]
8485
]
@@ -141,6 +142,8 @@ def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
141142
self.cwl_full_name = "" # type: str
142143
self.process_run_id = None # type: Optional[str]
143144
self.prov_obj = None # type: Optional[ProvenanceProfile]
145+
self.mpi_config = MpiConfig() # type: MpiConfig
146+
144147
super(RuntimeContext, self).__init__(kwargs)
145148

146149
def copy(self):

cwltool/extensions-v1.1.yml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,23 @@ $graph:
4444
subscope: run
4545
doc: |
4646
Specifies the process to run.
47+
48+
- name: MPIRequirement
49+
type: record
50+
inVocab: false
51+
extends: cwl:ProcessRequirement
52+
doc: |
53+
Indicates that a process requires an MPI runtime.
54+
fields:
55+
- name: class
56+
type: string
57+
doc: "Always 'MPIRequirement'"
58+
jsonldPredicate:
59+
"_id": "@type"
60+
"_type": "@vocab"
61+
- name: processes
62+
type: [int, string]
63+
doc: |
64+
The number of MPI processes to start. If you give a string,
65+
this will be evaluated as a CWL Expression and it must
66+
evaluate to an integer.

cwltool/extensions.yml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,23 @@ $graph:
154154
_type: "@id"
155155
doc: |
156156
Specifies the process to run.
157+
158+
- name: MPIRequirement
159+
type: record
160+
inVocab: false
161+
extends: cwl:ProcessRequirement
162+
doc: |
163+
Indicates that a process requires an MPI runtime.
164+
fields:
165+
- name: class
166+
type: string
167+
doc: "Always 'MPIRequirement'"
168+
jsonldPredicate:
169+
"_id": "@type"
170+
"_type": "@vocab"
171+
- name: processes
172+
type: [int, string]
173+
doc: |
174+
The number of MPI processes to start. If you give a string,
175+
this will be evaluated as a CWL Expression and it must
176+
evaluate to an integer.

cwltool/job.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ def __init__(
225225
self.parent_wf = None # type: Optional[ProvenanceProfile]
226226
self.timelimit = None # type: Optional[int]
227227
self.networkaccess = False # type: bool
228+
self.mpi_procs = None # type: Optional[int]
228229

229230
def __repr__(self): # type: () -> str
230231
"""Represent this Job object."""
@@ -286,6 +287,16 @@ def _execute(
286287
if scr is not None:
287288
shouldquote = neverquote
288289

290+
# If mpi_procs (is not None and > 0) then prepend the
291+
# appropriate MPI job launch command and flags before the
292+
# execution.
293+
if self.mpi_procs:
294+
menv = runtimeContext.mpi_config
295+
mpi_runtime = [menv.runner, menv.nproc_flag, str(self.mpi_procs)] + menv.extra_flags
296+
runtime = mpi_runtime + runtime
297+
menv.pass_through_env_vars(env)
298+
menv.set_env_vars(env)
299+
289300
_logger.info(
290301
"[job %s] %s$ %s%s%s%s",
291302
self.name,

cwltool/main.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@
9999
windows_default_container_id,
100100
)
101101
from .workflow import Workflow
102-
102+
from .mpi import MpiConfig
103103

104104
def _terminate_processes() -> None:
105105
"""Kill all spawned processes.
@@ -632,16 +632,15 @@ def setup_schema(
632632
if custom_schema_callback is not None:
633633
custom_schema_callback()
634634
elif args.enable_ext:
635-
res = pkg_resources.resource_stream(__name__, "extensions.yml")
636-
ext10 = res.read().decode("utf-8")
637-
res = pkg_resources.resource_stream(__name__, "extensions-v1.1.yml")
638-
ext11 = res.read().decode("utf-8")
635+
with pkg_resources.resource_stream(__name__, "extensions.yml") as res:
636+
ext10 = res.read().decode("utf-8")
637+
with pkg_resources.resource_stream(__name__, "extensions-v1.1.yml") as res:
638+
ext11 = res.read().decode("utf-8")
639639
use_custom_schema("v1.0", "http://commonwl.org/cwltool", ext10)
640640
use_custom_schema("v1.1", "http://commonwl.org/cwltool", ext11)
641641
use_custom_schema("v1.2.0-dev1", "http://commonwl.org/cwltool", ext11)
642642
use_custom_schema("v1.2.0-dev2", "http://commonwl.org/cwltool", ext11)
643643
use_custom_schema("v1.2.0-dev3", "http://commonwl.org/cwltool", ext11)
644-
res.close()
645644
else:
646645
use_standard_schema("v1.0")
647646
use_standard_schema("v1.1")
@@ -888,6 +887,9 @@ def main(
888887
if not args.enable_ga4gh_tool_registry:
889888
del ga4gh_tool_registries[:]
890889

890+
if args.mpi_config_file is not None:
891+
runtimeContext.mpi_config = MpiConfig.load(args.mpi_config_file)
892+
891893
setup_schema(args, custom_schema_callback)
892894

893895
if args.provenance:

0 commit comments

Comments
 (0)