diff --git a/.github/workflows/testdask.yml b/.github/workflows/testdask.yml index 0e9f3d447c..e14ba1f405 100644 --- a/.github/workflows/testdask.yml +++ b/.github/workflows/testdask.yml @@ -28,7 +28,8 @@ jobs: uses: actions/checkout@v4 with: repository: ${{ github.repository }} - + - name: Fetch tags + run: git fetch --prune --unshallow - name: Setup Python version ${{ matrix.python-version }} uses: actions/setup-python@v5 with: diff --git a/.github/workflows/testpsijlocal.yml b/.github/workflows/testpsijlocal.yml index d99966bf5e..3ad359c505 100644 --- a/.github/workflows/testpsijlocal.yml +++ b/.github/workflows/testpsijlocal.yml @@ -28,7 +28,8 @@ jobs: uses: actions/checkout@v4 with: repository: ${{ github.repository }} - + - name: Fetch tags + run: git fetch --prune --unshallow - name: Setup Python version ${{ matrix.python-version }} uses: actions/setup-python@v5 with: diff --git a/.github/workflows/testpsijslurm.yml b/.github/workflows/testpsijslurm.yml index 57d4e07781..b0fe551ba3 100644 --- a/.github/workflows/testpsijslurm.yml +++ b/.github/workflows/testpsijslurm.yml @@ -25,6 +25,8 @@ jobs: - name: Disable etelemetry run: echo "NO_ET=TRUE" >> $GITHUB_ENV - uses: actions/checkout@v4 + - name: Fetch tags + run: git fetch --prune --unshallow - name: Pull docker image run: | docker pull $DOCKER_IMAGE @@ -48,7 +50,7 @@ jobs: docker exec slurm bash -c "CONFIGURE_OPTS=\"-with-openssl=/opt/openssl\" pyenv install -v 3.11.5" fi docker exec slurm bash -c "pyenv global ${{ matrix.python-version }}" - docker exec slurm bash -c "pip install --upgrade pip && pip install -e /pydra[test,psij] && python -c 'import pydra; print(pydra.__version__)'" + docker exec slurm bash -c "pip install --upgrade pip && pip install -e /pydra[test,psij] && python -c 'import pydra.engine; print(pydra.engine.__version__)'" - name: Run pytest run: | docker exec slurm bash -c "pytest --color=yes -vs -n auto --psij=slurm --cov pydra --cov-config /pydra/.coveragerc --cov-report xml:/pydra/cov.xml --doctest-modules /pydra/pydra/ -k 'not test_audit_prov and not test_audit_prov_messdir_1 and not test_audit_prov_messdir_2 and not test_audit_prov_wf and not test_audit_all'" diff --git a/.github/workflows/testpydra.yml b/.github/workflows/testpydra.yml index f9f7229a10..9865b73137 100644 --- a/.github/workflows/testpydra.yml +++ b/.github/workflows/testpydra.yml @@ -52,74 +52,35 @@ jobs: matrix: os: [macos-latest, ubuntu-latest, windows-latest] python-version: ['3.11', '3.12', '3.13'] - install: ['wheel'] - include: - - os: 'ubuntu-latest' - python-version: '3.11' - install: 'sdist' - - os: 'ubuntu-latest' - python-version: '3.11' - install: 'repo' - - os: 'ubuntu-latest' - python-version: '3.11' - install: 'archive' fail-fast: false runs-on: ${{ matrix.os }} steps: - - name: Fetch sdist/wheel - uses: actions/download-artifact@v4 - if: matrix.install == 'sdist' || matrix.install == 'wheel' - with: - name: dist - path: dist/ - - name: Fetch git archive - uses: actions/download-artifact@v4 - if: matrix.install == 'archive' - with: - name: archive - path: archive/ - name: Fetch repository uses: actions/checkout@v4 - if: matrix.install == 'repo' - + - name: Fetch tags + run: git fetch --prune --unshallow - name: Set up Python ${{ matrix.python-version }} on ${{ matrix.os }} uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Update pip run: python -m pip install --upgrade pip - - name: Determine installation target - run: | - if [[ "$INSTALL" = "sdist" ]]; then - echo "ARCHIVE=$( ls dist/*.tar.gz )" >> $GITHUB_ENV - elif [[ "$INSTALL" = "wheel" ]]; then - echo "ARCHIVE=$( ls dist/*.whl )" >> $GITHUB_ENV - elif [[ "$INSTALL" = "archive" ]]; then - echo "ARCHIVE=$( ls archive/*.zip )" >> $GITHUB_ENV - elif [[ "$INSTALL" = "repo" ]]; then - echo "ARCHIVE=." >> $GITHUB_ENV - fi - env: - INSTALL: ${{ matrix.install }} - - name: Install Pydra - run: pip install $ARCHIVE + run: pip install .[test] - name: Print version - run: python -c "import pydra; print(pydra.__version__)" - - - name: Install Pydra tests dependencies - run: pip install pydra[test] + run: python -c "import pydra.engine; print(pydra.engine.__version__)" - name: Disable etelemetry run: echo "NO_ET=TRUE" >> $GITHUB_ENV - name: Pytest run: | - pytest -vs -n auto --doctest-modules --pyargs pydra \ + pytest -vs -n auto --doctest-modules \ --cov pydra --cov-config .coveragerc --cov-report xml:cov.xml - name: Upload to codecov diff --git a/.github/workflows/testsingularity.yml b/.github/workflows/testsingularity.yml index edcfb7c948..c989334176 100644 --- a/.github/workflows/testsingularity.yml +++ b/.github/workflows/testsingularity.yml @@ -14,7 +14,7 @@ concurrency: jobs: build: name: Build - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 strategy: matrix: python-version: ['3.11', '3.12', '3.13'] @@ -65,11 +65,13 @@ jobs: uses: actions/checkout@v4 with: repository: ${{ github.repository }} + - name: Fetch tags + run: git fetch --prune --unshallow - name: Install pydra (test) run: pip install -e ".[test]" - name: Pytest - run: pytest -vs --cov pydra --cov-config .coveragerc --cov-report xml:cov.xml pydra/engine/tests/test_singularity.py + run: pytest -vs --cov pydra --cov-config .coveragerc --cov-report xml:cov.xml pydra/engine/tests/test_singularity.py pydra/engine/tests/test_environments.py - name: Upload to codecov run: codecov -f cov.xml -F unittests -e GITHUB_WORKFLOW diff --git a/.github/workflows/testslurm.yml b/.github/workflows/testslurm.yml index 62c1e43792..3e715a127b 100644 --- a/.github/workflows/testslurm.yml +++ b/.github/workflows/testslurm.yml @@ -25,6 +25,8 @@ jobs: - name: Disable etelemetry run: echo "NO_ET=TRUE" >> $GITHUB_ENV - uses: actions/checkout@v4 + - name: Fetch tags + run: git fetch --prune --unshallow - name: Pull docker image run: | docker pull $DOCKER_IMAGE @@ -48,7 +50,7 @@ jobs: docker exec slurm bash -c "CONFIGURE_OPTS=\"-with-openssl=/opt/openssl\" pyenv install -v 3.11.5" fi docker exec slurm bash -c "pyenv global ${{ matrix.python-version }}" - docker exec slurm bash -c "pip install --upgrade pip && pip install -e /pydra[test] && python -c 'import pydra; print(pydra.__version__)'" + docker exec slurm bash -c "pip install --upgrade pip && pip install -e /pydra[test] && python -c 'import pydra.engine; print(pydra.engine.__version__)'" - name: Run pytest run: | docker exec slurm bash -c "pytest --color=yes -vs --cov pydra --cov-config /pydra/.coveragerc --cov-report xml:/pydra/cov.xml --doctest-modules /pydra/pydra/ -k 'not test_audit_prov and not test_audit_prov_messdir_1 and not test_audit_prov_messdir_2 and not test_audit_prov_wf and not test_audit_all'" diff --git a/new-docs/source/tutorial/5-shell.ipynb b/new-docs/source/tutorial/5-shell.ipynb index 806c1c6f9e..5ceb8ced72 100644 --- a/new-docs/source/tutorial/5-shell.ipynb +++ b/new-docs/source/tutorial/5-shell.ipynb @@ -109,11 +109,6 @@ "print(trim_png.cmdline)" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [] - }, { "cell_type": "markdown", "metadata": {}, @@ -180,6 +175,68 @@ "print(f\"'--int-arg' default: {fields_dict(Cp)['int_arg'].default}\")" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Path templates for output files\n", + "\n", + "By default, when an output file argument is defined, a `path_template` attribute will\n", + "be assigned to the field based on its name and extension (if applicable). For example,\n", + "the `zipped` output field in the following Gzip command will be assigned a\n", + "`path_template` of `out_file.gz`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pydra.design import shell\n", + "from fileformats.generic import File\n", + "\n", + "Gzip = shell.define(\"gzip \")\n", + "gzip = Gzip(in_files=File.mock(\"/a/file.txt\"))\n", + "print(gzip.cmdline)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "However, if this needs to be specified it can be by using the `$` operator, e.g." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "Gzip = shell.define(\"gzip \")\n", + "gzip = Gzip(in_files=File.mock(\"/a/file.txt\"))\n", + "print(gzip.cmdline)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To give the field a path_template of `archive.gz` when it is written on the command line.\n", + "Note that this value can always be overridden when the task is initialised, e.g." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "gzip = Gzip(in_files=File.mock(\"/a/file.txt\"), out_file=\"/path/to/archive.gz\")\n", + "print(gzip.cmdline)" + ] + }, { "cell_type": "markdown", "metadata": {}, diff --git a/pydra/design/boutiques.py b/pydra/design/boutiques.py index 9a48edfd17..334552b5b1 100644 --- a/pydra/design/boutiques.py +++ b/pydra/design/boutiques.py @@ -6,8 +6,8 @@ from functools import reduce from fileformats.generic import File from pydra.engine.specs import ShellDef -from .base import make_task_def -from . import shell +from pydra.design.base import make_task_def +from pydra.design import shell class arg(shell.arg): diff --git a/pydra/design/python.py b/pydra/design/python.py index f41a4e0106..095404f41b 100644 --- a/pydra/design/python.py +++ b/pydra/design/python.py @@ -2,7 +2,7 @@ import inspect from typing import dataclass_transform import attrs -from .base import ( +from pydra.design.base import ( Arg, Out, ensure_field_objects, diff --git a/pydra/design/shell.py b/pydra/design/shell.py index 01ab0be45e..1f0e75543c 100644 --- a/pydra/design/shell.py +++ b/pydra/design/shell.py @@ -13,7 +13,7 @@ from fileformats import generic from fileformats.core.exceptions import FormatRecognitionError from pydra.engine.helpers import attrs_values -from .base import ( +from pydra.design.base import ( Arg, Out, check_explicit_fields_are_none, @@ -151,13 +151,28 @@ class out(Out): passed) or any input field name (a specific input field will be sent). """ - callable: ty.Callable | None = None + callable: ty.Callable | None = attrs.field(default=None) def __attrs_post_init__(self): # Set type from return annotation of callable if not set if self.type is ty.Any and self.callable: self.type = ty.get_type_hints(self.callable).get("return", ty.Any) + @callable.validator + def _callable_validator(self, _, value): + + if value: + if not callable(value): + raise ValueError(f"callable must be a function, not {value!r}") + elif not getattr(self, "path_template", None) and self.name not in [ + "return_code", + "stdout", + "stderr", + ]: # ShellOutputs.BASE_NAMES + raise ValueError( + "A shell output field must have either a callable or a path_template" + ) + @attrs.define(kw_only=True) class outarg(arg, Out): @@ -228,10 +243,15 @@ def _validate_path_template(self, attribute, value): f"path_template ({value!r}) can only be provided when no default " f"({self.default!r}) is provided" ) + if value and not (is_fileset_or_union(self.type) or self.type is ty.Any): + raise ValueError( + f"path_template ({value!r}) can only be provided when type is a FileSet, " + f"or union thereof, not {self.type!r}" + ) @keep_extension.validator def _validate_keep_extension(self, attribute, value): - if value and self.path_template is not None: + if value and self.path_template is None: raise ValueError( f"keep_extension ({value!r}) can only be provided when path_template " f"is provided" @@ -345,7 +365,7 @@ def make( ShellDef, ShellOutputs, klass, arg, out, auto_attribs ) else: - if not isinstance(wrapped, str): + if not isinstance(wrapped, (str, list)): raise ValueError( f"wrapped must be a class or a string, not {wrapped!r}" ) @@ -439,8 +459,10 @@ def make( # If wrapped is provided (i.e. this is not being used as a decorator), return the # interface class if wrapped is not None: - if not isinstance(wrapped, (type, str)): - raise ValueError(f"wrapped must be a class or a string, not {wrapped!r}") + if not isinstance(wrapped, (type, str, list)): + raise ValueError( + f"wrapped must be a class, a string or a list, not {wrapped!r}" + ) return make(wrapped) return make @@ -508,10 +530,13 @@ def parse_command_line_template( else: assert outputs is None outputs = {} - parts = template.split() + if isinstance(template, list): + tokens = template + else: + tokens = template.split() executable = [] start_args_index = 0 - for part in parts: + for part in tokens: if part.startswith("<") or part.startswith("-"): break executable.append(part) @@ -520,11 +545,10 @@ def parse_command_line_template( raise ValueError(f"Found no executable in command line template: {template}") if len(executable) == 1: executable = executable[0] - args_str = " ".join(parts[start_args_index:]) - if not args_str: + tokens = tokens[start_args_index:] + if not tokens: return executable, inputs, outputs - tokens = re.split(r"\s+", args_str.strip()) - arg_pattern = r"<([:a-zA-Z0-9_,\|\-\.\/\+\*]+(?:\?|=[^>]+)?)>" + arg_pattern = r"<([:a-zA-Z0-9_,\|\-\.\/\+\*]+(?:\?|(?:=|\$)[^>]+)?)>" opt_pattern = r"--?[a-zA-Z0-9_]+" arg_re = re.compile(arg_pattern) opt_re = re.compile(opt_pattern) @@ -618,7 +642,16 @@ def from_type_str(type_str) -> type: kwds["default"] = attrs.Factory(list) elif "=" in name: name, default = name.split("=") - kwds["default"] = eval(default) + kwds["default"] = ( + default[1:-1] if re.match(r"('|\").*\1", default) else eval(default) + ) + elif "$" in name: + name, path_template = name.split("$") + kwds["path_template"] = path_template + if field_type is not outarg: + raise ValueError( + f"Path templates can only be used with output fields, not {token}" + ) if ":" in name: name, type_str = name.split(":") type_ = from_type_str(type_str) @@ -636,7 +669,7 @@ def from_type_str(type_str) -> type: # Add field to outputs with the same name as the input add_arg(name, out, {"type": type_, "callable": _InputPassThrough(name)}) # If name contains a '.', treat it as a file template and strip it from the name - if field_type is outarg: + if field_type is outarg and "path_template" not in kwds: path_template = name if is_fileset_or_union(type_): if ty.get_origin(type_): @@ -665,7 +698,7 @@ def from_type_str(type_str) -> type: option = token else: raise ValueError( - f"Found unknown token '{token}' in command line template: {template}" + f"Found unknown token {token!r} in command line template: {template}" ) remaining_pos = remaining_positions(arguments, len(arguments) + 1, 1) diff --git a/pydra/design/tests/test_shell.py b/pydra/design/tests/test_shell.py index f4fcd64238..184c8e05e2 100644 --- a/pydra/design/tests/test_shell.py +++ b/pydra/design/tests/test_shell.py @@ -211,8 +211,8 @@ def test_interface_template_more_complex(): name="tuple_arg", argstr="--tuple-arg", type=tuple[int, str] | None, - default=None, sep=" ", + default=None, position=6, ), ShellDef.additional_args, @@ -286,11 +286,7 @@ def test_interface_template_with_overrides_and_optionals(): position=0, help=shell.EXECUTABLE_HELP_STRING, ), - shell.arg( - name="in_fs_objects", - type=MultiInputObj[FsObject], - position=1, - ), + shell.arg(name="in_fs_objects", type=MultiInputObj[FsObject], position=1), shell.arg( name="recursive", argstr="-R", @@ -362,11 +358,7 @@ def test_interface_template_with_defaults(): position=0, help=shell.EXECUTABLE_HELP_STRING, ), - shell.arg( - name="in_fs_objects", - type=MultiInputObj[FsObject], - position=1, - ), + shell.arg(name="in_fs_objects", type=MultiInputObj[FsObject], position=1), output, shell.arg(name="recursive", argstr="-R", type=bool, default=True, position=3), shell.arg( @@ -379,6 +371,7 @@ def test_interface_template_with_defaults(): type=tuple[int, str], default=(1, "bar"), position=6, + sep=" ", ), ShellDef.additional_args, ] @@ -433,11 +426,7 @@ def test_interface_template_with_type_overrides(): position=0, help=shell.EXECUTABLE_HELP_STRING, ), - shell.arg( - name="in_fs_objects", - type=MultiInputObj[FsObject], - position=1, - ), + shell.arg(name="in_fs_objects", type=MultiInputObj[FsObject], position=1), output, shell.arg(name="recursive", argstr="-R", type=bool, default=False, position=3), shell.arg(name="text_arg", argstr="--text-arg", type=str, position=4), @@ -453,6 +442,7 @@ def test_interface_template_with_type_overrides(): argstr="--tuple-arg", type=tuple[int, str], position=6, + sep=" ", ), ShellDef.additional_args, ] diff --git a/pydra/design/workflow.py b/pydra/design/workflow.py index e9d5af3ec2..119cd95918 100644 --- a/pydra/design/workflow.py +++ b/pydra/design/workflow.py @@ -2,7 +2,7 @@ import inspect from typing import dataclass_transform import attrs -from .base import ( +from pydra.design.base import ( Arg, Out, ensure_field_objects, diff --git a/pydra/engine/core.py b/pydra/engine/core.py index 2c169b3ffc..9b7af85008 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -44,7 +44,7 @@ ) from .helpers_file import copy_nested_files, template_update from pydra.utils.messenger import AuditFlag -from pydra.engine.environments import Environment, Native +from pydra.engine.environments import Environment logger = logging.getLogger("pydra") @@ -134,7 +134,9 @@ def __init__( # We save the submitter is the definition is a workflow otherwise we don't # so the task can be pickled self.submitter = submitter - self.environment = environment if environment is not None else Native() + self.environment = ( + environment if environment is not None else submitter.environment + ) self.name = name self.state_index = state_index @@ -771,11 +773,17 @@ def add( OutputType The outputs definition of the node """ + from pydra.engine.environments import Native + if name is None: name = type(task_def).__name__ if name in self._nodes: raise ValueError(f"Node with name {name!r} already exists in the workflow") - if environment and task_def._task_type != "shell": + if ( + environment + and not isinstance(environment, Native) + and task_def._task_type != "shell" + ): raise ValueError( "Environments can only be used with 'shell' tasks not " f"{task_def._task_type!r} tasks ({task_def!r})" diff --git a/pydra/engine/environments.py b/pydra/engine/environments.py index 0cec18895a..04f90d49e1 100644 --- a/pydra/engine/environments.py +++ b/pydra/engine/environments.py @@ -10,7 +10,6 @@ if ty.TYPE_CHECKING: from pydra.engine.core import Task from pydra.engine.specs import ShellDef - from pydra.design import shell class Environment: @@ -93,8 +92,8 @@ def bind(self, loc, mode="ro"): loc_abs = Path(loc).absolute() return f"{loc_abs}:{self.root}{loc_abs}:{mode}" - def _get_bindings( - self, definition: "ShellDef", root: str | None = None + def get_bindings( + self, task: "Task", root: str | None = None ) -> tuple[dict[str, tuple[str, str]], dict[str, tuple[Path, ...]]]: """Return bindings necessary to run task in an alternative root. @@ -111,27 +110,32 @@ def _get_bindings( bindings: dict Mapping from paths in the host environment to the target environment """ + from pydra.design import shell + bindings: dict[str, tuple[str, str]] = {} input_updates: dict[str, tuple[Path, ...]] = {} if root is None: return bindings fld: shell.arg - for fld in list_fields(definition): + for fld in list_fields(task.definition): if TypeParser.contains_type(FileSet, fld.type): - fileset: FileSet | None = definition[fld.name] - if fileset is None: + fileset: FileSet | None = task.inputs[fld.name] + if not fileset: continue if not isinstance(fileset, (os.PathLike, FileSet)): raise NotImplementedError( - "Generating environment bindings for nested FileSets is not " - "supported yet" + f"No support for generating bindings for {type(fileset)} types " + f"({fileset})" ) copy = fld.copy_mode == FileSet.CopyMode.copy host_path, env_path = fileset.parent, Path(f"{root}{fileset.parent}") # Default to mounting paths as read-only, but respect existing modes - bindings[host_path] = (env_path, "rw" if copy else "ro") + bindings[host_path] = ( + env_path, + "rw" if copy or isinstance(fld, shell.outarg) else "ro", + ) # Provide updated in-container paths to the command to be run. If a # fs-object, which resolves to a single path, just pass in the name of @@ -143,6 +147,10 @@ def _get_bindings( if isinstance(fileset, os.PathLike) else tuple(env_path / rel for rel in fileset.relative_fspaths) ) + + # Add the cache directory to the list of mounts + bindings[task.cache_dir] = (f"{self.root}/{task.cache_dir}", "rw") + return bindings, input_updates @@ -152,15 +160,11 @@ class Docker(Container): def execute(self, task: "Task[ShellDef]") -> dict[str, ty.Any]: docker_img = f"{self.image}:{self.tag}" # mounting all input locations - mounts, input_updates = self._get_bindings( - definition=task.definition, root=self.root - ) + mounts, input_updates = self.get_bindings(task=task, root=self.root) docker_args = [ "docker", "run", - "-v", - self.bind(task.cache_dir, "rw"), *self.xargs, ] docker_args.extend( @@ -193,16 +197,12 @@ class Singularity(Container): def execute(self, task: "Task[ShellDef]") -> dict[str, ty.Any]: singularity_img = f"{self.image}:{self.tag}" # mounting all input locations - mounts, input_updates = self._get_bindings( - definition=task.definition, root=self.root - ) + mounts, input_updates = self.get_bindings(task=task, root=self.root) # todo adding xargsy etc singularity_args = [ "singularity", "exec", - "-B", - self.bind(task.cache_dir, "rw"), *self.xargs, ] singularity_args.extend( diff --git a/pydra/engine/helpers_file.py b/pydra/engine/helpers_file.py index f942674747..5be17047b7 100644 --- a/pydra/engine/helpers_file.py +++ b/pydra/engine/helpers_file.py @@ -12,6 +12,9 @@ from fileformats.generic import FileSet from pydra.engine.helpers import is_lazy, attrs_values, list_fields +if ty.TYPE_CHECKING: + from pydra.engine.specs import TaskDef + from pydra.design import shell logger = logging.getLogger("pydra") @@ -157,7 +160,7 @@ def template_update_single( input_values: dict[str, ty.Any] = None, output_dir: Path | None = None, spec_type: str = "input", -): +) -> Path | None: """Update a single template from the input_spec or output_spec based on the value from inputs_dict (checking the types of the fields, that have "output_file_template)" @@ -198,9 +201,9 @@ def template_update_single( if output_dir and value is not None: # should be converted to str, it is also used for input fields that should be str if type(value) is list: - return [str(output_dir / Path(val).name) for val in value] + return [output_dir / val.name for val in value] else: - return str(output_dir / Path(value).name) + return output_dir / value.name else: return None @@ -235,25 +238,38 @@ def _template_formatting(field, definition, input_values): # as default, we assume that keep_extension is True if isinstance(template, (tuple, list)): formatted = [ - _string_template_formatting(field, t, definition, input_values) + _single_template_formatting(field, t, definition, input_values) for t in template ] else: assert isinstance(template, str) - formatted = _string_template_formatting( + formatted = _single_template_formatting( field, template, definition, input_values ) return formatted -def _string_template_formatting(field, template, definition, input_values): +def _single_template_formatting( + field: "shell.outarg", + template: str, + definition: "TaskDef", + input_values: dict[str, ty.Any], +) -> Path | None: from pydra.utils.typing import MultiInputObj, MultiOutputFile inp_fields = re.findall(r"{\w+}", template) inp_fields_fl = re.findall(r"{\w+:[0-9.]+f}", template) inp_fields += [re.sub(":[0-9.]+f", "", el) for el in inp_fields_fl] + + # FIXME: This would be a better solution, and would allow you to explicitly specify + # whether you want to use the extension of the input file or not, by referencing + # the "ext" attribute of the input file. However, this would require a change in the + # way the element formatting is done + # + # inp_fields = set(re.findall(r"{(\w+)(?:\.\w+)?(?::[0-9.]+f)?}", template)) + if len(inp_fields) == 0: - return template + return Path(template) val_dict = {} file_template = None @@ -320,7 +336,7 @@ def _string_template_formatting(field, template, definition, input_values): formatted_value = _element_formatting( template, val_dict, file_template, keep_extension=field.keep_extension ) - return formatted_value + return Path(formatted_value) if formatted_value is not None else formatted_value def _element_formatting( diff --git a/pydra/engine/specs.py b/pydra/engine/specs.py index 3b81cc54fa..cd7f43b4ae 100644 --- a/pydra/engine/specs.py +++ b/pydra/engine/specs.py @@ -178,6 +178,7 @@ class TaskDef(ty.Generic[OutputsType]): def __call__( self, + /, cache_dir: os.PathLike | None = None, worker: "str | ty.Type[Worker] | Worker" = "debug", environment: "Environment | None" = None, @@ -521,7 +522,7 @@ def _check_rules(self): if is_lazy(value): continue - if value is attrs.NOTHING: + if value is attrs.NOTHING and not getattr(field, "path_template", False): errors.append(f"Mandatory field {field.name!r} is not set") # Collect alternative fields associated with this field. @@ -654,6 +655,14 @@ def errors(self): return cp.load(f) return None + @property + def task(self): + task_pkl = self.output_dir / "_task.pklz" + if not task_pkl.exists(): + return None + with open(task_pkl, "rb") as f: + return cp.load(f) + @attrs.define(kw_only=True) class RuntimeSpec: @@ -937,48 +946,46 @@ def _resolve_value( if not cls._required_fields_satisfied(fld, task.definition): return None - elif isinstance(fld, shell.outarg) and fld.path_template: + if isinstance(fld, shell.outarg) and fld.path_template: return template_update_single( fld, definition=task.definition, output_dir=task.output_dir, spec_type="output", ) - elif fld.callable: - callable_ = fld.callable - if isinstance(fld.callable, staticmethod): - # In case callable is defined as a static method, - # retrieve the function wrapped in the descriptor. - callable_ = fld.callable.__func__ - call_args = inspect.getfullargspec(callable_) - call_args_val = {} - for argnm in call_args.args: - if argnm == "field": - call_args_val[argnm] = fld - elif argnm == "output_dir": - call_args_val[argnm] = task.output_dir - elif argnm == "inputs": - call_args_val[argnm] = task.inputs - elif argnm == "stdout": - call_args_val[argnm] = task.return_values["stdout"] - elif argnm == "stderr": - call_args_val[argnm] = task.return_values["stderr"] - else: - try: - call_args_val[argnm] = task.inputs[argnm] - except KeyError as e: - e.add_note( - f"arguments of the callable function from {fld.name} " - f"has to be in inputs or be field or output_dir, " - f"but {argnm} is used" - ) - raise - return callable_(**call_args_val) - else: - raise Exception( - f"Metadata for '{fld.name}', does not not contain any of the required fields " - f'("callable", "output_file_template" or "value"): {fld}.' - ) + assert fld.callable, ( + f"Output field '{fld.name}', does not not contain any of the required fields " + f'("callable", "output_file_template" or "value"): {fld}.' + ) + callable_ = fld.callable + if isinstance(fld.callable, staticmethod): + # In case callable is defined as a static method, + # retrieve the function wrapped in the descriptor. + callable_ = fld.callable.__func__ + call_args = inspect.getfullargspec(callable_) + call_args_val = {} + for argnm in call_args.args: + if argnm == "field": + call_args_val[argnm] = fld + elif argnm == "output_dir": + call_args_val[argnm] = task.output_dir + elif argnm == "inputs": + call_args_val[argnm] = task.inputs + elif argnm == "stdout": + call_args_val[argnm] = task.return_values["stdout"] + elif argnm == "stderr": + call_args_val[argnm] = task.return_values["stderr"] + else: + try: + call_args_val[argnm] = task.inputs[argnm] + except KeyError as e: + e.add_note( + f"arguments of the callable function from {fld.name} " + f"has to be in inputs or be field or output_dir, " + f"but {argnm} is used" + ) + raise + return callable_(**call_args_val) ShellOutputsType = ty.TypeVar("OutputType", bound=ShellOutputs) @@ -1035,10 +1042,9 @@ def _command_args( output_dir = Path.cwd() self._check_resolved() inputs = attrs_values(self) - modified_inputs = template_update(self, output_dir=output_dir) + inputs.update(template_update(self, output_dir=output_dir)) if input_updates: inputs.update(input_updates) - inputs.update(modified_inputs) pos_args = [] # list for (position, command arg) positions_provided = [] for field in list_fields(self): diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py index 02c51f4216..c9433e2d21 100644 --- a/pydra/engine/submitter.py +++ b/pydra/engine/submitter.py @@ -99,6 +99,8 @@ def __init__( **kwargs, ): + from pydra.engine.environments import Native + if worker is None: worker = "debug" @@ -120,7 +122,7 @@ def __init__( self.cache_dir = cache_dir self.cache_locations = cache_locations - self.environment = environment + self.environment = environment if environment is not None else Native() self.rerun = rerun self.loop = get_open_loop() self._own_loop = not self.loop.is_running() @@ -190,6 +192,8 @@ def __call__( result : Any The result of the task """ + from pydra.engine.environments import Environment + if raise_errors is None: raise_errors = self.worker_name == "debug" if not isinstance(raise_errors, bool): @@ -205,22 +209,30 @@ def __call__( output_types = {o.name: list[o.type] for o in list_fields(task_def.Outputs)} @workflow.define(outputs=output_types) - def Split(defn: TaskDef, output_types: dict): - node = workflow.add(defn) + def Split( + defn: TaskDef, output_types: dict, environment: Environment | None + ): + node = workflow.add(defn, environment=environment, hooks=hooks) return tuple(getattr(node, o) for o in output_types) - task_def = Split(defn=task_def, output_types=output_types) + task_def = Split( + defn=task_def, output_types=output_types, environment=self.environment + ) + environment = None elif task_def._combiner: raise ValueError( f"Task {self} is marked for combining, but not splitting. " "Use the `split` method to split the task before combining." ) + else: + environment = self.environment + task = Task( task_def, submitter=self, name="main", - environment=self.environment, + environment=environment, hooks=hooks, ) try: diff --git a/pydra/engine/tests/test_boutiques.py b/pydra/engine/tests/test_boutiques.py index cc5635a936..79652a6d58 100644 --- a/pydra/engine/tests/test_boutiques.py +++ b/pydra/engine/tests/test_boutiques.py @@ -3,7 +3,7 @@ import attr import pytest from pydra.engine.helpers import attrs_values -from .utils import result_no_submitter, result_submitter, no_win +from .utils import run_no_submitter, run_submitter, no_win from pydra.design import workflow, boutiques, shell need_bosh_docker = pytest.mark.skipif( @@ -22,7 +22,7 @@ @pytest.mark.parametrize( "maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"] ) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_boutiques_1(maskfile, plugin, results_function, tmpdir, data_tests_dir): """simple task to run fsl.bet using BoshTask""" btask = boutiques.define(zenodo_id="1482743") diff --git a/pydra/engine/tests/test_dockertask.py b/pydra/engine/tests/test_dockertask.py index 7d54f26805..4c98cb2a37 100644 --- a/pydra/engine/tests/test_dockertask.py +++ b/pydra/engine/tests/test_dockertask.py @@ -1,11 +1,12 @@ +import attrs import pytest -from pydra.engine.specs import ShellDef from pydra.engine.submitter import Submitter +from pydra.engine.specs import ShellDef, ShellOutputs from fileformats.generic import File from pydra.engine.environments import Docker from pydra.design import shell, workflow from pydra.engine.core import Task -from .utils import no_win, need_docker, result_submitter, result_no_submitter +from .utils import no_win, need_docker, run_submitter, run_no_submitter @no_win @@ -25,11 +26,11 @@ def test_docker_1_nosubm(): assert docky_task.environment.image == "busybox" assert docky_task.environment.tag == "latest" assert isinstance(docky_task.environment, Docker) - assert docky_task.cmdline == cmd + assert docky.cmdline == cmd - res = docky_task() - assert res.output.stdout == "root\n" - assert res.output.return_code == 0 + res = docky_task.run() + assert res.outputs.stdout == "root\n" + assert res.outputs.return_code == 0 @no_win @@ -51,8 +52,8 @@ def test_docker_1(plugin): @no_win @need_docker -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_docker_2(results_function, plugin): +@pytest.mark.parametrize("run_function", [run_no_submitter, run_submitter]) +def test_docker_2(run_function, plugin, tmp_path): """a command with arguments, cmd and args given as executable with and without submitter """ @@ -61,29 +62,28 @@ def test_docker_2(results_function, plugin): docky = Docky() # cmdline doesn't know anything about docker assert docky.cmdline == cmdline - res = results_function(docky, plugin) - assert res.output.stdout.strip() == " ".join(cmdline.split()[1:]) - assert res.output.return_code == 0 + outputs = run_function(docky, tmp_path, plugin, environment=Docker(image="busybox")) + assert outputs.stdout.strip() == " ".join(cmdline.split()[1:]) + assert outputs.return_code == 0 @no_win @need_docker -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_docker_2a(results_function, plugin): +@pytest.mark.parametrize("run_function", [run_no_submitter, run_submitter]) +def test_docker_2a(run_function, plugin, tmp_path): """a command with arguments, using executable and args using submitter """ - cmd_exec = "echo" - cmd_args = ["hail", "pydra"] + cmd = ["echo", "hail", "pydra"] # separate command into exec + args - Docky = shell.define(" ".join([cmd_exec] + cmd_args)) + Docky = shell.define(cmd) docky = Docky() - assert docky.executable == "echo" - assert docky.cmdline == f"{cmd_exec} {' '.join(cmd_args)}" + assert docky.executable == cmd + assert docky.cmdline == " ".join(cmd) - res = results_function(docky, plugin) - assert res.output.stdout.strip() == " ".join(cmd_args) - assert res.output.return_code == 0 + outputs = run_function(docky, tmp_path, plugin, environment=Docker(image="busybox")) + assert outputs.stdout.strip() == " ".join(cmd[1:]) + assert outputs.return_code == 0 # tests with State @@ -91,21 +91,22 @@ def test_docker_2a(results_function, plugin): @no_win @need_docker -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) -def test_docker_st_1(results_function, plugin): +@pytest.mark.parametrize("run_function", [run_no_submitter, run_submitter]) +def test_docker_st_1(run_function, plugin, tmp_path): """commands without arguments in container splitter = executable """ cmd = ["pwd", "whoami"] - Docky = shell.define("placeholder") + Docky = shell.define("docky") # cmd is just a placeholder docky = Docky().split(executable=cmd) - assert docky.state.splitter == "docky.executable" - - res = results_function(docky, plugin) - assert res[0].output.stdout == f"/mnt/pydra{docky.output_dir[0]}\n" - assert res[1].output.stdout == "root\n" - assert res[0].output.return_code == res[1].output.return_code == 0 + outputs = run_function(docky, tmp_path, plugin, environment=Docker(image="busybox")) + assert ( + outputs.stdout[0] + == f"/mnt/pydra{tmp_path}/{attrs.evolve(docky, executable=cmd[0])._checksum}\n" + ) + assert outputs.stdout[1] == "root\n" + assert outputs.return_code[0] == outputs.return_code[1] == 0 # tests with customized output_spec @@ -118,13 +119,11 @@ def test_docker_outputspec_1(plugin, tmp_path): customised output_spec, adding files to the output, providing specific pathname output_path is automatically added to the bindings """ - outputs = [shell.out(name="newfile", type=File, help="new file")] - docky = shell.define("touch newfile_tmp.txt", outputs=outputs)( - environment=Docker(image="ubuntu") - ) + Docky = shell.define("touch ") + docky = Docky() - res = docky(plugin=plugin) - assert res.output.stdout == "" + outputs = docky(plugin=plugin, environment=Docker(image="ubuntu")) + assert outputs.stdout == "" # tests with customised input_spec @@ -140,24 +139,23 @@ def test_docker_inputspec_1(tmp_path): cmd = "cat" - inputs = [ - shell.arg( - name="file", - type=File, - position=1, - argstr="", - help="input file", - ) - ] - - docky = shell.define(cmd, inputs=inputs)( - environment=Docker(image="busybox"), - file=filename, - strip=True, + Docky = shell.define( + cmd, + inputs=[ + shell.arg( + name="file", + type=File, + position=1, + argstr="", + help="input file", + ) + ], ) - res = docky() - assert res.output.stdout == "hello from pydra" + docky = Docky(file=filename) + + outputs = docky(environment=Docker(image="busybox"), cache_dir=tmp_path) + assert outputs.stdout.strip() == "hello from pydra" @no_win @@ -172,24 +170,24 @@ def test_docker_inputspec_1a(tmp_path): cmd = "cat" - inputs = [ - shell.arg( - name="file", - type=File, - default=filename, - position=1, - argstr="", - help="input file", - ) - ] - - docky = shell.define(cmd, inputs=inputs)( - environment=Docker(image="busybox"), - strip=True, + Docky = shell.define( + cmd, + inputs=[ + shell.arg( + name="file", + type=File, + default=filename, + position=1, + argstr="", + help="input file", + ) + ], ) - res = docky() - assert res.output.stdout == "hello from pydra" + docky = Docky() + + outputs = docky(environment=Docker(image="busybox"), cache_dir=tmp_path) + assert outputs.stdout.strip() == "hello from pydra" @no_win @@ -206,32 +204,35 @@ def test_docker_inputspec_2(plugin, tmp_path): cmd = "cat" - inputs = [ - shell.arg( - name="file1", - type=File, - position=1, - argstr="", - help="input file 1", - ), - shell.arg( - name="file2", - type=File, - default=filename_2, - position=2, - argstr="", - help="input file 2", - ), - ] - docky = shell.define(cmd, inputs=inputs)( - name="docky", - environment=Docker(image="busybox"), + Docky = shell.define( + cmd, + inputs=[ + shell.arg( + name="file1", + type=File, + position=1, + argstr="", + help="input file 1", + ), + shell.arg( + name="file2", + type=File, + default=filename_2, + position=2, + argstr="", + help="input file 2", + ), + ], + ) + docky = Docky( file1=filename_1, - strip=True, ) - res = docky() - assert res.output.stdout == "hello from pydra\nhave a nice one" + outputs = docky( + name="docky", + environment=Docker(image="busybox"), + ) + assert outputs.stdout.strip() == "hello from pydra\nhave a nice one" @no_win @@ -249,33 +250,34 @@ def test_docker_inputspec_2a_except(plugin, tmp_path): cmd = "cat" - inputs = [ - shell.arg( - name="file1", - type=File, - default=filename_1, - position=1, - argstr="", - help="input file 1", - ), - shell.arg( - name="file2", - type=File, - position=2, - argstr="", - help="input file 2", - ), - ] + Docky = shell.define( + cmd, + inputs=[ + shell.arg( + name="file1", + type=File, + default=filename_1, + position=1, + argstr="", + help="input file 1", + ), + shell.arg( + name="file2", + type=File, + position=2, + argstr="", + help="input file 2", + ), + ], + ) - docky = shell.define(cmd, inputs=inputs)( - environment=Docker(image="busybox"), + docky = Docky( file2=filename_2, - strip=True, ) - assert docky.definition.file2.fspath == filename_2 + assert docky.file2.fspath == filename_2 - res = docky() - assert res.output.stdout == "hello from pydra\nhave a nice one" + outputs = docky(environment=Docker(image="busybox")) + assert outputs.stdout.strip() == "hello from pydra\nhave a nice one" @no_win @@ -294,32 +296,31 @@ def test_docker_inputspec_2a(plugin, tmp_path): cmd = "cat" - inputs = [ - shell.arg( - name="file1", - type=File, - default=filename_1, - position=1, - argstr="", - help="input file 1", - ), - shell.arg( - name="file2", - type=File, - position=2, - argstr="", - help="input file 2", - ), - ] - - docky = shell.define(cmd, inputs=inputs)( - environment=Docker(image="busybox"), - file2=filename_2, - strip=True, + Docky = shell.define( + cmd, + inputs=[ + shell.arg( + name="file1", + type=File, + default=filename_1, + position=1, + argstr="", + help="input file 1", + ), + shell.arg( + name="file2", + type=File, + position=2, + argstr="", + help="input file 2", + ), + ], ) - res = docky() - assert res.output.stdout == "hello from pydra\nhave a nice one" + docky = Docky(file2=filename_2) + + outputs = docky(environment=Docker(image="busybox")) + assert outputs.stdout.strip() == "hello from pydra\nhave a nice one" @no_win @@ -350,8 +351,8 @@ def test_docker_inputspec_3(plugin, tmp_path): ) cmdline = docky.cmdline - res = docky() - assert "docker" in res.output.stdout + outputs = docky() + assert "docker" in outputs.stdout assert cmdline == docky.cmdline @@ -366,36 +367,30 @@ def test_docker_cmd_inputspec_copyfile_1(plugin, tmp_path): with open(file, "w") as f: f.write("hello from pydra\n") - cmd = ["sed", "-is", "s/hello/hi/"] - - inputs = [ - shell.arg( - name="orig_file", - type=File, + @shell.define + class Docky(ShellDef["Docky.Outputs"]): + executable = ["sed", "-is", "s/hello/hi/"] + orig_file: File = shell.arg( position=1, argstr="", help="orig file", - copyfile="copy", - ), - shell.arg( - name="out_file", - type=str, - output_file_template="{orig_file}", - help="output file", - ), - ] + copy_mode="copy", + ) - docky = shell.define(cmd, inputs=inputs)( - environment=Docker(image="busybox"), - orig_file=str(file), - ) + class Outputs(ShellOutputs): + out_file: File = shell.outarg( + path_template="{orig_file}.txt", + help="output file", + ) - res = docky() - assert res.output.stdout == "" - out_file = res.output.out_file.fspath + docky = Docky(orig_file=str(file)) + + outputs = docky(environment=Docker(image="busybox"), cache_dir=tmp_path) + assert outputs.stdout == "" + out_file = outputs.out_file.fspath assert out_file.exists() # the file is copied, and then it is changed in place - assert out_file.parent == docky.output_dir + assert out_file.parent.parent == tmp_path with open(out_file) as f: assert "hi from pydra\n" == f.read() # the original file is unchanged @@ -418,24 +413,24 @@ def test_docker_inputspec_state_1(plugin, tmp_path): cmd = "cat" - inputs = [ - shell.arg( - name="file", - type=File, - position=1, - argstr="", - help="input file", - ) - ] - - docky = shell.define(cmd, inputs=inputs)( - environment=Docker(image="busybox"), - strip=True, + Docky = shell.define( + cmd, + inputs=[ + shell.arg( + name="file", + type=File, + position=1, + argstr="", + help="input file", + ) + ], ) - res = docky(split={"file": [str(filename_1), str(filename_2)]}) - assert res[0].output.stdout == "hello from pydra" - assert res[1].output.stdout == "have a nice one" + docky = Docky().split(file=[str(filename_1), str(filename_2)]) + + outputs = docky(environment=Docker(image="busybox")) + assert outputs.stdout[0].strip() == "hello from pydra" + assert outputs.stdout[1].strip() == "have a nice one" @no_win @@ -454,23 +449,23 @@ def test_docker_inputspec_state_1b(plugin, tmp_path): cmd = "cat" - inputs = [ - shell.arg( - name="file", - type=File, - position=1, - argstr="", - help="input file", - ) - ] - docky = shell.define(cmd, inputs=inputs)( - environment=Docker(image="busybox"), - strip=True, + Docky = shell.define( + cmd, + inputs=[ + shell.arg( + name="file", + type=File, + position=1, + argstr="", + help="input file", + ) + ], ) + docky = Docky().split(file=[str(file_1), str(file_2)]) - res = docky(split={"file": [str(file_1), str(file_2)]}) - assert res[0].output.stdout == "hello from pydra" - assert res[1].output.stdout == "have a nice one" + outputs = docky(environment=Docker(image="busybox")) + assert outputs.stdout[0].strip() == "hello from pydra" + assert outputs.stdout[1].strip() == "have a nice one" @no_win @@ -483,33 +478,33 @@ def test_docker_wf_inputspec_1(plugin, tmp_path): cmd = "cat" - inputs = [ - shell.arg( - name="file", - type=File, - position=1, - argstr="", - help="input file", - ) - ] + Docky = shell.define( + cmd, + inputs=[ + shell.arg( + name="file", + type=File, + position=1, + argstr="", + help="input file", + ) + ], + ) @workflow.define - def Workflow(cmd, file): + def Workflow(file): docky = workflow.add( - shell.define(cmd, inputs=inputs)( - file=file, - environment=Docker(image="busybox"), - strip=True, - ) + Docky(file=file), + environment=Docker(image="busybox"), ) return docky.stdout - wf = Workflow(cmd=cmd, file=filename) + wf = Workflow(file=filename) - res = wf.result() - assert res.output.out == "hello from pydra" + outputs = wf() + assert outputs.out.strip() == "hello from pydra" @no_win @@ -525,35 +520,35 @@ def test_docker_wf_state_inputspec_1(plugin, tmp_path): cmd = "cat" - inputs = [ - shell.arg( - name="file", - type=File, - position=1, - argstr="", - help="input file", - ) - ] + Docky = shell.define( + cmd, + inputs=[ + shell.arg( + name="file", + type=File, + position=1, + argstr="", + help="input file", + ) + ], + ) @workflow.define - def Workflow(cmd, file): + def Workflow(file): docky = workflow.add( - shell.define(cmd, inputs=inputs)( - environment=Docker(image="busybox"), - file=file, - strip=True, - ) + Docky(file=file), + environment=Docker(image="busybox"), ) return docky.stdout - wf = Workflow(cmd=cmd) + wf = Workflow().split(file=[file_1, file_2]) - res = wf(split={"file": [file_1, file_2]}) + outputs = wf(cache_dir=tmp_path) - assert res[0].output.out == "hello from pydra" - assert res[1].output.out == "have a nice one" + assert outputs.out[0].strip() == "hello from pydra" + assert outputs.out[1].strip() == "have a nice one" @no_win @@ -569,30 +564,30 @@ def test_docker_wf_ndst_inputspec_1(plugin, tmp_path): cmd = "cat" - inputs = [ - shell.arg( - name="file", - type=File, - position=1, - argstr="", - help="input file", - ) - ] + Docky = shell.define( + cmd, + inputs=[ + shell.arg( + name="file", + type=File, + position=1, + argstr="", + help="input file", + ) + ], + ) @workflow.define - def Workflow(cmd, file): + def Workflow(file): docky = workflow.add( - shell.define(cmd, inputs=inputs)( - environment=Docker(image="busybox"), - file=file, - strip=True, - ) + Docky(file=file), + environment=Docker(image="busybox"), ) return docky.stdout - wf = Workflow(cmd=cmd) + wf = Workflow().split(file=[str(file_1), str(file_2)]) - res = wf(split={"file": [str(file_1), str(file_2)]}) - assert res.output.out == ["hello from pydra", "have a nice one"] + outputs = wf(cache_dir=tmp_path) + assert outputs.out == ["hello from pydra", "have a nice one"] diff --git a/pydra/engine/tests/test_environments.py b/pydra/engine/tests/test_environments.py index fd36afaf8a..d0cbd7f63a 100644 --- a/pydra/engine/tests/test_environments.py +++ b/pydra/engine/tests/test_environments.py @@ -1,11 +1,11 @@ from pathlib import Path - -from ..environments import Native, Docker, Singularity -from ..task import ShellDef -from ..submitter import Submitter +import typing as ty +from pydra.engine.environments import Native, Docker, Singularity +from pydra.engine.submitter import Submitter from fileformats.generic import File from pydra.design import shell from pydra.engine.core import Task +from pydra.engine.task import ShellDef from pydra.engine.helpers import attrs_values from .utils import no_win, need_docker, need_singularity import pytest @@ -17,6 +17,10 @@ def makedir(path, name): return newdir +def drop_stderr(dct: dict[str, ty.Any]): + return {k: v for k, v in dct.items() if k != "stderr"} + + def test_native_1(tmp_path): """simple command, no arguments""" @@ -24,26 +28,26 @@ def newcache(x): return makedir(tmp_path, x) cmd = "whoami" - ShellDef = shell.define(cmd) - - shelly = ShellDef() + Shelly = shell.define(cmd) + shelly = Shelly() assert shelly.cmdline == cmd - shelly_task = Task( + + shelly_job = Task( definition=shelly, - submitter=Submitter(cache_dir=newcache("shelly-task")), - name="shelly", + submitter=Submitter(cache_dir=newcache("native-task")), + name="native", ) + env_outputs = Native().execute(shelly_job) - env_outputs = Native().execute(shelly_task) - outputs = shelly(cache_dir=newcache("shelly-exec")) - assert env_outputs == attrs_values(outputs) + outputs = shelly(cache_dir=newcache("native-exec")) + assert drop_stderr(env_outputs) == drop_stderr(attrs_values(outputs)) - outputs = shelly(environment=Native()) + outputs = shelly(environment=Native(), cache_dir=newcache("native-call")) assert env_outputs == attrs_values(outputs) - with Submitter(cache_dir=newcache("shelly-submitter"), environment=Native()) as sub: + with Submitter(cache_dir=newcache("native-submitter"), environment=Native()) as sub: result = sub(shelly) - assert env_outputs == attrs_values(result.outputs) + assert drop_stderr(env_outputs) == drop_stderr(attrs_values(result.outputs)) @no_win @@ -54,31 +58,31 @@ def test_docker_1(tmp_path): def newcache(x): makedir(tmp_path, x) - cmd = ["whoami"] + cmd = "whoami" docker = Docker(image="busybox") - shell_def = shell.define(cmd) - shelly = Task( - definition=shell_def, - submitter=Submitter(cache_dir=newcache("shelly")), - name="shelly", - ) - assert shell_def.cmdline == " ".join(cmd) - env_res = docker.execute(shelly) - - shelly_env = ShellDef( - name="shelly", - executable=cmd, - cache_dir=newcache("shelly_env"), - environment=docker, - ) - shelly_env() - assert env_res == shelly_env.output_ == shelly_env.result().output.__dict__ + Shelly = shell.define(cmd) + shelly = Shelly() + assert shelly.cmdline == cmd - shelly_call = ShellDef( - name="shelly", executable=cmd, cache_dir=newcache("shelly_call") + shelly_job = Task( + definition=shelly, + submitter=Submitter(cache_dir=newcache("docker")), + name="docker", ) - shelly_call(environment=docker) - assert env_res == shelly_call.output_ == shelly_call.result().output.__dict__ + outputs_dict = docker.execute(shelly_job) + + with Submitter(cache_dir=newcache("docker_sub"), environment=docker) as sub: + result = sub(shelly) + + outputs = shelly(environment=docker, cache_dir=newcache("docker_call")) + # If busybox isn't found locally, then the stderr will have the download progress from + # the Docker auto-pull in it + for key in ["stdout", "return_code"]: + assert ( + outputs_dict[key] + == attrs_values(outputs)[key] + == attrs_values(result.outputs)[key] + ) @no_win @@ -99,31 +103,23 @@ def newcache(x): cmd = "whoami" docker = Docker(image="busybox") - shell_def = shell.define(cmd)() - shelly = Task( - definition=shell_def, - submitter=Submitter(cache_dir=newcache("shelly")), - name="shelly", - ) - assert shell_def.cmdline == cmd - env_res = docker.execute(shelly) - - shelly_env = ShellDef( - name="shelly", - executable=cmd, - cache_dir=newcache("shelly_env"), - environment=docker, + shelly = shell.define(cmd)() + shelly_job = Task( + definition=shelly, + submitter=Submitter(cache_dir=newcache("docker")), + name="docker", ) - with Submitter(worker="cf") as sub: - shelly_env(submitter=sub) - assert env_res == shelly_env.result().output.__dict__ + assert shelly.cmdline == cmd + outputs_dict = docker.execute(shelly_job) - shelly_call = ShellDef( - name="shelly", executable=cmd, cache_dir=newcache("shelly_call") - ) - with Submitter(worker="cf") as sub: - shelly_call(submitter=sub, environment=docker) - assert env_res == shelly_call.result().output.__dict__ + with Submitter( + worker="cf", cache_dir=newcache("docker_sub"), environment=docker + ) as sub: + result = sub(shelly) + assert outputs_dict == attrs_values(result.outputs) + + outputs = shelly(cache_dir=newcache("docker_call"), environment=docker) + assert outputs_dict == attrs_values(outputs) @no_win @@ -134,31 +130,24 @@ def test_singularity_1(tmp_path): def newcache(x): makedir(tmp_path, x) - cmd = ["whoami"] - sing = Singularity(image="docker://alpine") - shell_def = shell.define(cmd) - shelly = Task( - definition=shell_def, - submitter=Submitter(cache_dir=newcache("shelly")), - name="shelly", - ) - assert shell_def.cmdline == " ".join(cmd) - env_res = sing.execute(shelly) - - shelly_env = ShellDef( - name="shelly", - executable=cmd, - cache_dir=newcache("shelly_env"), - environment=sing, + cmd = "whoami" + sing = Singularity(image="docker://alpine", xargs=["--fakeroot"]) + Shelly = shell.define(cmd) + shelly = Shelly() + shelly_job = Task( + definition=shelly, + submitter=Submitter(cache_dir=newcache("singu")), + name="singu", ) - shelly_env() - assert env_res == shelly_env.output_ == shelly_env.result().output.__dict__ + assert shelly.cmdline == cmd + outputs_dict = sing.execute(shelly_job) - shelly_call = ShellDef( - name="shelly", executable=cmd, cache_dir=newcache("shelly_call") - ) - shelly_call(environment=sing) - assert env_res == shelly_call.output_ == shelly_call.result().output.__dict__ + with Submitter(cache_dir=newcache("singu_sub"), environment=sing) as sub: + results = sub(shelly) + assert drop_stderr(outputs_dict) == drop_stderr(attrs_values(results.outputs)) + + outputs = shelly(environment=sing, cache_dir=newcache("singu_call")) + assert drop_stderr(outputs_dict) == drop_stderr(attrs_values(outputs)) @no_win @@ -169,87 +158,83 @@ def test_singularity_1_subm(tmp_path, plugin): def newcache(x): makedir(tmp_path, x) - cmd = ["whoami"] - sing = Singularity(image="docker://alpine") - shell_def = shell.define(cmd) - shelly = Task( - definition=shell_def, - submitter=Submitter(cache_dir=newcache("shelly")), - name="shelly", - ) - assert shell_def.cmdline == " ".join(cmd) - env_res = sing.execute(shelly) - - shelly_env = ShellDef( - name="shelly", - executable=cmd, - cache_dir=newcache("shelly_env"), - environment=sing, + cmd = "whoami" + sing = Singularity(image="docker://alpine", xargs=["--fakeroot"]) + Shelly = shell.define(cmd) + shelly = Shelly() + shelly_job = Task( + definition=shelly, + submitter=Submitter(cache_dir=newcache("singu")), + name="singu", ) - with Submitter(worker=plugin) as sub: - shelly_env(submitter=sub) - assert env_res == shelly_env.result().output.__dict__ + assert shelly.cmdline == cmd + outputs_dict = sing.execute(shelly_job) - shelly_call = ShellDef( - name="shelly", executable=cmd, cache_dir=newcache("shelly_call") - ) - with Submitter(worker=plugin) as sub: - shelly_call(submitter=sub, environment=sing) - for key in [ - "stdout", - "return_code", - ]: # singularity gives info about cashed image in stderr - assert env_res[key] == shelly_call.result().output.__dict__[key] + with Submitter( + worker=plugin, environment=sing, cache_dir=newcache("singu_sub") + ) as sub: + results = sub(shelly) + assert drop_stderr(outputs_dict) == drop_stderr(attrs_values(results.outputs)) + outputs = shelly(environment=sing, cache_dir=newcache("singu_call")) + # singularity gives info about cashed image in stderr + assert drop_stderr(outputs_dict) == drop_stderr(attrs_values(outputs)) -def create_shelly_inputfile(tempdir, filename, name, executable): - """creating a task with a simple input_spec""" - inputs = [ - shell.arg( - name="file", - type=File, - position=1, - help="files", - argstr="", - ) - ] - kwargs = {} if filename is None else {"file": filename} - shelly = shell.define( +def shelly_with_input_factory(filename, executable) -> ShellDef: + """creating a task with a simple input_spec""" + Shelly = shell.define( executable, - input=inputs, - )(**kwargs) - return shelly + inputs=[ + shell.arg( + name="file", + type=File, + position=1, + help="files", + argstr="", + ) + ], + ) + return Shelly(**({} if filename is None else {"file": filename})) + + +def make_job(task: ShellDef, tempdir: Path, name: str): + return Task( + definition=task, + submitter=Submitter(cache_dir=makedir(tempdir, name)), + name=name, + ) def test_shell_fileinp(tmp_path): """task with a file in the command/input""" + + def newcache(x): + return makedir(tmp_path, x) + input_dir = makedir(tmp_path, "inputs") filename = input_dir / "file.txt" with open(filename, "w") as f: f.write("hello ") - shelly = create_shelly_inputfile( - tempdir=tmp_path, filename=filename, name="shelly", executable=["cat"] - ) - env_res = Native().execute(shelly) + shelly = shelly_with_input_factory(filename=filename, executable="cat") + shelly_job = make_job(shelly, tmp_path, "native") + outputs_dict = Native().execute(shelly_job) - shelly_env = create_shelly_inputfile( - tempdir=tmp_path, filename=filename, name="shelly_env", executable=["cat"] - ) - shelly_env.environment = Native() - shelly_env() - assert env_res == shelly_env.output_ == shelly_env.result().output.__dict__ + with Submitter(environment=Native(), cache_dir=newcache("native_sub")) as sub: + results = sub(shelly) + assert outputs_dict == attrs_values(results.outputs) - shelly_call = create_shelly_inputfile( - tempdir=tmp_path, filename=filename, name="shelly_call", executable=["cat"] - ) - shelly_call(environment=Native()) - assert env_res == shelly_call.output_ == shelly_call.result().output.__dict__ + outputs = shelly(environment=Native(), cache_dir=newcache("native_call")) + assert outputs_dict == attrs_values(outputs) def test_shell_fileinp_st(tmp_path): """task (with a splitter) with a file in the command/input""" + + def newcache(x): + return makedir(tmp_path, x) + input_dir = makedir(tmp_path, "inputs") filename_1 = input_dir / "file_1.txt" with open(filename_1, "w") as f: @@ -261,28 +246,25 @@ def test_shell_fileinp_st(tmp_path): filename = [filename_1, filename_2] - shelly_env = create_shelly_inputfile( - tempdir=tmp_path, filename=None, name="shelly_env", executable=["cat"] - ) - shelly_env.environment = Native() - shelly_env.split(file=filename) - shelly_env() - assert shelly_env.result()[0].output.stdout.strip() == "hello" - assert shelly_env.result()[1].output.stdout.strip() == "hi" - - shelly_call = create_shelly_inputfile( - tempdir=tmp_path, filename=None, name="shelly_call", executable=["cat"] + shelly = shelly_with_input_factory(filename=None, executable="cat") + with Submitter(environment=Native(), cache_dir=newcache("native")) as sub: + results = sub(shelly.split(file=filename)) + assert [s.strip() for s in results.outputs.stdout] == ["hello", "hi"] + + outputs = shelly.split(file=filename)( + environment=Native(), cache_dir=newcache("native_call") ) - shelly_call.split(file=filename) - shelly_call(environment=Native()) - assert shelly_call.result()[0].output.stdout.strip() == "hello" - assert shelly_call.result()[1].output.stdout.strip() == "hi" + assert [s.strip() for s in outputs.stdout] == ["hello", "hi"] @no_win @need_docker def test_docker_fileinp(tmp_path): """docker env: task with a file in the command/input""" + + def newcache(x): + return makedir(tmp_path, x) + docker = Docker(image="busybox") input_dir = makedir(tmp_path, "inputs") @@ -290,30 +272,26 @@ def test_docker_fileinp(tmp_path): with open(filename, "w") as f: f.write("hello ") - shelly = create_shelly_inputfile( - tempdir=tmp_path, filename=filename, name="shelly", executable=["cat"] - ) - env_res = docker.execute(shelly) + shelly = shelly_with_input_factory(filename=filename, executable="cat") + outputs_dict = docker.execute(make_job(shelly, tmp_path, "docker")) - shelly_env = create_shelly_inputfile( - tempdir=tmp_path, filename=filename, name="shelly_env", executable=["cat"] - ) - shelly_env.environment = docker - shelly_env() + with Submitter(environment=docker, cache_dir=newcache("shell_sub")) as sub: + results = sub(shelly) - assert env_res == shelly_env.output_ == shelly_env.result().output.__dict__ + assert outputs_dict == attrs_values(results.outputs) - shelly_call = create_shelly_inputfile( - tempdir=tmp_path, filename=filename, name="shelly_call", executable=["cat"] - ) - shelly_call(environment=docker) - assert env_res == shelly_call.output_ == shelly_call.result().output.__dict__ + outputs = shelly(environment=docker, cache_dir=newcache("docker_call")) + assert outputs_dict == attrs_values(outputs) @no_win @need_docker def test_docker_fileinp_subm(tmp_path, plugin): """docker env with a submitter: task with a file in the command/input""" + + def newcache(x): + return makedir(tmp_path, x) + docker = Docker(image="busybox") input_dir = makedir(tmp_path, "inputs") @@ -321,31 +299,30 @@ def test_docker_fileinp_subm(tmp_path, plugin): with open(filename, "w") as f: f.write("hello ") - shelly = create_shelly_inputfile( - tempdir=tmp_path, filename=filename, name="shelly", executable=["cat"] - ) - env_res = docker.execute(shelly) + shelly = shelly_with_input_factory(filename=filename, executable="cat") + shelly_job = make_job(shelly, tmp_path, "docker_job") + outputs_dict = docker.execute(shelly_job) - shelly_env = create_shelly_inputfile( - tempdir=tmp_path, filename=filename, name="shelly_env", executable=["cat"] - ) - shelly_env.environment = docker + with Submitter( + environment=docker, cache_dir=newcache("docker_sub"), worker=plugin + ) as sub: + results = sub(shelly) with Submitter(worker=plugin) as sub: - shelly_env(submitter=sub) - assert env_res == shelly_env.result().output.__dict__ + results = sub(shelly) + assert outputs_dict == attrs_values(results.outputs) - shelly_call = create_shelly_inputfile( - tempdir=tmp_path, filename=filename, name="shelly_call", executable=["cat"] - ) - with Submitter(worker=plugin) as sub: - shelly_call(submitter=sub, environment=docker) - assert env_res == shelly_call.result().output.__dict__ + outputs = shelly(environment=docker, cache_dir=newcache("docker_call")) + assert outputs_dict == attrs_values(outputs) @no_win @need_docker def test_docker_fileinp_st(tmp_path): """docker env: task (with a splitter) with a file in the command/input""" + + def newcache(x): + return makedir(tmp_path, x) + docker = Docker(image="busybox") input_dir = makedir(tmp_path, "inputs") @@ -359,54 +336,54 @@ def test_docker_fileinp_st(tmp_path): filename = [filename_1, filename_2] - shelly_env = create_shelly_inputfile( - tempdir=tmp_path, filename=None, name="shelly_env", executable=["cat"] - ) - shelly_env.environment = docker - shelly_env.split(file=filename) - shelly_env() - assert shelly_env.result()[0].output.stdout.strip() == "hello" - assert shelly_env.result()[1].output.stdout.strip() == "hi" - - shelly_call = create_shelly_inputfile( - tempdir=tmp_path, filename=None, name="shelly_call", executable=["cat"] - ) - shelly_call.split(file=filename) - shelly_call(environment=docker) - assert shelly_call.result()[0].output.stdout.strip() == "hello" - assert shelly_call.result()[1].output.stdout.strip() == "hi" + shelly = shelly_with_input_factory(filename=None, executable="cat") + with Submitter(environment=docker, cache_dir=newcache("docker_sub")) as sub: + results = sub(shelly.split(file=filename)) -def create_shelly_outputfile(tempdir, filename, name, executable="cp"): - """creating a task with an input_spec that contains a template""" - my_input_spec = [ - shell.arg( - name="file_orig", - type=File, - position=2, - help="new file", - argstr="", - ), - shell.arg( - name="file_copy", - type=str, - output_file_template="{file_orig}_copy", - help="output file", - argstr="", - ), - ] + assert [s.strip() for s in results.outputs.stdout] == ["hello", "hi"] - kwargs = {} if filename is None else {"file_orig": filename} - shelly = shell.define(executable)( - cache_dir=makedir(tempdir, name), - input_spec=my_input_spec, - **kwargs, + outputs = shelly.split(file=filename)( + environment=docker, cache_dir=newcache("docker_call") ) - return shelly + assert [s.strip() for s in outputs.stdout] == ["hello", "hi"] + + +def shelly_outputfile_factory(filename, executable="cp"): + """creating a task with an input_spec that contains a template""" + Shelly = shell.define( + executable, + inputs=[ + shell.arg( + name="file_orig", + type=File, + position=1, + help="new file", + argstr="", + ), + ], + outputs=[ + shell.outarg( + name="file_copy", + type=File, + path_template="{file_orig}_copy", + help="output file", + argstr="", + position=2, + keep_extension=True, + ), + ], + ) + + return Shelly(**({} if filename is None else {"file_orig": filename})) def test_shell_fileout(tmp_path): """task with a file in the output""" + + def newcache(x): + return Path(makedir(tmp_path, x)) + input_dir = makedir(tmp_path, "inputs") filename = input_dir / "file.txt" with open(filename, "w") as f: @@ -414,31 +391,27 @@ def test_shell_fileout(tmp_path): # execute does not create the cashedir, so this part will fail, # but I guess we don't want to use it this way anyway - # shelly = create_shelly_outputfile(tempdir=tmp_path, filename=filename, name="shelly") - # env_res = Native().execute(shelly) + # shelly = create_shelly_outputfile(tempdir=tmp_path, filename=filename, name="native") + # outputs_dict = Native().execute(shelly) - shelly_env = create_shelly_outputfile( - tempdir=tmp_path, filename=filename, name="shelly_env" - ) - shelly_env.environment = Native() - shelly_env() - assert ( - Path(shelly_env.result().output.file_copy) - == shelly_env.output_dir / "file_copy.txt" - ) + shelly = shelly_outputfile_factory(filename=filename) - shelly_call = create_shelly_outputfile( - tempdir=tmp_path, filename=filename, name="shelly_call" - ) - shelly_call(environment=Native()) - assert ( - Path(shelly_call.result().output.file_copy) - == shelly_call.output_dir / "file_copy.txt" - ) + with Submitter(environment=Native(), cache_dir=newcache("native_sub")) as sub: + result = sub(shelly) + assert Path(result.outputs.file_copy) == result.output_dir / "file_copy.txt" + + call_cache = newcache("native_call") + + outputs = shelly(environment=Native(), cache_dir=call_cache) + assert Path(outputs.file_copy) == call_cache / shelly._checksum / "file_copy.txt" def test_shell_fileout_st(tmp_path): """task (with a splitter) with a file in the output""" + + def newcache(x): + return Path(makedir(tmp_path, x)) + input_dir = makedir(tmp_path, "inputs") filename_1 = input_dir / "file_1.txt" with open(filename_1, "w") as f: @@ -450,40 +423,31 @@ def test_shell_fileout_st(tmp_path): filename = [filename_1, filename_2] - shelly_env = create_shelly_outputfile( - tempdir=tmp_path, filename=None, name="shelly_env" - ) - shelly_env.environment = Native() - shelly_env.split(file_orig=filename) - shelly_env() - assert ( - Path(shelly_env.result()[0].output.file_copy) - == shelly_env.output_dir[0] / "file_1_copy.txt" - ) - assert ( - Path(shelly_env.result()[1].output.file_copy) - == shelly_env.output_dir[1] / "file_2_copy.txt" - ) + shelly = shelly_outputfile_factory(filename=None) + with Submitter(environment=Native(), cache_dir=newcache("native")) as sub: + results = sub(shelly.split(file_orig=filename)) - shelly_call = create_shelly_outputfile( - tempdir=tmp_path, filename=None, name="shelly_call" - ) - shelly_call.split(file_orig=filename) - shelly_call(environment=Native()) - assert ( - Path(shelly_call.result()[0].output.file_copy) - == shelly_call.output_dir[0] / "file_1_copy.txt" - ) - assert ( - Path(shelly_call.result()[1].output.file_copy) - == shelly_call.output_dir[1] / "file_2_copy.txt" + assert [f.name for f in results.outputs.file_copy] == [ + "file_1_copy.txt", + "file_2_copy.txt", + ] + + call_cache = newcache("native_call") + + outputs = shelly.split(file_orig=filename)( + environment=Native(), cache_dir=call_cache ) + assert [f.name for f in outputs.file_copy] == ["file_1_copy.txt", "file_2_copy.txt"] @no_win @need_docker def test_docker_fileout(tmp_path): """docker env: task with a file in the output""" + + def newcache(x): + return Path(makedir(tmp_path, x)) + docker_env = Docker(image="busybox") input_dir = makedir(tmp_path, "inputs") @@ -491,21 +455,21 @@ def test_docker_fileout(tmp_path): with open(filename, "w") as f: f.write("hello ") - shelly_env = create_shelly_outputfile( - tempdir=tmp_path, filename=filename, name="shelly_env" - ) - shelly_env.environment = docker_env - shelly_env() - assert ( - Path(shelly_env.result().output.file_copy) - == shelly_env.output_dir / "file_copy.txt" - ) + shelly = shelly_outputfile_factory(filename=filename) + + with Submitter(environment=docker_env, cache_dir=newcache("docker")) as sub: + results = sub(shelly) + assert results.outputs.file_copy == File(results.output_dir / "file_copy.txt") @no_win @need_docker def test_docker_fileout_st(tmp_path): """docker env: task (with a splitter) with a file in the output""" + + def newcache(x): + return Path(makedir(tmp_path, x)) + docker_env = Docker(image="busybox") input_dir = makedir(tmp_path, "inputs") @@ -519,17 +483,11 @@ def test_docker_fileout_st(tmp_path): filename = [filename_1, filename_2] - shelly_env = create_shelly_outputfile( - tempdir=tmp_path, filename=None, name="shelly_env" - ) - shelly_env.environment = docker_env - shelly_env.split(file_orig=filename) - shelly_env() - assert ( - Path(shelly_env.result()[0].output.file_copy) - == shelly_env.output_dir[0] / "file_1_copy.txt" - ) - assert ( - Path(shelly_env.result()[1].output.file_copy) - == shelly_env.output_dir[1] / "file_2_copy.txt" - ) + shelly = shelly_outputfile_factory(filename=None) + + with Submitter(environment=docker_env, cache_dir=newcache("docker_sub")) as sub: + results = sub(shelly.split(file_orig=filename)) + assert [f.name for f in results.outputs.file_copy] == [ + "file_1_copy.txt", + "file_2_copy.txt", + ] diff --git a/pydra/engine/tests/test_helpers_file.py b/pydra/engine/tests/test_helpers_file.py index 65417b9efe..3b15e5bfd2 100644 --- a/pydra/engine/tests/test_helpers_file.py +++ b/pydra/engine/tests/test_helpers_file.py @@ -402,4 +402,4 @@ def test_template_formatting(tmp_path: Path): input_values=inputs_dict, output_dir=tmp_path, spec_type="input", - ) == [str(tmp_path / "file.bvec"), str(tmp_path / "file.bval")] + ) == [tmp_path / "file.bvec", tmp_path / "file.bval"] diff --git a/pydra/engine/tests/test_nipype1_convert.py b/pydra/engine/tests/test_nipype1_convert.py deleted file mode 100644 index 07af76e501..0000000000 --- a/pydra/engine/tests/test_nipype1_convert.py +++ /dev/null @@ -1,100 +0,0 @@ -import typing as ty -import pytest -from pathlib import Path -from pydra.engine.specs import ShellOutputs, ShellDef -from fileformats.generic import File -from pydra.design import shell - - -def find_txt(output_dir: Path) -> File: - files = list(output_dir.glob("*.txt")) - assert len(files) == 1 - return files[0] - - -interf_inputs = [shell.arg(name="test", type=ty.Any, help="test")] -interf_outputs = [shell.out(name="test_out", type=File, callable=find_txt)] - - -Interf_1 = shell.define("testing", inputs=interf_inputs, outputs=interf_outputs) -Interf_2 = shell.define("testing command", inputs=interf_inputs, outputs=interf_outputs) - - -@shell.define -class Interf_3(ShellDef["Interf_3.Outputs"]): - """class with customized input and executables""" - - executable = ["testing", "command"] - - in_file: str = shell.arg(help="in_file", argstr="{in_file}") - - @shell.outputs - class Outputs(ShellOutputs): - pass - - -@shell.define -class TouchInterf(ShellDef["TouchInterf.Outputs"]): - """class with customized input and executables""" - - new_file: str = shell.outarg(help="new_file", argstr="", path_template="{new_file}") - executable = "touch" - - @shell.outputs - class Outputs(ShellOutputs): - pass - - -def test_interface_specs_1(): - """testing if class input/output definition are set properly""" - task_spec = Interf_1(executable="ls") - assert task.Outputs == Interf_1.Outputs - - -def test_interface_specs_2(): - """testing if class input/output definition are overwritten properly by the user's specs""" - my_input_spec = SpecInfo( - name="Input", - fields=[("my_inp", ty.Any, {"help": "my inp"})], - bases=(ShellDef,), - ) - my_output_spec = SpecInfo( - name="Output", fields=[("my_out", File, "*.txt")], bases=(ShellOutputs,) - ) - task = Interf_1(input_spec=my_input_spec, output_spec=my_output_spec) - assert task.input_spec == my_input_spec - assert task.output_spec == my_output_spec - - -def test_interface_executable_1(): - """testing if the class executable is properly set and used in the command line""" - task = Interf_2() - assert task.executable == "testing command" - assert task.definition.executable == "testing command" - assert task.cmdline == "testing command" - - -def test_interface_executable_2(): - """testing if the class executable is overwritten by the user's input (and if the warning is raised)""" - # warning that the user changes the executable from the one that is set as a class attribute - with pytest.warns(UserWarning, match="changing the executable"): - task = Interf_2(executable="i want a different command") - assert task.executable == "testing command" - # task.executable stays the same, but input.executable is changed, so the cmd is changed - assert task.definition.executable == "i want a different command" - assert task.cmdline == "i want a different command" - - -def test_interface_cmdline_with_spaces(): - task = Interf_3(in_file="/path/to/file/with spaces") - assert task.executable == "testing command" - assert task.definition.executable == "testing command" - assert task.cmdline == "testing command '/path/to/file/with spaces'" - - -def test_interface_run_1(): - """testing execution of a simple interf with customized input and executable""" - task = TouchInterf(new_file="hello.txt") - assert task.cmdline == "touch hello.txt" - res = task() - assert res.output.new_file.fspath.exists() diff --git a/pydra/engine/tests/test_node_task.py b/pydra/engine/tests/test_node_task.py index 85c67eb8dd..51316324fd 100644 --- a/pydra/engine/tests/test_node_task.py +++ b/pydra/engine/tests/test_node_task.py @@ -77,7 +77,7 @@ def test_task_init_3( if input_type == "array": a_in = np.array(a_in) - nn = FunAddTwo(name="NA").split(splitter=splitter, a=a_in) + nn = FunAddTwo().split(splitter=splitter, a=a_in) assert np.allclose(nn.inputs.a, [3, 5]) assert nn.state.splitter == state_splitter diff --git a/pydra/engine/tests/test_numpy_examples.py b/pydra/engine/tests/test_numpy_examples.py index cec176deaf..08b3907081 100644 --- a/pydra/engine/tests/test_numpy_examples.py +++ b/pydra/engine/tests/test_numpy_examples.py @@ -16,44 +16,41 @@ @python.define(outputs=["b"]) -def arrayout(val): +def ArrayOut(val): return np.array([val, val]) def test_multiout(tmpdir): """testing a simple function that returns a numpy array""" - wf = Workflow("wf", input_spec=["val"], val=2) - wf.add(arrayout(name="mo", val=wf.lzin.val)) - wf.set_output([("array", wf.mo.lzout.b)]) - wf.cache_dir = tmpdir + @workflow.define(outputs=["array"]) + def Workflow(val): + mo = workflow.add(ArrayOut(val=val)) + return mo.b - with Submitter(worker="cf", n_procs=2) as sub: - sub(runnable=wf) + wf = Workflow(val=2) - results = wf.result(return_inputs=True) + with Submitter(worker="cf", cache_dir=tmpdir, n_procs=2) as sub: + results = sub(wf) - assert results[0] == {"wf.val": 2} - assert np.array_equal(results[1].output.array, np.array([2, 2])) + assert np.array_equal(results.outputs.array, np.array([2, 2])) def test_multiout_st(tmpdir): """testing a simple function that returns a numpy array, adding splitter""" - wf = Workflow("wf", input_spec=["val"], val=[0, 1, 2]) - wf.add(arrayout(name="mo")) - wf.mo.split("val", val=wf.lzin.val).combine("val") - wf.set_output([("array", wf.mo.lzout.b)]) - wf.cache_dir = tmpdir + @workflow.define(outputs=["array"]) + def Workflow(values): + mo = workflow.add(ArrayOut().split(val=values).combine("val")) + return mo.b - with Submitter(worker="cf", n_procs=2) as sub: - sub(runnable=wf) + wf = Workflow(values=[0, 1, 2]) - results = wf.result(return_inputs=True) + with Submitter(worker="cf", cache_dir=tmpdir, n_procs=2) as sub: + results = sub(wf) - assert results[0] == {"wf.val": [0, 1, 2]} for el in range(3): - assert np.array_equal(results[1].output.array[el], np.array([el, el])) + assert np.array_equal(results.outputs.array[el], np.array([el, el])) def test_numpy_hash_1(): @@ -81,20 +78,19 @@ def test_numpy_hash_3(): def test_task_numpyinput_1(tmp_path: Path): """task with numeric numpy array as an input""" - nn = Identity(name="NA") - nn.cache_dir = tmp_path - nn.split(x=[np.array([1, 2]), np.array([3, 4])]) + nn = Identity().split(x=[np.array([1, 2]), np.array([3, 4])]) # checking the results - results = nn() - assert (results[0].output.out == np.array([1, 2])).all() - assert (results[1].output.out == np.array([3, 4])).all() + outputs = nn(cache_dir=tmp_path) + assert (outputs.out[0] == np.array([1, 2])).all() + assert (outputs.out[1] == np.array([3, 4])).all() def test_task_numpyinput_2(tmp_path: Path): """task with numpy array of type object as an input""" - nn = Identity(name="NA") - nn.cache_dir = tmp_path - nn.split(x=[np.array(["VAL1"], dtype=object), np.array(["VAL2"], dtype=object)]) + nn = Identity().split( + x=[np.array(["VAL1"], dtype=object), np.array(["VAL2"], dtype=object)] + ) # checking the results - results = nn() - assert (results[0].output.out == np.array(["VAL1"], dtype=object)).all() + outputs = nn(cache_dir=tmp_path) + assert outputs.out[0] == np.array(["VAL1"], dtype=object) + assert outputs.out[1] == np.array(["VAL2"], dtype=object) diff --git a/pydra/engine/tests/test_profiles.py b/pydra/engine/tests/test_profiles.py index 19de274318..b8dbcaabe5 100644 --- a/pydra/engine/tests/test_profiles.py +++ b/pydra/engine/tests/test_profiles.py @@ -1,28 +1,29 @@ from ..helpers import load_task -from pydra.design import python +from pydra.design import python, workflow +from pydra.engine.core import Task +from pydra.engine.submitter import Submitter import numpy as np from pympler import asizeof from pytest import approx -def generate_list(l): - return np.arange(l).tolist() +def generate_list(n): + return np.arange(n).tolist() @python.define -def show_var(a): +def ShowVar(a): return a def create_wf(size): - wf = Workflow(name="wf", input_spec=["x"]) - wf.split("x", x=generate_list(size)) - wf.add(show_var(name="show", a=wf.lzin.x)) - wf.set_output([("out", wf.show.lzout.out)]) - wf.state.prepare_states(wf.inputs) - wf.state.prepare_inputs() - return wf + @workflow.define + def Workflow(x): + show = workflow.add(ShowVar(a=x)) + return show.out + + return Workflow().split(x=generate_list(size)) def test_wf_memory(): @@ -30,35 +31,15 @@ def test_wf_memory(): testings if the size of workflow grows linearly """ - wf_1000 = create_wf(size=1000) - wf_1000_mem = asizeof.asizeof(wf_1000) + wf_10000 = create_wf(size=10000) + wf_10000_mem = asizeof.asizeof(wf_10000) - wf_2000 = create_wf(size=2000) - wf_2000_mem = asizeof.asizeof(wf_2000) + wf_20000 = create_wf(size=20000) + wf_20000_mem = asizeof.asizeof(wf_20000) - wf_4000 = create_wf(size=4000) - wf_4000_mem = asizeof.asizeof(wf_4000) + wf_40000 = create_wf(size=40000) + wf_40000_mem = asizeof.asizeof(wf_40000) # checking if it's linear with the size of the splitter # check print(asizeof.asized(wf_4000, detail=2).format()) in case of problems - assert wf_4000_mem / wf_2000_mem == approx(2, 0.05) - assert wf_2000_mem / wf_1000_mem == approx(2, 0.05) - - -def test_load_task_memory(): - """creating two workflow with relatively big splitter: 1000 and 4000 elements - testings if load_task for a single element returns tasks of a similar size - """ - - wf_1000 = create_wf(size=1000) - wf_1000_pkl = wf_1000.pickle_task() - wf_1000_loaded = load_task(task_pkl=wf_1000_pkl, ind=1) - wf_1000_single_mem = asizeof.asizeof(wf_1000_loaded) - - wf_4000 = create_wf(size=4000) - wf_4000_pkl = wf_4000.pickle_task() - wf_4000_loaded = load_task(task_pkl=wf_4000_pkl, ind=1) - wf_4000_single_mem = asizeof.asizeof(wf_4000_loaded) - - # checking if it doesn't change with size of the splitter - # check print(asizeof.asized(wf_4000_loaded, detail=2).format()) in case of problems - assert wf_1000_single_mem / wf_4000_single_mem == approx(1, 0.05) + assert wf_40000_mem / wf_20000_mem == approx(2, 0.05) + assert wf_20000_mem / wf_10000_mem == approx(2, 0.05) diff --git a/pydra/engine/tests/test_shelltask.py b/pydra/engine/tests/test_shelltask.py index d32d5d32ff..17dec45a0f 100644 --- a/pydra/engine/tests/test_shelltask.py +++ b/pydra/engine/tests/test_shelltask.py @@ -21,14 +21,14 @@ MultiOutputFile, MultiInputObj, ) -from .utils import result_no_submitter, result_submitter, no_win +from .utils import run_no_submitter, run_submitter, no_win if sys.platform.startswith("win"): pytest.skip("SLURM not available in windows", allow_module_level=True) @pytest.mark.flaky(reruns=2) # when dask -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_1(plugin_dask_opt, results_function, tmp_path): """simple command, no arguments""" cmd = ["pwd"] @@ -41,7 +41,7 @@ def test_shell_cmd_1(plugin_dask_opt, results_function, tmp_path): assert res.output.stderr == "" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_1_strip(plugin, results_function, tmp_path): """simple command, no arguments strip option to remove \n at the end os stdout @@ -57,7 +57,7 @@ def test_shell_cmd_1_strip(plugin, results_function, tmp_path): assert res.output.stderr == "" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_2(plugin, results_function, tmp_path): """a command with arguments, cmd and args given as executable""" cmd = ["echo", "hail", "pydra"] @@ -71,7 +71,7 @@ def test_shell_cmd_2(plugin, results_function, tmp_path): assert res.output.stderr == "" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_2a(plugin, results_function, tmp_path): """a command with arguments, using executable and args""" cmd_exec = "echo" @@ -88,7 +88,7 @@ def test_shell_cmd_2a(plugin, results_function, tmp_path): assert res.output.stderr == "" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_2b(plugin, results_function, tmp_path): """a command with arguments, using strings executable and args""" cmd_exec = "echo" @@ -276,7 +276,7 @@ def test_wf_shell_cmd_1(plugin, tmp_path): # customised input definition -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_1(plugin, results_function, tmp_path): """a command with executable, args and one command opt, using a customized input_spec to add the opt to the command @@ -316,7 +316,7 @@ def test_shell_cmd_inputspec_1(plugin, results_function, tmp_path): assert res.output.stdout == "hello from pydra" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_2(plugin, results_function, tmp_path): """a command with executable, args and two command options, using a customized input_spec to add the opt to the command @@ -364,7 +364,7 @@ def test_shell_cmd_inputspec_2(plugin, results_function, tmp_path): assert res.output.stdout == "HELLO from pydra" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_3(plugin, results_function, tmp_path): """mandatory field added to fields, value provided""" cmd_exec = "echo" @@ -402,7 +402,7 @@ def test_shell_cmd_inputspec_3(plugin, results_function, tmp_path): assert res.output.stdout == "HELLO\n" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_3a(plugin, results_function, tmp_path): """mandatory field added to fields, value provided using shorter syntax for input definition (no attr.ib) @@ -435,7 +435,7 @@ def test_shell_cmd_inputspec_3a(plugin, results_function, tmp_path): assert res.output.stdout == "HELLO\n" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_3b(plugin, results_function, tmp_path): """mandatory field added to fields, value provided after init""" cmd_exec = "echo" @@ -502,7 +502,7 @@ def test_shell_cmd_inputspec_3c_exception(plugin, tmp_path): assert "mandatory" in str(excinfo.value) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_3c(plugin, results_function, tmp_path): """mandatory=False, so tasks runs fine even without the value""" cmd_exec = "echo" @@ -537,7 +537,7 @@ def test_shell_cmd_inputspec_3c(plugin, results_function, tmp_path): assert res.output.stdout == "\n" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_4(plugin, results_function, tmp_path): """mandatory field added to fields, value provided""" cmd_exec = "echo" @@ -568,7 +568,7 @@ def test_shell_cmd_inputspec_4(plugin, results_function, tmp_path): assert res.output.stdout == "Hello\n" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_4a(plugin, results_function, tmp_path): """mandatory field added to fields, value provided using shorter syntax for input definition (no attr.ib) @@ -592,7 +592,7 @@ def test_shell_cmd_inputspec_4a(plugin, results_function, tmp_path): assert res.output.stdout == "Hello\n" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_4b(plugin, results_function, tmp_path): """mandatory field added to fields, value provided""" cmd_exec = "echo" @@ -683,7 +683,7 @@ def test_shell_cmd_inputspec_4d_exception(plugin): ShellDef(name="shelly", executable=cmd_exec, input_spec=my_input_spec) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_5_nosubm(plugin, results_function, tmp_path): """checking xor in metadata: task should work fine, since only one option is True""" cmd_exec = "ls" @@ -781,7 +781,7 @@ def test_shell_cmd_inputspec_5a_exception(plugin, tmp_path): assert "is mutually exclusive" in str(excinfo.value) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_6(plugin, results_function, tmp_path): """checking requires in metadata: the required field is set in the init, so the task works fine @@ -869,7 +869,7 @@ def test_shell_cmd_inputspec_6a_exception(plugin): assert "requires" in str(excinfo.value) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_6b(plugin, results_function, tmp_path): """checking requires in metadata: the required field set after the init @@ -918,7 +918,7 @@ def test_shell_cmd_inputspec_6b(plugin, results_function, tmp_path): results_function(shelly, plugin) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_7(plugin, results_function, tmp_path): """ providing output name using input_spec, @@ -961,7 +961,7 @@ def test_shell_cmd_inputspec_7(plugin, results_function, tmp_path): assert out1.name == "newfile_tmp.txt" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_7a(plugin, results_function, tmp_path): """ providing output name using input_spec, @@ -1004,7 +1004,7 @@ def test_shell_cmd_inputspec_7a(plugin, results_function, tmp_path): assert res.output.out1_changed.fspath.name == "newfile_tmp.txt" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_7b(plugin, results_function, tmp_path): """ providing new file and output name using input_spec, @@ -1049,7 +1049,7 @@ def test_shell_cmd_inputspec_7b(plugin, results_function, tmp_path): assert res.output.out1.fspath.exists() -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_7c(plugin, results_function, tmp_path): """ providing output name using input_spec, @@ -1090,7 +1090,7 @@ def test_shell_cmd_inputspec_7c(plugin, results_function, tmp_path): assert res.output.out1.fspath.name == "newfile_tmp.txt" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_8(plugin, results_function, tmp_path): """ providing new file and output name using input_spec, @@ -1147,7 +1147,7 @@ def test_shell_cmd_inputspec_8(plugin, results_function, tmp_path): assert res.output.out1.fspath.exists() -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_8a(plugin, results_function, tmp_path): """ providing new file and output name using input_spec, @@ -1204,7 +1204,7 @@ def test_shell_cmd_inputspec_8a(plugin, results_function, tmp_path): assert res.output.out1.fspath.exists() -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_9(tmp_path, plugin, results_function): """ providing output name using input_spec (output_file_template in metadata), @@ -1257,7 +1257,7 @@ def test_shell_cmd_inputspec_9(tmp_path, plugin, results_function): assert shelly.output_dir == res.output.file_copy.fspath.parent -@pytest.mark.parametrize("results_function", [result_no_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter]) def test_shell_cmd_inputspec_9a(tmp_path, plugin, results_function): """ providing output name using input_spec (output_file_template in metadata), @@ -1306,7 +1306,7 @@ def test_shell_cmd_inputspec_9a(tmp_path, plugin, results_function): assert shelly.output_dir == res.output.file_copy.fspath.parent -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_9b(tmp_path, plugin, results_function): """ providing output name using input_spec (output_file_template in metadata) @@ -1356,7 +1356,7 @@ def test_shell_cmd_inputspec_9b(tmp_path, plugin, results_function): assert res.output.file_copy.fspath.name == "file_copy" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_9c(tmp_path, plugin, results_function): """ providing output name using input_spec (output_file_template in metadata) @@ -1408,7 +1408,7 @@ def test_shell_cmd_inputspec_9c(tmp_path, plugin, results_function): assert res.output.file_copy.fspath.parent == shelly.output_dir -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_9d(tmp_path, plugin, results_function): """ providing output name explicitly by manually setting value in input_spec @@ -1462,7 +1462,7 @@ def test_shell_cmd_inputspec_9d(tmp_path, plugin, results_function): assert shelly.output_dir == res.output.file_copy.fspath.parent -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_10(plugin, results_function, tmp_path): """using input_spec, providing list of files as an input""" @@ -1605,7 +1605,7 @@ def test_shell_cmd_inputspec_11(tmp_path): assert out_file.fspath.name == "test1" or out_file.fspath.name == "test2" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_12(tmp_path: Path, plugin, results_function): """ providing output name using input_spec @@ -1708,7 +1708,7 @@ def test_shell_cmd_inputspec_with_iterable(): assert task.cmdline == "test --in1 0 1 2 --in2 bar --in2 foo" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_copyfile_1(plugin, results_function, tmp_path): """shelltask changes a file in place, adding copyfile=True to the file-input from input_spec @@ -1770,7 +1770,7 @@ def test_shell_cmd_inputspec_copyfile_1(plugin, results_function, tmp_path): assert "hello from pydra\n" == f.read() -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_copyfile_1a(plugin, results_function, tmp_path): """shelltask changes a file in place, adding copyfile=False to the File-input from input_spec @@ -1850,7 +1850,7 @@ def test_shell_cmd_inputspec_copyfile_1a(plugin, results_function, tmp_path): "if we allow for this orig_file is changing, so does checksum," " and the results can't be found" ) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_copyfile_1b(plugin, results_function, tmp_path): """shelltask changes a file in place, copyfile is None for the file-input, so original filed is changed @@ -1907,7 +1907,7 @@ def test_shell_cmd_inputspec_copyfile_1b(plugin, results_function, tmp_path): assert "hi from pydra\n" == f.read() -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_state_1(plugin, results_function, tmp_path): """adding state to the input from input_spec""" cmd_exec = "echo" @@ -1986,7 +1986,7 @@ def test_shell_cmd_inputspec_typeval_2(): ShellDef(executable=cmd_exec, text="hello", input_spec=my_input_spec) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_state_1a(plugin, results_function, tmp_path): """adding state to the input from input_spec using shorter syntax for input_spec (without default) @@ -2018,7 +2018,7 @@ def test_shell_cmd_inputspec_state_1a(plugin, results_function, tmp_path): assert res[1].output.stdout == "hi\n" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_state_2(plugin, results_function, tmp_path): """ adding splitter to input that is used in the output_file_tamplate @@ -2057,7 +2057,7 @@ def test_shell_cmd_inputspec_state_2(plugin, results_function, tmp_path): assert res[i].output.out1.fspath.parent == shelly.output_dir[i] -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_state_3(plugin, results_function, tmp_path): """adding state to the File-input from input_spec""" @@ -2104,7 +2104,7 @@ def test_shell_cmd_inputspec_state_3(plugin, results_function, tmp_path): assert res[1].output.stdout == "have a nice one" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_inputspec_copyfile_state_1(plugin, results_function, tmp_path): """adding state to the File-input from input_spec""" @@ -2654,7 +2654,7 @@ def test_wf_shell_cmd_ndst_1(plugin, tmp_path): # customised output definition -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_outputspec_1(plugin, results_function, tmp_path): """ customised output_spec, adding files to the output, providing specific pathname @@ -2674,7 +2674,7 @@ def test_shell_cmd_outputspec_1(plugin, results_function, tmp_path): assert res.output.newfile.fspath.exists() -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_outputspec_1a(plugin, results_function, tmp_path): """ customised output_spec, adding files to the output, providing specific pathname @@ -2714,7 +2714,7 @@ def test_shell_cmd_outputspec_1b_exception(plugin, tmp_path): assert "does not exist" in str(exinfo.value) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_outputspec_2(plugin, results_function, tmp_path): """ customised output_spec, adding files to the output, @@ -2756,7 +2756,7 @@ def test_shell_cmd_outputspec_2a_exception(plugin, tmp_path): assert "no file matches" in str(excinfo.value) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_outputspec_3(plugin, results_function, tmp_path): """ customised output_spec, adding files to the output, @@ -2779,7 +2779,7 @@ def test_shell_cmd_outputspec_3(plugin, results_function, tmp_path): assert all([file.fspath.exists() for file in res.output.newfile]) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_outputspec_5(plugin, results_function, tmp_path): """ customised output_spec, adding files to the output, @@ -2818,7 +2818,7 @@ def gather_output(field, output_dir): ) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_outputspec_5a(plugin, results_function, tmp_path): """ customised output_spec, adding files to the output, @@ -2874,7 +2874,7 @@ def gather_output(executable, output_dir, ble): shelly() -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_outputspec_5c(plugin, results_function, tmp_path): """ Customised output definition defined as a class, @@ -2904,7 +2904,7 @@ def gather_output(executable, output_dir): assert all([file.exists() for file in res.output.newfile]) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_outputspec_6(plugin, results_function, tmp_path): """ providing output name by providing output_file_template @@ -2972,7 +2972,7 @@ def test_shell_cmd_outputspec_6a(): assert res.output.out1.fspath.exists() -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_outputspec_7(tmp_path, plugin, results_function): """ providing output with output_file_name and using MultiOutputFile as a type. @@ -3048,7 +3048,7 @@ def test_shell_cmd_outputspec_7(tmp_path, plugin, results_function): assert file.fspath.exists() -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_outputspec_7a(tmp_path, plugin, results_function): """ providing output with output_file_name and using MultiOutputFile as a type. @@ -3126,7 +3126,7 @@ def test_shell_cmd_outputspec_7a(tmp_path, plugin, results_function): assert res.output.new_files.fspath.exists() -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_outputspec_8a(tmp_path, plugin, results_function): """ customised output_spec, adding int and str to the output, @@ -3214,7 +3214,7 @@ def test_shell_cmd_outputspec_8b_error(): assert "has to have a callable" in str(e.value) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_outputspec_8c(tmp_path, plugin, results_function): """ customised output_spec, adding Directory to the output named by args @@ -3257,7 +3257,7 @@ def get_lowest_directory(directory_path): assert get_lowest_directory(arg_dir) == f"/dir{index+1}" -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_outputspec_8d(tmp_path, plugin, results_function): """ customised output_spec, adding Directory to the output named by input definition @@ -3325,7 +3325,7 @@ def get_lowest_directory(directory_path): ) -@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +@pytest.mark.parametrize("results_function", [run_no_submitter, run_submitter]) def test_shell_cmd_state_outputspec_1(plugin, results_function, tmp_path): """ providing output name by providing output_file_template diff --git a/pydra/engine/tests/test_singularity.py b/pydra/engine/tests/test_singularity.py index 8c21d44289..30a934e014 100644 --- a/pydra/engine/tests/test_singularity.py +++ b/pydra/engine/tests/test_singularity.py @@ -1,13 +1,11 @@ import shutil import subprocess as sp import pytest -import attr - -from ..task import ShellDef -from ..submitter import Submitter -from ..specs import ShellOutputs, ShellDef +from pydra.engine.submitter import Submitter +from pydra.engine.specs import ShellDef, ShellOutputs +from pydra.design import shell, workflow from fileformats.generic import File -from ..environments import Singularity +from pydra.engine.environments import Singularity need_docker = pytest.mark.skipif( @@ -30,19 +28,11 @@ def test_singularity_1_nosubm(tmp_path): """ cmd = "pwd" image = "docker://alpine" - singu = ShellDef( - name="singu", - executable=cmd, - environment=Singularity(image=image), - cache_dir=tmp_path, - ) - assert singu.environment.image == "docker://alpine" - assert isinstance(singu.environment, Singularity) - assert singu.cmdline == cmd - - res = singu() - assert "/mnt/pydra" in res.output.stdout - assert res.output.return_code == 0 + Singu = shell.define(cmd) + singu = Singu() + outputs = singu(environment=Singularity(image=image), cache_dir=tmp_path) + assert "/mnt/pydra" in outputs.stdout + assert outputs.return_code == 0 @need_singularity @@ -52,17 +42,16 @@ def test_singularity_2_nosubm(tmp_path): """ cmd = ["echo", "hail", "pydra"] image = "docker://alpine" - singu = ShellDef( - name="singu", - executable=cmd, + Singu = shell.define(" ".join(cmd)) + singu = Singu() + assert singu.cmdline == " ".join(cmd) + + outputs = singu( environment=Singularity(image=image), cache_dir=tmp_path, ) - assert singu.cmdline == " ".join(cmd) - - res = singu() - assert res.output.stdout.strip() == " ".join(cmd[1:]) - assert res.output.return_code == 0 + assert outputs.stdout.strip() == " ".join(cmd[1:]) + assert outputs.return_code == 0 @need_singularity @@ -72,20 +61,18 @@ def test_singularity_2(plugin, tmp_path): """ cmd = ["echo", "hail", "pydra"] image = "docker://alpine" + Singu = shell.define(" ".join(cmd)) + singu = Singu() - singu = ShellDef( - name="singu", - executable=cmd, - environment=Singularity(image=image), - cache_dir=tmp_path, - ) assert singu.cmdline == " ".join(cmd) - with Submitter(worker=plugin) as sub: - singu(submitter=sub) - res = singu.result() - assert res.output.stdout.strip() == " ".join(cmd[1:]) - assert res.output.return_code == 0 + with Submitter( + worker=plugin, environment=Singularity(image=image), cache_dir=tmp_path + ) as sub: + res = sub(singu) + assert not res.errored, "\n".join(res.errors["error message"]) + assert res.outputs.stdout.strip() == " ".join(cmd[1:]) + assert res.outputs.return_code == 0 @need_singularity @@ -97,20 +84,17 @@ def test_singularity_2a(plugin, tmp_path): cmd_args = ["hail", "pydra"] # separate command into exec + args image = "docker://alpine" - singu = ShellDef( - name="singu", - executable=cmd_exec, - args=cmd_args, - environment=Singularity(image=image), - cache_dir=tmp_path, - ) + Singu = shell.define(cmd_exec) + singu = Singu(additional_args=cmd_args) assert singu.cmdline == f"{cmd_exec} {' '.join(cmd_args)}" - with Submitter(worker=plugin) as sub: - singu(submitter=sub) - res = singu.result() - assert res.output.stdout.strip() == " ".join(cmd_args) - assert res.output.return_code == 0 + with Submitter( + worker=plugin, environment=Singularity(image=image), cache_dir=tmp_path + ) as sub: + res = sub(singu) + + assert res.outputs.stdout.strip() == " ".join(cmd_args) + assert res.outputs.return_code == 0 # tests with State @@ -121,17 +105,20 @@ def test_singularity_st_1(plugin, tmp_path): """commands without arguments in container splitter = executable """ - cmd = ["pwd", "ls"] + cmd = ["whoami", "pwd", "ls"] image = "docker://alpine" - singu = ShellDef( - name="singu", environment=Singularity(image=image), cache_dir=tmp_path - ).split("executable", executable=cmd) - assert singu.state.splitter == "singu.executable" + Singu = shell.define("dummy") + singu = Singu().split("executable", executable=cmd) - res = singu(plugin=plugin) - assert "/mnt/pydra" in res[0].output.stdout - assert res[1].output.stdout == "" - assert res[0].output.return_code == res[1].output.return_code == 0 + outputs = singu( + plugin=plugin, + environment=Singularity(image=image, xargs=["--fakeroot"]), + cache_dir=tmp_path, + ) + assert outputs.stdout[0].strip() == "root" + assert "/mnt/pydra" in outputs.stdout[1] + assert outputs.stdout[2].strip() == "_task.pklz" + assert outputs.return_code == [0, 0, 0] @need_singularity @@ -145,17 +132,16 @@ def test_singularity_st_2(tmp_path, n): """splitter over args (checking bigger splitters if slurm available)""" args_n = list(range(n)) image = "docker://alpine" - singu = ShellDef( - name="singu", - executable="echo", - environment=Singularity(image=image), - cache_dir=tmp_path, - ).split("args", args=args_n) - assert singu.state.splitter == "singu.args" - res = singu(plugin="slurm") - assert "1" in res[1].output.stdout - assert str(n - 1) in res[-1].output.stdout - assert res[0].output.return_code == res[1].output.return_code == 0 + Singu = shell.define("echo") + singu = Singu().split("args", args=args_n) + with Submitter( + plugin="slurm", environment=Singularity(image=image), cache_dir=tmp_path + ) as sub: + res = sub(singu) + + assert "1" in res.outputs.stdout[1] + assert str(n - 1) in res.outputs.stdout[-1] + assert res.outputs.return_code[0] == res.outputs.return_code[1] == 0 # tests with customized output_spec @@ -170,25 +156,20 @@ def test_singularity_outputspec_1(plugin, tmp_path): cmd = ["touch", "newfile_tmp.txt"] image = "docker://alpine" - my_output_spec = SpecInfo( - name="Output", - fields=[("newfile", File, "newfile_tmp.txt")], - bases=(ShellOutputs,), - ) - singu = ShellDef( - name="singu", - environment=Singularity(image=image), - executable=cmd, - output_spec=my_output_spec, - cache_dir=tmp_path, + Singu = shell.define( + " ".join(cmd), + outputs=[ + shell.outarg(name="newfile", type=File, path_template="newfile_tmp.txt") + ], ) + singu = Singu() - with Submitter(worker=plugin) as sub: - singu(submitter=sub) + with Submitter(environment=Singularity(image=image), cache_dir=tmp_path) as sub: + res = sub(singu) - res = singu.result() - assert res.output.stdout == "" - assert res.output.newfile.fspath.exists() + assert not res.errored, "\n".join(res.errors["error message"]) + assert res.outputs.stdout == "" + assert res.outputs.newfile.fspath.exists() # tests with customised input_spec @@ -204,37 +185,23 @@ def test_singularity_inputspec_1(plugin, tmp_path): cmd = "cat" image = "docker://alpine" - my_input_spec = SpecInfo( - name="Input", - fields=[ - ( - "file", - attr.ib( - type=File, - metadata={ - "mandatory": True, - "position": 1, - "argstr": "", - "help": "input file", - }, - ), + Singu = shell.define( + cmd, + inputs=[ + shell.arg( + name="file", + type=File, + position=1, + argstr="", + help="input file", ) ], - bases=(ShellDef,), ) - singu = ShellDef( - name="singu", - environment=Singularity(image=image), - executable=cmd, - file=filename, - input_spec=my_input_spec, - strip=True, - cache_dir=tmp_path, - ) + singu = Singu(file=filename) - res = singu() - assert res.output.stdout == "hello from pydra" + outputs = singu(environment=Singularity(image=image), cache_dir=tmp_path) + assert outputs.stdout.strip() == "hello from pydra" @need_singularity @@ -249,32 +216,23 @@ def test_singularity_inputspec_1a(plugin, tmp_path): cmd = "cat" image = "docker://alpine" - my_input_spec = SpecInfo( - name="Input", - fields=[ - ( - "file", - attr.ib( - type=File, - default=filename, - metadata={"position": 1, "argstr": "", "help": "input file"}, - ), + Singu = shell.define( + cmd, + inputs=[ + shell.arg( + name="file", + type=File, + default=filename, + position=1, + argstr="", + help="input file", ) ], - bases=(ShellDef,), - ) - - singu = ShellDef( - name="singu", - environment=Singularity(image=image), - executable=cmd, - input_spec=my_input_spec, - strip=True, - cache_dir=tmp_path, ) + singu = Singu(file=filename) - res = singu() - assert res.output.stdout == "hello from pydra" + outputs = singu(environment=Singularity(image=image), cache_dir=tmp_path) + assert outputs.stdout.strip() == "hello from pydra" @need_singularity @@ -291,48 +249,31 @@ def test_singularity_inputspec_2(plugin, tmp_path): cmd = "cat" image = "docker://alpine" - my_input_spec = SpecInfo( - name="Input", - fields=[ - ( - "file1", - attr.ib( - type=File, - metadata={ - "position": 1, - "argstr": "", - "help": "input file 1", - }, - ), + Singu = shell.define( + cmd, + inputs=[ + shell.arg( + name="file1", + type=File, + position=1, + argstr="", + help="input file 1", ), - ( - "file2", - attr.ib( - type=File, - default=filename_2, - metadata={ - "position": 2, - "argstr": "", - "help": "input file 2", - }, - ), + shell.arg( + name="file2", + type=File, + default=filename_2, + position=2, + argstr="", + help="input file 2", ), ], - bases=(ShellDef,), ) - singu = ShellDef( - name="singu", - environment=Singularity(image=image), - executable=cmd, - file1=filename_1, - input_spec=my_input_spec, - strip=True, - cache_dir=tmp_path, - ) + singu = Singu(file1=filename_1) - res = singu() - assert res.output.stdout == "hello from pydra\nhave a nice one" + outputs = singu(environment=Singularity(image=image), cache_dir=tmp_path) + assert outputs.stdout == "hello from pydra\nhave a nice one" @need_singularity @@ -351,47 +292,30 @@ def test_singularity_inputspec_2a_except(plugin, tmp_path): image = "docker://alpine" # the field with default value can't be before value without default - my_input_spec = SpecInfo( - name="Input", - fields=[ - ( - "file1", - attr.ib( - type=File, - default=filename_1, - metadata={ - "position": 1, - "argstr": "", - "help": "input file 1", - }, - ), + Singu = shell.define( + cmd, + inputs=[ + shell.arg( + name="file1", + type=File, + default=filename_1, + position=1, + argstr="", + help="input file 1", ), - ( - "file2", - attr.ib( - type=File, - metadata={ - "position": 2, - "argstr": "", - "help": "input file 2", - }, - ), + shell.arg( + name="file2", + type=File, + position=2, + argstr="", + help="input file 2", ), ], - bases=(ShellDef,), ) - singu = ShellDef( - name="singu", - environment=Singularity(image=image), - executable=cmd, - file2=filename_2, - input_spec=my_input_spec, - strip=True, - cache_dir=tmp_path, - ) - res = singu() - assert res.output.stdout == "hello from pydra\nhave a nice one" + singu = Singu(file2=filename_2) + outputs = singu(environment=Singularity(image=image), cache_dir=tmp_path) + assert outputs.stdout == "hello from pydra\nhave a nice one" @need_singularity @@ -411,48 +335,31 @@ def test_singularity_inputspec_2a(plugin, tmp_path): image = "docker://alpine" # if you want set default in the first field you can use default_value in metadata - my_input_spec = SpecInfo( - name="Input", - fields=[ - ( - "file1", - attr.ib( - type=File, - default=filename_1, - metadata={ - "position": 1, - "argstr": "", - "help": "input file 1", - }, - ), + Singu = shell.define( + cmd, + inputs=[ + shell.arg( + name="file1", + type=File, + default=filename_1, + position=1, + argstr="", + help="input file 1", ), - ( - "file2", - attr.ib( - type=File, - metadata={ - "position": 2, - "argstr": "", - "help": "input file 2", - }, - ), + shell.arg( + name="file2", + type=File, + position=2, + argstr="", + help="input file 2", ), ], - bases=(ShellDef,), ) - singu = ShellDef( - name="singu", - environment=Singularity(image=image), - executable=cmd, - file2=filename_2, - input_spec=my_input_spec, - strip=True, - cache_dir=tmp_path, - ) + singu = Singu(file2=filename_2) - res = singu() - assert res.output.stdout == "hello from pydra\nhave a nice one" + outputs = singu(environment=Singularity(image=image), cache_dir=tmp_path) + assert outputs.stdout == "hello from pydra\nhave a nice one" @need_singularity @@ -465,54 +372,34 @@ def test_singularity_cmd_inputspec_copyfile_1(plugin, tmp_path): with open(file, "w") as f: f.write("hello from pydra\n") - cmd = ["sed", "-is", "s/hello/hi/"] image = "docker://alpine" - my_input_spec = SpecInfo( - name="Input", - fields=[ - ( - "orig_file", - attr.ib( - type=File, - metadata={ - "position": 1, - "argstr": "", - "help": "orig file", - "mandatory": True, - "copyfile": True, - }, - ), - ), - ( - "out_file", - attr.ib( - type=str, - metadata={ - "output_file_template": "{orig_file}", - "help": "output file", - }, - ), - ), - ], - bases=(ShellDef,), - ) + @shell.define + class Singu(ShellDef["Singu.Outputs"]): - singu = ShellDef( - name="singu", - environment=Singularity(image=image), - executable=cmd, - input_spec=my_input_spec, - orig_file=str(file), - cache_dir=tmp_path, - ) + executable = ["sed", "-is", "s/hello/hi/"] + + orig_file: File = shell.arg( + position=1, + argstr="", + help="orig file", + copy_mode=File.CopyMode.copy, + ) - res = singu() - assert res.output.stdout == "" - assert res.output.out_file.fspath.exists() + class Outputs(ShellOutputs): + out_file: File = shell.outarg( + path_template="{orig_file}.txt", # FIXME: Shouldn't have to specify the extension + help="output file", + ) + + singu = Singu(orig_file=file) + + outputs = singu(environment=Singularity(image=image), cache_dir=tmp_path) + assert outputs.stdout == "" + assert outputs.out_file.fspath.exists() # the file is copied, and than it is changed in place - assert res.output.out_file.fspath.parent == singu.output_dir - with open(res.output.out_file) as f: + assert outputs.out_file.fspath.parent.parent == tmp_path + with open(outputs.out_file) as f: assert "hi from pydra\n" == f.read() # the original file is unchanged with open(file) as f: @@ -535,37 +422,24 @@ def test_singularity_inputspec_state_1(tmp_path): filename = [str(filename_1), str(filename_2)] image = "docker://alpine" - my_input_spec = SpecInfo( - name="Input", - fields=[ - ( - "file", - attr.ib( - type=File, - metadata={ - "mandatory": True, - "position": 1, - "argstr": "", - "help": "input file", - }, - ), + Singu = shell.define( + cmd, + inputs=[ + shell.arg( + name="file", + type=File, + position=1, + argstr="", + help="input file", ) ], - bases=(ShellDef,), ) - singu = ShellDef( - name="singu", - environment=Singularity(image=image), - executable=cmd, - input_spec=my_input_spec, - strip=True, - cache_dir=tmp_path, - ).split("file", file=filename) + singu = Singu().split("file", file=filename) - res = singu() - assert res[0].output.stdout == "hello from pydra" - assert res[1].output.stdout == "have a nice one" + outputs = singu(environment=Singularity(image=image), cache_dir=tmp_path) + assert outputs.stdout[0].strip() == "hello from pydra" + assert outputs.stdout[1].strip() == "have a nice one" @need_singularity @@ -585,37 +459,24 @@ def test_singularity_inputspec_state_1b(plugin, tmp_path): filename = [str(file_1), str(file_2)] image = "docker://alpine" - my_input_spec = SpecInfo( - name="Input", - fields=[ - ( - "file", - attr.ib( - type=File, - metadata={ - "mandatory": True, - "position": 1, - "argstr": "", - "help": "input file", - }, - ), + Singu = shell.define( + cmd, + inputs=[ + shell.arg( + name="file", + type=File, + position=1, + argstr="", + help="input file", ) ], - bases=(ShellDef,), ) - singu = ShellDef( - name="singu", - environment=Singularity(image=image), - executable=cmd, - input_spec=my_input_spec, - strip=True, - cache_dir=tmp_path, - ).split("file", file=filename) + singu = Singu().split("file", file=filename) - res = singu() - assert res[0].output.stdout == "hello from pydra" - assert res[1].output.stdout == "have a nice one" + outputs = singu(environment=Singularity(image=image), cache_dir=tmp_path) + assert outputs.stdout[0].strip() == "hello from pydra" + assert outputs.stdout[1].strip() == "have a nice one" @need_singularity @@ -628,46 +489,30 @@ def test_singularity_wf_inputspec_1(plugin, tmp_path): cmd = "cat" image = "docker://alpine" - my_input_spec = SpecInfo( - name="Input", - fields=[ - ( - "file", - attr.ib( - type=File, - metadata={ - "mandatory": True, - "position": 1, - "argstr": "", - "help": "input file", - }, - ), + Singu = shell.define( + cmd, + inputs=[ + shell.arg( + name="file", + type=File, + position=1, + argstr="", + help="input file", ) ], - bases=(ShellDef,), - ) - - wf = Workflow(name="wf", input_spec=["cmd", "file"], cache_dir=tmp_path) - wf.inputs.cmd = cmd - wf.inputs.file = filename - - singu = ShellDef( - name="singu", - environment=Singularity(image=image), - executable=wf.lzin.cmd, - file=wf.lzin.file, - input_spec=my_input_spec, - strip=True, ) - wf.add(singu) - wf.set_output([("out", wf.singu.lzout.stdout)]) + @workflow.define + def Workflow(cmd: str, file: File) -> str: + singu = workflow.add( + Singu(executable=cmd, file=file), environment=Singularity(image=image) + ) + return singu.stdout - with Submitter(worker="serial") as sub: - wf(submitter=sub) + with Submitter(cache_dir=tmp_path) as sub: + res = sub(Workflow(cmd=cmd, file=filename)) - res = wf.result() - assert res.output.out == "hello from pydra" + assert res.outputs.out.strip() == "hello from pydra" @need_singularity @@ -684,47 +529,36 @@ def test_singularity_wf_state_inputspec_1(plugin, tmp_path): filename = [str(file_1), str(file_2)] image = "docker://alpine" - my_input_spec = SpecInfo( - name="Input", - fields=[ - ( - "file", - attr.ib( - type=File, - metadata={ - "mandatory": True, - "position": 1, - "argstr": "", - "help": "input file", - }, - ), + Singu = shell.define( + cmd, + inputs=[ + shell.arg( + name="file", + type=File, + position=1, + argstr="", + help="input file", ) ], - bases=(ShellDef,), ) - wf = Workflow(name="wf", input_spec=["cmd", "file"], cache_dir=tmp_path) - wf.inputs.cmd = cmd - - singu = ShellDef( - name="singu", - environment=Singularity(image=image), - executable=wf.lzin.cmd, - file=wf.lzin.file, - input_spec=my_input_spec, - strip=True, - ) - wf.add(singu) - wf.split("file", file=filename) + @workflow.define + def Workflow(cmd: str, file: File) -> str: + singu = workflow.add( + Singu(executable=cmd, file=file), + environment=Singularity(image=image), + ) + return singu.stdout - wf.set_output([("out", wf.singu.lzout.stdout)]) + wf = Workflow(cmd=cmd).split("file", file=filename) - with Submitter(worker=plugin) as sub: - wf(submitter=sub) + with Submitter(worker=plugin, cache_dir=tmp_path) as sub: + res = sub(wf) - res = wf.result() - assert res[0].output.out == "hello from pydra" - assert res[1].output.out == "have a nice one" + assert [o.strip() for o in res.outputs.out] == [ + "hello from pydra", + "have a nice one", + ] @need_singularity @@ -741,42 +575,33 @@ def test_singularity_wf_ndst_inputspec_1(plugin, tmp_path): filename = [str(file_1), str(file_2)] image = "docker://alpine" - my_input_spec = SpecInfo( - name="Input", - fields=[ - ( - "file", - attr.ib( - type=File, - metadata={ - "mandatory": True, - "position": 1, - "argstr": "", - "help": "input file", - }, - ), + Singu = shell.define( + cmd, + inputs=[ + shell.arg( + name="file", + type=File, + position=1, + argstr="", + help="input file", ) ], - bases=(ShellDef,), ) - wf = Workflow(name="wf", input_spec=["cmd", "file"], cache_dir=tmp_path) - wf.inputs.cmd = cmd - wf.inputs.file = filename - - singu = ShellDef( - name="singu", - environment=Singularity(image=image), - executable=wf.lzin.cmd, - input_spec=my_input_spec, - strip=True, - ).split("file", file=wf.lzin.file) - wf.add(singu) + @workflow.define + def Workflow(cmd: str, files: list[File]) -> list[str]: + singu = workflow.add( + Singu(executable=cmd).split(file=files), + environment=Singularity(image=image), + ) + return singu.stdout - wf.set_output([("out", wf.singu.lzout.stdout)]) + wf = Workflow(cmd=cmd, files=filename) - with Submitter(worker=plugin) as sub: - wf(submitter=sub) + with Submitter(worker=plugin, cache_dir=tmp_path) as sub: + res = sub(wf) - res = wf.result() - assert res.output.out == ["hello from pydra", "have a nice one"] + assert [o.strip() for o in res.outputs.out] == [ + "hello from pydra", + "have a nice one", + ] diff --git a/pydra/engine/tests/test_specs.py b/pydra/engine/tests/test_specs.py index 90bfeb004f..950d68b8a3 100644 --- a/pydra/engine/tests/test_specs.py +++ b/pydra/engine/tests/test_specs.py @@ -2,6 +2,7 @@ import typing as ty import os import attrs +from unittest.mock import Mock # from copy import deepcopy import time @@ -9,7 +10,6 @@ from ..specs import ( Runtime, Result, - ShellDef, ) from pydra.engine.lazy import ( LazyInField, @@ -19,12 +19,14 @@ from pydra.utils.typing import StateArray # from ..helpers import make_klass -from .utils import Foo +from .utils import Foo, BasicWorkflow from pydra.design import python, workflow import pytest -make_klass = lambda x: x +# @python.define +# def Foo(a: str, b: int, c: float) -> str: +# return f"{a}{b}{c}" def test_runtime(): @@ -34,22 +36,14 @@ def test_runtime(): assert hasattr(runtime, "cpu_peak_percent") -def test_result(): - result = Result() +def test_result(tmp_path): + result = Result(output_dir=tmp_path) assert hasattr(result, "runtime") - assert hasattr(result, "output") + assert hasattr(result, "outputs") assert hasattr(result, "errored") assert getattr(result, "errored") is False -def test_shellspec(): - with pytest.raises(TypeError): - definition = ShellDef() - definition = ShellDef(executable="ls") # (executable, args) - assert hasattr(definition, "executable") - assert hasattr(definition, "args") - - class NodeTesting: @attrs.define() class Input: @@ -99,20 +93,33 @@ def __init__(self): self.tn = NodeTesting() -def test_lazy_inp(): - tn = NodeTesting() - lzin = LazyIn(task=tn) +@pytest.fixture +def mock_node(): + node = Mock() + node.name = "tn" + node.definition = Foo(a="a", b=1, c=2.0) + return node + - lf = lzin.inp_a - assert lf.get_value(wf=WorkflowTesting()) == "A" +@pytest.fixture +def mock_workflow(): + mock_workflow = Mock() + mock_workflow.inputs = BasicWorkflow(x=1) + mock_workflow.outputs = BasicWorkflow.Outputs(out=attrs.NOTHING) + return mock_workflow - lf = lzin.inp_b - assert lf.get_value(wf=WorkflowTesting()) == "B" + +def test_lazy_inp(mock_workflow): + lf = LazyInField(field="a", type=int, workflow=mock_workflow) + assert lf._get_value() == "a" + + lf = LazyInField(field="b", type=str, workflow_def=mock_workflow) + assert lf._get_value() == 1 def test_lazy_out(): tn = NodeTesting() - lzout = LazyOut(task=tn) + lzout = LazyOutField(task=tn) lf = lzout.out_a assert lf.get_value(wf=WorkflowTesting()) == "OUT_A" @@ -364,10 +371,10 @@ def f(x: ty.List[int]) -> ty.List[int]: def test_lazy_field_multi_diff_split(): @python.define - def f(x: ty.Any, y: ty.Any) -> ty.Any: + def F(x: ty.Any, y: ty.Any) -> ty.Any: return x - task = f(x=[1, 2, 3], name="foo") + task = F(x=[1, 2, 3], name="foo") lf = task.lzout.out.split("foo.x") @@ -383,18 +390,22 @@ def f(x: ty.Any, y: ty.Any) -> ty.Any: assert lf3.splits == set([(("foo.x",),), (("foo.y",),)]) -def test_wf_lzin_split(): +def test_wf_lzin_split(tmp_path): @python.define def identity(x: int) -> int: return x - inner = Workflow(name="inner", input_spec=["x"]) - inner.add(identity(x=inner.lzin.x, name="f")) - inner.set_output(("out", inner.f.lzout.out)) + @workflow.define + def Inner(x): + ident = workflow.add(identity(x=x)) + return ident.out + + @workflow.define + def Outer(xs): + inner = workflow.add(Inner().split(x=xs)) + return inner.out - outer = Workflow(name="outer", input_spec=["x"]) - outer.add(inner.split(x=outer.lzin.x)) - outer.set_output(("out", outer.inner.lzout.out)) + outer = Outer(xs=[1, 2, 3]) - outputs = outer(x=[1, 2, 3]) - assert outputs.out == StateArray([1, 2, 3]) + outputs = outer(cache_dir=tmp_path) + assert outputs.out == [1, 2, 3] diff --git a/pydra/engine/tests/utils.py b/pydra/engine/tests/utils.py index ff6a273bbf..9fc1d5f91f 100644 --- a/pydra/engine/tests/utils.py +++ b/pydra/engine/tests/utils.py @@ -13,6 +13,9 @@ from ..submitter import Submitter from pydra.design import workflow, python +if ty.TYPE_CHECKING: + from pydra.engine.environments import Environment + need_docker = pytest.mark.skipif( shutil.which("docker") is None or sp.call(["docker", "info"]), @@ -35,17 +38,28 @@ ) -def result_no_submitter(shell_def: ShellDef, plugin: str = None): +def run_no_submitter( + shell_def: ShellDef, + cache_dir: Path | None = None, + plugin: str | None = None, + environment: "Environment | None" = None, +): """helper function to return result when running without submitter""" - return shell_def(worker=plugin) + return shell_def(worker=plugin, cache_dir=cache_dir, environment=environment) -def result_submitter(shell_def: ShellDef, plugin: str): +def run_submitter( + shell_def: ShellDef, + cache_dir: Path | None = None, + plugin: str | None = None, + environment: "Environment | None" = None, +): """helper function to return result when running with submitter with specific plugin """ - with Submitter(worker=plugin) as sub: - return sub(shell_def) + with Submitter(worker=plugin, cache_dir=cache_dir, environment=environment) as sub: + results = sub(shell_def) + return results.outputs dot_check = sp.run(["which", "dot"], stdout=sp.PIPE, stderr=sp.PIPE) diff --git a/pydra/utils/typing.py b/pydra/utils/typing.py index cb4e46311c..7e39fc541b 100644 --- a/pydra/utils/typing.py +++ b/pydra/utils/typing.py @@ -1058,10 +1058,29 @@ def optional_type(type_: type) -> type: return type_ -def is_fileset_or_union(type_: type) -> bool: - """Check if the type is a FileSet or a Union containing a FileSet""" +def is_fileset_or_union(type_: type, allow_none: bool | None = None) -> bool: + """Check if the type is a FileSet or a Union containing a FileSet + + Parameters + ---------- + type_ : type + the type to check + allow_none : bool, optional + whether to allow None as a valid type, by default None. If None, then None + is not allowed at the outer layer, but is allowed within a Union + + Returns + ------- + is_fileset : bool + whether the type is a FileSet or a Union containing a FileSet + """ + if type_ is None and allow_none: + return True if is_union(type_): - return any(is_fileset_or_union(t) for t in ty.get_args(type_)) + return any( + is_fileset_or_union(t, allow_none=allow_none or allow_none is None) + for t in ty.get_args(type_) + ) elif not inspect.isclass(type_): return False return issubclass(type_, core.FileSet) diff --git a/pyproject.toml b/pyproject.toml index a7b7de2e35..868a488d58 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,7 @@ test = [ "pytest-rerunfailures", "pytest-timeout", "codecov", + "fileformats-extras >=0.15a4", "numpy", "pyld", "psutil",