Skip to content

Commit 2beaff7

Browse files
Handle conditional in dynamic (#301)
Signed-off-by: Andres Gomez Ferrer <[email protected]> Co-authored-by: Andres Gomez Ferrer <[email protected]>
1 parent 4b98408 commit 2beaff7

File tree

1 file changed

+24
-0
lines changed

1 file changed

+24
-0
lines changed

jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import com.google.common.collect.ImmutableMap;
2626
import com.google.common.collect.Maps;
27+
import java.util.ArrayList;
2728
import java.util.Collection;
2829
import java.util.HashMap;
2930
import java.util.List;
@@ -41,6 +42,7 @@
4142
import org.flyte.api.v1.DynamicJobSpec;
4243
import org.flyte.api.v1.DynamicWorkflowTask;
4344
import org.flyte.api.v1.DynamicWorkflowTaskRegistrar;
45+
import org.flyte.api.v1.IfBlock;
4446
import org.flyte.api.v1.Literal;
4547
import org.flyte.api.v1.NamedEntityIdentifier;
4648
import org.flyte.api.v1.Node;
@@ -280,6 +282,28 @@ private static List<Node> collectAllUsedTaskTemplates(
280282
flyteAdminClient,
281283
cache);
282284

285+
// collect task templates used by conditionals
286+
spec.nodes().stream()
287+
.filter(node -> node.branchNode() != null)
288+
.forEach(
289+
node -> {
290+
List<Node> nodes = new ArrayList<>();
291+
nodes.add(node.branchNode().ifElse().case_().thenNode());
292+
nodes.add(node.branchNode().ifElse().elseNode());
293+
nodes.addAll(
294+
node.branchNode().ifElse().other().stream()
295+
.map(IfBlock::thenNode)
296+
.collect(toList()));
297+
298+
collectTaskTemplates(
299+
nodes,
300+
nodesRewriter,
301+
allUsedTaskTemplates,
302+
allTaskTemplates,
303+
flyteAdminClient,
304+
cache);
305+
});
306+
283307
// collect task templates used by subworkflows
284308
allUsedSubWorkflows
285309
.values()

0 commit comments

Comments
 (0)