Skip to content

Adding BoshTask #231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
May 4, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8293c71
Adding BoshTask - Shell Command Task that uses boutiques descriptor …
djarecka Apr 9, 2020
7576fdc
fixing bindings for input; temp. default for output fields
djarecka Apr 16, 2020
00c668d
fixing output spec for bosh; changing output_file_template so it work…
djarecka Apr 22, 2020
8f2e841
small changes in bosh input/output specs; adding tests with workflows
djarecka Apr 22, 2020
0073e15
Merge branch 'master' into enh/bosh
djarecka Apr 22, 2020
bd7b312
adding shell task to the bosh wf
djarecka Apr 22, 2020
f9bf7ac
skipping bosh for windows
djarecka Apr 22, 2020
7b88604
adding a simple retry for pulling zenodo file
djarecka Apr 22, 2020
07d61fc
testing json decoder error
djarecka Apr 27, 2020
2549f61
testing json decoder error
djarecka Apr 27, 2020
73fa5ca
adding retry logic to the json.load of zenodo file (randomly one test…
djarecka Apr 27, 2020
1fa629d
adding retries number
djarecka Apr 27, 2020
2a2c3d8
fix
djarecka Apr 27, 2020
8bdeed1
changing the way how the zenodo file is downloaded
djarecka Apr 28, 2020
46ad553
removing run_task from boshtask, small edit to run_task from shellcom…
djarecka Apr 28, 2020
f171828
using input/output_spec_names instead of input/output_spec for BoshTa…
djarecka Apr 28, 2020
53bd585
checking if retry logic still needed with travis
djarecka Apr 30, 2020
d1c386d
removing retry block, looks like travis is fine after changing downlo…
djarecka Apr 30, 2020
7a7b1b3
adding pytest-rerunfailures and reruning the first boutique test (fai…
djarecka May 3, 2020
65cdd56
Merge branch 'master' into enh/bosh
djarecka May 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 221 additions & 0 deletions pydra/engine/boutiques.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
import typing as ty
import json
import attr
import subprocess as sp
import os
from pathlib import Path
from functools import reduce

from ..utils.messenger import AuditFlag
from ..engine import ShellCommandTask
from ..engine.specs import (
SpecInfo,
ShellSpec,
ShellOutSpec,
File,
Directory,
attr_fields,
)
from .helpers import ensure_list, execute
from .helpers_file import template_update, is_local_file


class BoshTask(ShellCommandTask):
"""Shell Command Task based on the Boutiques descriptor"""

def __init__(
self,
zenodo=None,
bosh_file=None,
audit_flags: AuditFlag = AuditFlag.NONE,
cache_dir=None,
input_spec: ty.Optional[SpecInfo] = None,
messenger_args=None,
messengers=None,
name=None,
output_spec: ty.Optional[SpecInfo] = None,
rerun=False,
strip=False,
**kwargs,
):
"""
Initialize this task.

Parameters
----------
zenodo: :obj: str
Zenodo ID
bosh_file : : str
json file with the boutiques descriptors
audit_flags : :obj:`pydra.utils.messenger.AuditFlag`
Auditing configuration
cache_dir : :obj:`os.pathlike`
Cache directory
input_spec : :obj:`pydra.engine.specs.SpecInfo`
Specification of inputs.
messenger_args :
TODO
messengers :
TODO
name : :obj:`str`
Name of this task.
output_spec : :obj:`pydra.engine.specs.BaseSpec`
Specification of inputs.
strip : :obj:`bool`
TODO

"""
if (bosh_file and zenodo) or not (bosh_file or zenodo):
raise Exception("either bosh or zenodo has to be specified")
elif zenodo:
bosh_file = self._download_spec(zenodo)

# retry logic - an error on travis is raised randomly, not able to reproduce
tries, tries_max = 0, 7
while tries < tries_max:
try:
with bosh_file.open() as f:
self.bosh_spec = json.load(f)
break
except json.decoder.JSONDecodeError:
tries += 1
if tries == tries_max:
raise

if input_spec is None:
input_spec = self._prepare_input_spec()
self.input_spec = input_spec
if output_spec is None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems that both for input_spec and output_spec there is no check that these align with the bosh spec if they are provided.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right, haven't even tested it...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decided to introduce input/output_spec_names instead of input/output_spec, so you can provide subset of names that should be used, but type, help_string, etc. is taken from the zenodo spec file. Otherwise I wasn't sure how to deal with conflicts in types, help_strings etc.

output_spec = self._prepare_output_spec()
self.output_spec = output_spec
self.bindings = []

super(BoshTask, self).__init__(
name=name,
input_spec=input_spec,
output_spec=output_spec,
executable=["bosh", "exec", "launch", str(bosh_file)],
args=["-s"],
audit_flags=audit_flags,
messengers=messengers,
messenger_args=messenger_args,
cache_dir=cache_dir,
strip=strip,
rerun=rerun,
**kwargs,
)
self.strip = strip

def _download_spec(self, zenodo):
""" usind bosh pull to download the zenodo file"""
spec_file = (
Path(os.environ["HOME"])
/ ".cache/boutiques/production"
/ (zenodo.replace(".", "-") + ".json")
)
for i in range(3):
if not spec_file.exists():
sp.run(["bosh", "pull", zenodo])
if not spec_file.exists():
raise Exception(f"can't pull zenodo file {zenodo}")
return spec_file

def _prepare_input_spec(self):
""" creating input spec from the zenodo file"""
binputs = self.bosh_spec["inputs"]
self._input_spec_keys = {}
fields = []
for input in binputs:
name = input["id"]
if input["type"] == "File":
tp = File
elif input["type"] == "String":
tp = str
elif input["type"] == "Number":
tp = float
elif input["type"] == "Flag":
tp = bool
else:
tp = None
# adding list
if tp and "list" in input and input["list"]:
tp = ty.List[tp]

mdata = {
"help_string": input.get("description", None) or input["name"],
"mandatory": not input["optional"],
"argstr": input.get("command-line-flag", None),
}
fields.append((name, tp, mdata))
self._input_spec_keys[input["value-key"]] = "{" + f"{name}" + "}"

spec = SpecInfo(name="Inputs", fields=fields, bases=(ShellSpec,))
return spec

def _prepare_output_spec(self):
""" creating output spec from the zenodo file"""
boutputs = self.bosh_spec["output-files"]
fields = []
for output in boutputs:
name = output["id"]
path_template = reduce(
lambda s, r: s.replace(*r),
self._input_spec_keys.items(),
output["path-template"],
)
mdata = {
"help_string": output.get("description", None) or output["name"],
"mandatory": not output["optional"],
"output_file_template": path_template,
}
fields.append((name, attr.ib(type=File, metadata=mdata)))

spec = SpecInfo(name="Outputs", fields=fields, bases=(ShellOutSpec,))
return spec

def _command_args_single(self, state_ind, ind=None):
"""Get command line arguments for a single state"""
input_filepath = self._input_file(state_ind=state_ind, ind=ind)
cmd_list = (
self.inputs.executable + [input_filepath] + self.inputs.args + self.bindings
)
return cmd_list

def _input_file(self, state_ind, ind=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this and the previous function appears to interact with state. that seems odd to me. we should not be expecting people to understand state to write a task. thus far all tasks have been somewhat independent of state, and we should keep it that way.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't understand your comment. pydra prepares the files, this is a private function

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i did not understand why this function is needed and how to convey to another developer that if they were to write the boutiques task they would need this function. what is the general role of this function? and how would another developer (say a CWL developer) know that this is needed. this seems to depend on state_ind.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function creates invocation json file with inputs that is needed for bosh exec launch. If task has splitter and provides two files for example for infile input, I will create two invocation files. I'm assuming that user doesn't know that bosh requires this file, but just provides infile, etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be the kind of thing that happens inside run_task? i.e. the creation of this file is only required when you have the inputs already set. this is not an input to the task itself but to bosh.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but this is used by command_args property that is used in run_task

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should probably use more specific name

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see that ShellCommandTask follows a similar pattern. conceptually does command_args need to work for tasks with State. is it used anywhere in that form?

wouldn't it be easier if these didn't have to think about state for command_args and would raise an Exception if called when the Task has state, but would return the appropriate thing when the Task is stateless.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't answer you right now. I didn't see a problem with it, but can rethink this in a different PR

Copy link
Collaborator Author

@djarecka djarecka Apr 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and I actually do think that it's nice to have, it helps with debugging

input_json = {}
for f in attr_fields(self.inputs):
if f.name in ["executable", "args"]:
continue
if self.state and f"{self.name}.{f.name}" in state_ind:
value = getattr(self.inputs, f.name)[state_ind[f"{self.name}.{f.name}"]]
else:
value = getattr(self.inputs, f.name)
# adding to the json file if specified by the user
if value is not attr.NOTHING and value != "NOTHING":
if is_local_file(f):
value = Path(value)
self.bindings.extend(["-v", f"{value.parent}:{value.parent}:ro"])
value = str(value)

input_json[f.name] = value

filename = self.cache_dir / f"{self.name}-{ind}.json"
with open(filename, "w") as jsonfile:
json.dump(input_json, jsonfile)

return str(filename)

def _run_task(self):
self.output_ = None
args = self.command_args
if args:
# removing empty strings
args = [str(el) for el in args if el not in ["", " "]]
keys = ["return_code", "stdout", "stderr"]
values = execute(args, strip=self.strip)
self.output_ = dict(zip(keys, values))
if self.output_["return_code"]:
if self.output_["stderr"]:
raise RuntimeError(self.output_["stderr"])
else:
raise RuntimeError(self.output_["stdout"])
23 changes: 18 additions & 5 deletions pydra/engine/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,8 @@ def collect_additional_outputs(self, input_spec, inputs, output_dir):
if fld.type is File:
# assuming that field should have either default or metadata, but not both
if (
not (fld.default is None or fld.default == attr.NOTHING)
and fld.metadata
):
fld.default is None or fld.default == attr.NOTHING
) and not fld.metadata: # TODO: is it right?
raise Exception("File has to have default value or metadata")
elif not fld.default == attr.NOTHING:
additional_out[fld.name] = self._field_defaultvalue(
Expand Down Expand Up @@ -360,9 +359,23 @@ def _field_metadata(self, fld, inputs, output_dir):
if "value" in fld.metadata:
return output_dir / fld.metadata["value"]
elif "output_file_template" in fld.metadata:
return output_dir / fld.metadata["output_file_template"].format(
**inputs.__dict__
sfx_tmpl = (output_dir / fld.metadata["output_file_template"]).suffixes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do you detect suffixes. for imaging files or other modalities this may need to be quite custom? for example nii.gz, BRIK/HEAD, etc.,.

how will pydra generalize this across domains?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my main problem was that input_1 could be filename, filename.nii or filename.nii.gz and output should be always filename.nii.gz, and the templaete is [input_1].nii.gz...
so I decided to remove all suffixes if template has it's own, happy to hear better suggestions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that specific to bosh or boutiques that output is always compressed nifti? also does bosh/boutiques enforce nifti as output? i don't think anything in boutiques enforces it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you see example for bet - you will see that "output-files" has "path-template": "[MASK].nii.gz", but maskfile (that has "value-key": "[MASK]"), could be either filename or filename.nii.gz. For both cases output-files is filename.nii.gz

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that may be true of bet, but boutiques can support other tools as well. so there needs to be a general way of deciding what the outputs would be.

this is the part where knowing what outputs should be created given the inputs plays a role, but i don't think that's captured in boutiques yet. so as a first pass, you can simply leave inputs/outputs as separate things.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understood that it's not always a case, but I thought this is a good start. Not sure how do you want me to keep it separately. From the zenodo file I have only a template that uses inputs values

if sfx_tmpl:
# removing suffix from input field if template has it's own suffix
inputs_templ = {
k: v.split(".")[0]
for k, v in inputs.__dict__.items()
if isinstance(v, str)
}
else:
inputs_templ = {
k: v for k, v in inputs.__dict__.items() if isinstance(v, str)
}
out_path = output_dir / fld.metadata["output_file_template"].format(
**inputs_templ
)
return out_path

elif "callable" in fld.metadata:
return fld.metadata["callable"](fld.name, output_dir)
else:
Expand Down
Binary file added pydra/engine/tests/data_tests/test.nii.gz
Binary file not shown.
130 changes: 130 additions & 0 deletions pydra/engine/tests/test_boutiques.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import os, shutil
import subprocess as sp
from pathlib import Path
import pytest

from ..core import Workflow
from ..task import ShellCommandTask
from ..submitter import Submitter
from ..boutiques import BoshTask
from .utils import result_no_submitter, result_submitter, no_win

need_bosh_docker = pytest.mark.skipif(
shutil.which("docker") is None
or sp.call(["docker", "info"] or sp.call(["bosh", "version"])),
reason="requires docker and bosh",
)

if bool(shutil.which("sbatch")):
Plugins = ["cf", "slurm"]
else:
Plugins = ["cf"]

Infile = Path(__file__).resolve().parent / "data_tests" / "test.nii.gz"


@no_win
@need_bosh_docker
@pytest.mark.parametrize(
"maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"]
)
@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter])
@pytest.mark.parametrize("plugin", Plugins)
def test_boutiques_1(maskfile, plugin, results_function):
""" simple task to run fsl.bet using BoshTask"""
btask = BoshTask(name="NA", zenodo="zenodo.1482743")
btask.inputs.infile = Infile
btask.inputs.maskfile = maskfile
res = results_function(btask, plugin)

assert res.output.return_code == 0

# checking if the outfile exists and if it has proper name
assert res.output.outfile.name == "test_brain.nii.gz"
assert res.output.outfile.exists()
# other files should also have proper names, but they do not exist
assert res.output.out_outskin_off.name == "test_brain_outskin_mesh.off"
assert not res.output.out_outskin_off.exists()


@no_win
@need_bosh_docker
@pytest.mark.parametrize(
"maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"]
)
@pytest.mark.parametrize("plugin", Plugins)
def test_boutiques_wf_1(maskfile, plugin):
""" wf with one task that runs fsl.bet using BoshTask"""
wf = Workflow(name="wf", input_spec=["maskfile", "infile"])
wf.inputs.maskfile = maskfile
wf.inputs.infile = Infile

wf.add(
BoshTask(
name="bet",
zenodo="zenodo.1482743",
infile=wf.lzin.infile,
maskfile=wf.lzin.maskfile,
)
)

wf.set_output([("outfile", wf.bet.lzout.outfile)])

with Submitter(plugin=plugin) as sub:
wf(submitter=sub)

res = wf.result()
assert res.output.outfile.name == "test_brain.nii.gz"
assert res.output.outfile.exists()


@no_win
@need_bosh_docker
@pytest.mark.parametrize(
"maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"]
)
@pytest.mark.parametrize("plugin", Plugins)
def test_boutiques_wf_2(maskfile, plugin):
""" wf with two BoshTasks (fsl.bet and fsl.stats) and one ShellTask"""
wf = Workflow(name="wf", input_spec=["maskfile", "infile"])
wf.inputs.maskfile = maskfile
wf.inputs.infile = Infile

wf.add(
BoshTask(
name="bet",
zenodo="zenodo.1482743",
infile=wf.lzin.infile,
maskfile=wf.lzin.maskfile,
)
)
wf.add(
BoshTask(
name="stat",
zenodo="zenodo.3240521",
input_file=wf.bet.lzout.outfile,
v=True,
)
)
wf.add(ShellCommandTask(name="cat", executable="cat", args=wf.stat.lzout.output))

wf.set_output(
[
("outfile_bet", wf.bet.lzout.outfile),
("out_stat", wf.stat.lzout.output),
("out", wf.cat.lzout.stdout),
]
)

with Submitter(plugin=plugin) as sub:
wf(submitter=sub)

res = wf.result()
assert res.output.outfile_bet.name == "test_brain.nii.gz"
assert res.output.outfile_bet.exists()

assert res.output.out_stat.name == "output.txt"
assert res.output.out_stat.exists()

assert int(res.output.out.rstrip().split()[0]) == 11534336
assert float(res.output.out.rstrip().split()[1]) == 11534336.0
Loading