
Commit 6cff659

fix: resolving conflict

2 parents: d66cbc0 + 984a383

File tree: 13 files changed (+1601, −176 lines)

doc/users/index.rst

Lines changed: 1 addition & 0 deletions

@@ -31,6 +31,7 @@
 select_files
 function_interface
 mapnode_and_iterables
+joinnode_and_itersource
 model_specification
 saving_workflows
 spmmcr

doc/users/joinnode_and_itersource.rst

Lines changed: 175 additions & 0 deletions
@@ -0,0 +1,175 @@
.. _joinnode_and_itersource:

====================================
JoinNode, synchronize and itersource
====================================

The previous :doc:`mapnode_and_iterables` chapter described how to
fork and join nodes using MapNode and iterables. In this chapter, we
introduce features which build on these concepts to add workflow
flexibility.

JoinNode, joinsource and joinfield
==================================

A :class:`nipype.pipeline.engine.JoinNode` generalizes MapNode to
operate in conjunction with an upstream iterable node to reassemble
downstream results, e.g.:

.. digraph:: joinnode_ex

   "A" -> "B1" -> "C1" -> "D";
   "A" -> "B2" -> "C2" -> "D";
   "A" -> "B3" -> "C3" -> "D";

The code to achieve this is as follows:

::

  import nipype.pipeline.engine as pe
  a = pe.Node(interface=A(), name="a")
  b = pe.Node(interface=B(), name="b")
  b.iterables = ("in_file", images)
  c = pe.Node(interface=C(), name="c")
  d = pe.JoinNode(interface=D(), joinsource="b",
                  joinfield="in_files", name="d")

  my_workflow = pe.Workflow(name="my_workflow")
  my_workflow.connect([(a, b, [('subject', 'subject')]),
                       (b, c, [('out_file', 'in_file')]),
                       (c, d, [('out_file', 'in_files')]),
                       ])

This example assumes that interface "A" has one output *subject*,
interface "B" has two inputs *subject* and *in_file* and one output
*out_file*, interface "C" has one input *in_file* and one output
*out_file*, and interface "D" has one list input *in_files*. The
*images* variable is a list of three input image file names.

As with *iterables* and the MapNode *iterfield*, the *joinfield*
can be a list of fields. Thus, the declaration in the previous example
is equivalent to the following:

::

  d = pe.JoinNode(interface=D(), joinsource="b",
                  joinfield=["in_files"], name="d")

The *joinfield* defaults to all of the JoinNode input fields, so the
declaration is also equivalent to the following:

::

  d = pe.JoinNode(interface=D(), joinsource="b", name="d")

In this example, the node "c" *out_file* outputs are collected into
the JoinNode "d" *in_files* input list. The *in_files* order is the
same as the upstream "b" node iterables order.

The JoinNode input can be filtered for unique values by specifying
the *unique* flag, e.g.:

::

  d = pe.JoinNode(interface=D(), joinsource="b", unique=True, name="d")

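The fan-out/fan-in behavior described above can be sketched without nipype at all. The following is a minimal stand-in (the helper name `run_fanout_join` is hypothetical, not a nipype API): each iterable value spawns an independent branch, and the join step collects the branch outputs into one list in iterable order.

```python
def run_fanout_join(images, transform):
    """Apply `transform` to each image independently (the B -> C branches),
    then collect the outputs into a single list (the JoinNode "d")."""
    # Fan-out: one independent execution path per iterable value.
    branch_outputs = [transform(img) for img in images]
    # Join: the collected list preserves the upstream iterables order.
    return branch_outputs

in_files = run_fanout_join(["sub1.nii", "sub2.nii", "sub3.nii"],
                           lambda f: f.replace(".nii", "_proc.nii"))
print(in_files)  # ['sub1_proc.nii', 'sub2_proc.nii', 'sub3_proc.nii']
```

This mirrors the ordering guarantee stated above: the joined *in_files* list is indexed exactly like the "b" node iterable.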
synchronize
===========

The :class:`nipype.pipeline.engine.Node` *iterables* parameter can
be a single field or a list of fields. If it is a list, then execution
is performed over all combinations of the list items (their Cartesian
product). For example:

::

  b.iterables = [("m", [1, 2]), ("n", [3, 4])]

results in the execution graph:

.. digraph:: multiple_iterables_ex

   "A" -> "B13" -> "C";
   "A" -> "B14" -> "C";
   "A" -> "B23" -> "C";
   "A" -> "B24" -> "C";

where "B13" has inputs *m* = 1, *n* = 3, "B14" has inputs *m* = 1,
*n* = 4, etc.

The *synchronize* parameter synchronizes the iterables lists, e.g.:

::

  b.iterables = [("m", [1, 2]), ("n", [3, 4])]
  b.synchronize = True

results in the execution graph:

.. digraph:: synchronize_ex

   "A" -> "B13" -> "C";
   "A" -> "B24" -> "C";

where the iterable inputs are selected in lock-step by index, i.e.:

(*m*, *n*) = (1, 3) and (2, 4)

for "B13" and "B24", respectively.

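The two expansion modes can be sketched with plain `itertools`: the default behaves like `itertools.product` over the value lists, while *synchronize* behaves like `zip`. This is a standalone illustration of the assumed semantics, not nipype internals.

```python
from itertools import product

# Multi-field iterables, as in the example above.
iterables = [("m", [1, 2]), ("n", [3, 4])]
names, value_lists = zip(*iterables)

# Default: every combination of values yields one execution.
default_runs = [dict(zip(names, combo)) for combo in product(*value_lists)]

# synchronize = True: the lists are consumed in lock-step by index.
synchronized_runs = [dict(zip(names, combo)) for combo in zip(*value_lists)]

print(len(default_runs))   # 4
print(synchronized_runs)   # [{'m': 1, 'n': 3}, {'m': 2, 'n': 4}]
```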
itersource
==========

The *itersource* feature allows you to expand a downstream iterable
based on a mapping of an upstream iterable. For example:

::

  a = pe.Node(interface=A(), name="a")
  b = pe.Node(interface=B(), name="b")
  b.iterables = ("m", [1, 2])
  c = pe.Node(interface=C(), name="c")
  d = pe.Node(interface=D(), name="d")
  d.itersource = ("b", "m")
  d.iterables = [("n", {1: [3, 4], 2: [5, 6]})]
  my_workflow = pe.Workflow(name="my_workflow")
  my_workflow.connect([(a, b, [('out_file', 'in_file')]),
                       (b, c, [('out_file', 'in_file')]),
                       (c, d, [('out_file', 'in_file')]),
                       ])

results in the execution graph:

.. digraph:: itersource_ex

   "A" -> "B1" -> "C1" -> "D13";
   "C1" -> "D14";
   "A" -> "B2" -> "C2" -> "D25";
   "C2" -> "D26";

In this example, all interfaces have input *in_file* and output
*out_file*. In addition, interface "B" has input *m* and interface "D"
has input *n*. A Python dictionary associates each "b" node iterable
value with the downstream "d" node *n* iterable values.

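The itersource expansion above reduces to a dictionary lookup: each upstream iterable value selects the value list used to expand the downstream node. A minimal sketch of the assumed semantics:

```python
# b.iterables = ("m", [1, 2])
upstream_values = [1, 2]
# d.iterables = [("n", {1: [3, 4], 2: [5, 6]})], keyed on the upstream value
downstream_map = {1: [3, 4], 2: [5, 6]}

# One (m, n) execution per upstream value per mapped downstream value:
# the D13, D14, D25, D26 nodes in the graph above.
expansions = [(m, n) for m in upstream_values for n in downstream_map[m]]
print(expansions)  # [(1, 3), (1, 4), (2, 5), (2, 6)]
```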
This example can be extended with a summary JoinNode:

::

  e = pe.JoinNode(interface=E(), joinsource="d",
                  joinfield="in_files", name="e")
  my_workflow.connect(d, 'out_file',
                      e, 'in_files')

resulting in the graph:

.. digraph:: itersource_with_join_ex

   "A" -> "B1" -> "C1" -> "D13" -> "E";
   "C1" -> "D14" -> "E";
   "A" -> "B2" -> "C2" -> "D25" -> "E";
   "C2" -> "D26" -> "E";

The combination of iterables, MapNode, JoinNode, synchronize and
itersource enables the creation of arbitrarily complex workflow graphs.
The astute workflow builder will recognize that this flexibility is
both a blessing and a curse. These advanced features are handy additions
to the Nipype toolkit when used judiciously.

nipype/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -74,7 +74,7 @@ def _test_local_install():
         pass
 
 
-from pipeline import Node, MapNode, Workflow
+from pipeline import Node, MapNode, JoinNode, Workflow
 from interfaces import (fsl, spm, freesurfer, afni, ants, slicer, dipy, nipy,
                         mrtrix, camino, DataGrabber, DataSink, SelectFiles,
                         IdentityInterface, Rename, Function, Select, Merge)

nipype/interfaces/ants/registration.py

Lines changed: 17 additions & 9 deletions
@@ -246,8 +246,8 @@ class RegistrationInputSpec(ANTSCommandInputSpec):
         trait=sampling_percentage_stage_trait, requires=['sampling_strategy'],
         desc="the metric sampling percentage(s) to use for each stage")
     use_estimate_learning_rate_once = traits.List(traits.Bool(), desc='')
-    use_histogram_matching = traits.List(
-        traits.Bool(argstr='%s'), default=True, usedefault=True)
+    use_histogram_matching = traits.Either(traits.Bool, traits.List(traits.Bool(argstr='%s')),
+                                           default=True, usedefault=True)
     interpolation = traits.Enum(
         'Linear', 'NearestNeighbor', 'CosineWindowedSinc', 'WelchWindowedSinc',
         'HammingWindowedSinc', 'LanczosWindowedSinc', 'BSpline',
@@ -286,7 +286,7 @@ class RegistrationInputSpec(ANTSCommandInputSpec):
     smoothing_sigmas = traits.List(traits.List(traits.Float()), mandatory=True)
     sigma_units = traits.List(traits.Enum('mm', 'vox'),
                               requires=['smoothing_sigmas'],
-                              desc="units for smoothing sigmas", mandatory=True)
+                              desc="units for smoothing sigmas")
     shrink_factors = traits.List(traits.List(traits.Int()), mandatory=True)
     convergence_threshold = traits.List(trait=traits.Float(), value=[1e-6], minlen=1, requires=['number_of_iterations'], usedefault=True)
     convergence_window_size = traits.List(trait=traits.Int(), value=[10], minlen=1, requires=['convergence_threshold'], usedefault=True)
@@ -486,17 +486,26 @@ def _formatRegistration(self):
             for metric in self._formatMetric(ii):
                 retval.append('--metric %s' % metric)
             retval.append('--convergence %s' % self._formatConvergence(ii))
-            retval.append('--smoothing-sigmas %s%s' % (self._antsJoinList(
-                self.inputs.smoothing_sigmas[ii]),
-                self.inputs.sigma_units[ii]))
+            if isdefined(self.inputs.sigma_units):
+                retval.append('--smoothing-sigmas %s%s' %
+                              (self._antsJoinList(self.inputs.smoothing_sigmas[ii]),
+                               self.inputs.sigma_units[ii]))
+            else:
+                retval.append('--smoothing-sigmas %s' %
+                              self._antsJoinList(self.inputs.smoothing_sigmas[ii]))
             retval.append('--shrink-factors %s' %
                           self._antsJoinList(self.inputs.shrink_factors[ii]))
             if isdefined(self.inputs.use_estimate_learning_rate_once):
                 retval.append('--use-estimate-learning-rate-once %d' %
                               self.inputs.use_estimate_learning_rate_once[ii])
             if isdefined(self.inputs.use_histogram_matching):
-                retval.append('--use-histogram-matching %d' %
-                              self.inputs.use_histogram_matching[ii])
+                # use_histogram_matching is either a common flag for all transforms
+                # or a list of transform-specific flags
+                if isinstance(self.inputs.use_histogram_matching, bool):
+                    histval = self.inputs.use_histogram_matching
+                else:
+                    histval = self.inputs.use_histogram_matching[ii]
+                retval.append('--use-histogram-matching %d' % histval)
         return " ".join(retval)
 
     def _antsJoinList(self, antsList):
@@ -593,7 +602,6 @@ def _format_arg(self, opt, spec, val):
 
     def _outputFileNames(self, prefix, count, transform, inverse=False):
         self.lowDimensionalTransformMap = {'Rigid': 'Rigid.mat',
-                                           #seems counterontuitive, but his is how ANTS is calling it
                                            'Affine': 'Affine.mat',
                                            'GenericAffine': 'GenericAffine.mat',
                                            'CompositeAffine': 'Affine.mat',
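The scalar-or-list dispatch introduced in this hunk is a common pattern: accept either one flag that applies to every stage or a per-stage list. A standalone sketch of the idea (the helper name is hypothetical, not the nipype interface):

```python
def format_histogram_matching_flags(use_histogram_matching, n_stages):
    """Build one '--use-histogram-matching' argument per registration stage.

    `use_histogram_matching` may be a single bool applied to all stages,
    or a list with one bool per stage (assumed to match the hunk above).
    """
    flags = []
    for ii in range(n_stages):
        if isinstance(use_histogram_matching, bool):
            histval = use_histogram_matching       # common flag
        else:
            histval = use_histogram_matching[ii]   # per-stage flag
        flags.append('--use-histogram-matching %d' % histval)
    return flags

print(format_histogram_matching_flags(True, 2))
# ['--use-histogram-matching 1', '--use-histogram-matching 1']
print(format_histogram_matching_flags([True, False], 2))
# ['--use-histogram-matching 1', '--use-histogram-matching 0']
```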

nipype/interfaces/ants/resampling.py

Lines changed: 4 additions & 1 deletion
@@ -225,6 +225,9 @@ class ApplyTransformsInputSpec(ANTSCommandInputSpec):
     output_image = traits.Str(argstr='--output %s',
                               desc=('output file name'), genfile=True,
                               hash_files=False)
+    out_postfix = traits.Str("_trans", usedefault=True,
+                             desc=('Postfix that is appended to all output '
+                                   'files (default = _trans)'))
     reference_image = File(argstr='--reference-image %s', mandatory=True,
                            desc='reference image space that you wish to warp INTO',
                            exists=True)
@@ -286,7 +289,7 @@ def _gen_filename(self, name):
             output = self.inputs.output_image
             if not isdefined(output):
                 _, name, ext = split_filename(self.inputs.input_image)
-                output = name + '_trans' + ext
+                output = name + self.inputs.out_postfix + ext
             return output
         return None
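The default-filename rule in this hunk inserts a configurable postfix between the basename and the extension. A self-contained sketch using only the standard library (the helper name is assumed; nipype's `split_filename` additionally handles double extensions, approximated here):

```python
import os

def default_output_name(input_image, out_postfix="_trans"):
    """Insert `out_postfix` before the file extension, e.g. the assumed
    behavior of _gen_filename when no output_image is given."""
    base, ext = os.path.splitext(input_image)
    if ext == '.gz':  # approximate split_filename's .nii.gz handling
        base, inner = os.path.splitext(base)
        ext = inner + ext
    return base + out_postfix + ext

print(default_output_name('subject1.nii.gz'))  # subject1_trans.nii.gz
print(default_output_name('subject1.nii', '_warped'))  # subject1_warped.nii
```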

nipype/interfaces/dcmstack.py

Lines changed: 3 additions & 0 deletions
@@ -278,6 +278,9 @@ def _run_interface(self, runtime):
             src_dict = src.meta_ext.get_class_dict(cls)
             dest_dict = dest.meta_ext.get_class_dict(cls)
             dest_dict.update(src_dict)
+        # Update the shape and slice dimension to reflect the meta extension update.
+        dest.meta_ext.slice_dim = src.meta_ext.slice_dim
+        dest.meta_ext.shape = src.meta_ext.shape
 
         self.out_path = path.join(os.getcwd(),
                                   path.basename(self.inputs.dest_file))

nipype/interfaces/io.py

Lines changed: 10 additions & 37 deletions
@@ -1234,7 +1234,7 @@ class XNATSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
     share = traits.Bool(False,
                         desc=('Option to share the subjects from the original project'
                               'instead of creating new ones when possible - the created '
-                              'experiments are then shared backk to the original project'
+                              'experiments are then shared back to the original project'
                               ),
                         usedefault=True,
                         mandatory=False,
@@ -1272,20 +1272,16 @@ def _list_outputs(self):
 
         # if possible share the subject from the original project
         if self.inputs.share:
+            subject_id = self.inputs.subject_id
             result = xnat.select(
                 'xnat:subjectData',
                 ['xnat:subjectData/PROJECT',
                  'xnat:subjectData/SUBJECT_ID']
-            ).where('xnat:subjectData/SUBJECT_ID = %s AND' %
-                    self.inputs.subject_id
-                    )
-
-            subject_id = self.inputs.subject_id
+            ).where('xnat:subjectData/SUBJECT_ID = %s AND' % subject_id)
 
             # subject containing raw data exists on the server
-            if isinstance(result.data[0], dict):
+            if (result.data and isinstance(result.data[0], dict)):
                 result = result.data[0]
-
                 shared = xnat.select('/project/%s/subject/%s' %
                                      (self.inputs.project_id,
                                       self.inputs.subject_id
@@ -1306,42 +1302,19 @@ def _list_outputs(self):
 
                 subject.share(str(self.inputs.project_id))
 
-        else:
-            # subject containing raw data does not exist on the server
-            subject_id = '%s_%s' % (
-                quote_id(self.inputs.project_id),
-                quote_id(self.inputs.subject_id)
-            )
-
         # setup XNAT resource
-        uri_template_args = {
-            'project_id': quote_id(self.inputs.project_id),
-            'subject_id': subject_id,
-            'experiment_id': '%s_%s_%s' % (
-                quote_id(self.inputs.project_id),
-                quote_id(self.inputs.subject_id),
-                quote_id(self.inputs.experiment_id)
-            )
-        }
+        uri_template_args = dict(
+            project_id=quote_id(self.inputs.project_id),
+            subject_id=self.inputs.subject_id,
+            experiment_id=quote_id(self.inputs.experiment_id))
 
         if self.inputs.share:
             uri_template_args['original_project'] = result['project']
 
         if self.inputs.assessor_id:
-            uri_template_args['assessor_id'] = (
-                '%s_%s' % (
-                    uri_template_args['experiment_id'],
-                    quote_id(self.inputs.assessor_id)
-                )
-            )
-
+            uri_template_args['assessor_id'] = quote_id(self.inputs.assessor_id)
         elif self.inputs.reconstruction_id:
-            uri_template_args['reconstruction_id'] = (
-                '%s_%s' % (
-                    uri_template_args['experiment_id'],
-                    quote_id(self.inputs.reconstruction_id)
-                )
-            )
+            uri_template_args['reconstruction_id'] = quote_id(self.inputs.reconstruction_id)
 
         # gather outputs and upload them
         for key, files in self.inputs._outputs.items():

nipype/interfaces/utility.py

Lines changed: 9 additions & 1 deletion
@@ -46,10 +46,18 @@ class IdentityInterface(IOBase):
     def __init__(self, fields=None, mandatory_inputs=True, **inputs):
         super(IdentityInterface, self).__init__(**inputs)
         if fields is None or not fields:
-            raise Exception('Identity Interface fields must be a non-empty list')
+            raise ValueError('Identity Interface fields must be a non-empty list')
+        # Each input must be in the fields.
+        for in_field in inputs:
+            if in_field not in fields:
+                raise ValueError('Identity Interface input is not in the fields: %s' % in_field)
         self._fields = fields
         self._mandatory_inputs = mandatory_inputs
         add_traits(self.inputs, fields)
+        # Adding any traits wipes out all input values set in superclass initialization,
+        # even if the trait is not in the add_traits argument. The work-around is to reset
+        # the values after adding the traits.
+        self.inputs.set(**inputs)
 
     def _add_output_traits(self, base):
         undefined_traits = {}
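The validation added in this hunk rejects any keyword input that does not name a declared field. A standalone sketch of that rule (not the nipype class; the function name is hypothetical):

```python
def validate_identity_inputs(fields, **inputs):
    """Check IdentityInterface-style inputs: fields must be non-empty and
    every keyword input must name one of the declared fields."""
    if not fields:
        raise ValueError('Identity Interface fields must be a non-empty list')
    for in_field in inputs:
        if in_field not in fields:
            raise ValueError(
                'Identity Interface input is not in the fields: %s' % in_field)
    return dict(inputs)

print(validate_identity_inputs(['a', 'b'], a=1))  # {'a': 1}
```

Failing fast here surfaces typos in input names at construction time instead of at workflow execution time, which is the apparent motivation for the change.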

nipype/pipeline/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -5,4 +5,4 @@
 
 """
 __docformat__ = 'restructuredtext'
-from .engine import Node, MapNode, Workflow
+from engine import Node, MapNode, JoinNode, Workflow
