Skip to content

Commit 92f06bf

Browse files
committed
Permissive task capability
Why: We need idempotent forked tasks: every forked task gets executed, and any failures are still detected at the join. Feature request: Netflix/conductor#3861. What: Introduces the concept of Permissive tasks. A Permissive task is similar to a Simple task; the difference is that when a Permissive task fails, the other tasks are allowed to continue. As a result: 1. Forked Permissive tasks let each other run until all of the forked tasks have terminated; only then does the join task fail. If the failed Permissive tasks are optional, the join does not fail. 2. Sequential Permissive tasks let subsequent tasks continue; at the end, the workflow fails if a Permissive task has failed. The workflow does not fail if the failed Permissive task is optional. Testing done: added PermissiveTaskMapperTest and TestDeciderOutcomes.testPermissive(); added WorkflowAndTaskConfigurationSpec cases "Test simple workflow which has a permissive task" and "Test simple workflow which has a permissive optional task", which cover retry; added ForkJoinSpec cases "Test a simple workflow with fork join permissive failure flow" and "Test retrying a failed permissive fork join workflow". In addition, performed e2e tests locally against a running Conductor instance: built a Docker image with the code changes, started it locally, and started a SampleWorker polling 3 tasks in parallel. Verified e2e scenarios for task_def_permissive, task_def_permissive_optional, task_def_simple.json and task_def_simple_optional.json, each joining on 6 forked tasks and then running simple task 7 after the join.
1 parent 3c15237 commit 92f06bf

File tree

16 files changed

+1311
-14
lines changed

16 files changed

+1311
-14
lines changed

common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public enum TaskType {
2323
DYNAMIC,
2424
FORK_JOIN,
2525
FORK_JOIN_DYNAMIC,
26+
PERMISSIVE,
2627
DECISION,
2728
SWITCH,
2829
JOIN,
@@ -70,6 +71,7 @@ public enum TaskType {
7071
public static final String TASK_TYPE_JSON_JQ_TRANSFORM = "JSON_JQ_TRANSFORM";
7172
public static final String TASK_TYPE_SET_VARIABLE = "SET_VARIABLE";
7273
public static final String TASK_TYPE_FORK = "FORK";
74+
public static final String TASK_TYPE_PERMISSIVE = "PERMISSIVE";
7375
public static final String TASK_TYPE_NOOP = "NOOP";
7476

7577
private static final Set<String> BUILT_IN_TASKS = new HashSet<>();

core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.netflix.conductor.model.TaskModel;
4545
import com.netflix.conductor.model.WorkflowModel;
4646

47+
import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
4748
import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE;
4849
import static com.netflix.conductor.common.metadata.tasks.TaskType.USER_DEFINED;
4950
import static com.netflix.conductor.model.TaskModel.Status.*;
@@ -207,7 +208,11 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
207208
tasksToBeScheduled.put(retryTask.get().getReferenceTaskName(), retryTask.get());
208209
executedTaskRefNames.remove(retryTask.get().getReferenceTaskName());
209210
outcome.tasksToBeUpdated.add(pendingTask);
210-
} else {
211+
} else if (!(pendingTask.getWorkflowTask() != null
212+
&& TaskType.PERMISSIVE
213+
.name()
214+
.equals(pendingTask.getWorkflowTask().getType())
215+
&& !pendingTask.getWorkflowTask().isOptional())) {
211216
pendingTask.setStatus(COMPLETED_WITH_ERRORS);
212217
}
213218
}
@@ -254,6 +259,39 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
254259
if (hasSuccessfulTerminateTask
255260
|| (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflow))) {
256261
LOGGER.debug("Marking workflow: {} as complete.", workflow);
262+
List<TaskModel> permissiveTasksTerminalNonSuccessful =
263+
workflow.getTasks().stream()
264+
.filter(t -> t.getWorkflowTask() != null)
265+
.filter(t -> PERMISSIVE.name().equals(t.getWorkflowTask().getType()))
266+
.filter(t -> !t.getWorkflowTask().isOptional())
267+
.collect(
268+
Collectors.toMap(
269+
TaskModel::getReferenceTaskName,
270+
t -> t,
271+
(t1, t2) ->
272+
t1.getRetryCount() > t2.getRetryCount()
273+
? t1
274+
: t2))
275+
.values()
276+
.stream()
277+
.filter(
278+
t ->
279+
t.getStatus().isTerminal()
280+
&& !t.getStatus().isSuccessful())
281+
.toList();
282+
if (!permissiveTasksTerminalNonSuccessful.isEmpty()) {
283+
final String errMsg =
284+
permissiveTasksTerminalNonSuccessful.stream()
285+
.map(
286+
t ->
287+
String.format(
288+
"Task %s failed with status: %s and reason: '%s'",
289+
t.getTaskId(),
290+
t.getStatus(),
291+
t.getReasonForIncompletion()))
292+
.collect(Collectors.joining(". "));
293+
throw new TerminateWorkflowException(errMsg);
294+
}
257295
outcome.isComplete = true;
258296
}
259297

@@ -437,11 +475,6 @@ public boolean checkForWorkflowCompletion(final WorkflowModel workflow)
437475
if (status == null || !status.isTerminal()) {
438476
return false;
439477
}
440-
// if we reach here, the task has been completed.
441-
// Was the task successful in completion?
442-
if (!status.isSuccessful()) {
443-
return false;
444-
}
445478
}
446479

447480
boolean noPendingSchedule =
@@ -529,7 +562,9 @@ Optional<TaskModel> retry(
529562
if (!task.getStatus().isRetriable()
530563
|| TaskType.isBuiltIn(task.getTaskType())
531564
|| expectedRetryCount <= retryCount) {
532-
if (workflowTask != null && workflowTask.isOptional()) {
565+
if (workflowTask != null
566+
&& (workflowTask.isOptional()
567+
|| TaskType.PERMISSIVE.name().equals(workflowTask.getType()))) {
533568
return Optional.empty();
534569
}
535570
WorkflowModel.Status status;

core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,18 @@ private void retry(WorkflowModel workflow) {
336336
for (TaskModel task : workflow.getTasks()) {
337337
switch (task.getStatus()) {
338338
case FAILED:
339+
if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString())
340+
|| task.getTaskType()
341+
.equalsIgnoreCase(TaskType.EXCLUSIVE_JOIN.toString())) {
342+
@SuppressWarnings("unchecked")
343+
List<String> joinOn = (List<String>) task.getInputData().get("joinOn");
344+
boolean joinOnFailedPermissive = isJoinOnFailedPermissive(joinOn, workflow);
345+
if (joinOnFailedPermissive) {
346+
task.setStatus(IN_PROGRESS);
347+
addTaskToQueue(task);
348+
break;
349+
}
350+
}
339351
case FAILED_WITH_TERMINAL_ERROR:
340352
case TIMED_OUT:
341353
retriableMap.put(task.getReferenceTaskName(), task);
@@ -1814,4 +1826,14 @@ private void expediteLazyWorkflowEvaluation(String workflowId) {
18141826

18151827
LOGGER.info("Pushed workflow {} to {} for expedited evaluation", workflowId, DECIDER_QUEUE);
18161828
}
1829+
1830+
private static boolean isJoinOnFailedPermissive(List<String> joinOn, WorkflowModel workflow) {
1831+
return joinOn.stream()
1832+
.map(workflow::getTaskByRefName)
1833+
.anyMatch(
1834+
t ->
1835+
TaskType.PERMISSIVE.name().equals(t.getWorkflowTask().getType())
1836+
&& !t.getWorkflowTask().isOptional()
1837+
&& t.getStatus().equals(FAILED));
1838+
}
18171839
}
core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright 2023 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package com.netflix.conductor.core.execution.mapper;
14+
15+
import java.util.List;
16+
import java.util.Map;
17+
import java.util.Optional;
18+
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
import org.springframework.stereotype.Component;
22+
23+
import com.netflix.conductor.common.metadata.tasks.TaskDef;
24+
import com.netflix.conductor.common.metadata.tasks.TaskType;
25+
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
26+
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
27+
import com.netflix.conductor.core.exception.TerminateWorkflowException;
28+
import com.netflix.conductor.core.utils.ParametersUtils;
29+
import com.netflix.conductor.model.TaskModel;
30+
import com.netflix.conductor.model.WorkflowModel;
31+
32+
/**
33+
* An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link
34+
* TaskType#PERMISSIVE} to a {@link TaskModel} with status {@link TaskModel.Status#SCHEDULED}.
35+
*/
36+
@Component
37+
public class PermissiveTaskMapper implements TaskMapper {
38+
39+
public static final Logger LOGGER = LoggerFactory.getLogger(PermissiveTaskMapper.class);
40+
private final ParametersUtils parametersUtils;
41+
42+
public PermissiveTaskMapper(ParametersUtils parametersUtils) {
43+
this.parametersUtils = parametersUtils;
44+
}
45+
46+
@Override
47+
public String getTaskType() {
48+
return TaskType.PERMISSIVE.name();
49+
}
50+
51+
/**
52+
* This method maps a {@link WorkflowTask} of type {@link TaskType#PERMISSIVE} to a {@link
53+
* TaskModel}
54+
*
55+
* @param taskMapperContext: A wrapper class containing the {@link WorkflowTask}, {@link
56+
* WorkflowDef}, {@link WorkflowModel} and a string representation of the TaskId
57+
* @return a List with just one exclusive task
58+
* @throws TerminateWorkflowException In case if the task definition does not exist
59+
*/
60+
@Override
61+
public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
62+
throws TerminateWorkflowException {
63+
64+
LOGGER.debug("TaskMapperContext {} in PermissiveTaskMapper", taskMapperContext);
65+
66+
WorkflowTask workflowTask = taskMapperContext.getWorkflowTask();
67+
WorkflowModel workflowModel = taskMapperContext.getWorkflowModel();
68+
int retryCount = taskMapperContext.getRetryCount();
69+
String retriedTaskId = taskMapperContext.getRetryTaskId();
70+
71+
TaskDef taskDefinition =
72+
Optional.ofNullable(workflowTask.getTaskDefinition())
73+
.orElseThrow(
74+
() -> {
75+
String reason =
76+
String.format(
77+
"Invalid task. Task %s does not have a definition",
78+
workflowTask.getName());
79+
return new TerminateWorkflowException(reason);
80+
});
81+
82+
Map<String, Object> input =
83+
parametersUtils.getTaskInput(
84+
workflowTask.getInputParameters(),
85+
workflowModel,
86+
taskDefinition,
87+
taskMapperContext.getTaskId());
88+
TaskModel permissiveTask = taskMapperContext.createTaskModel();
89+
permissiveTask.setTaskType(workflowTask.getName());
90+
permissiveTask.setStartDelayInSeconds(workflowTask.getStartDelay());
91+
permissiveTask.setInputData(input);
92+
permissiveTask.setStatus(TaskModel.Status.SCHEDULED);
93+
permissiveTask.setRetryCount(retryCount);
94+
permissiveTask.setCallbackAfterSeconds(workflowTask.getStartDelay());
95+
permissiveTask.setResponseTimeoutSeconds(taskDefinition.getResponseTimeoutSeconds());
96+
permissiveTask.setRetriedTaskId(retriedTaskId);
97+
permissiveTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency());
98+
permissiveTask.setRateLimitFrequencyInSeconds(
99+
taskDefinition.getRateLimitFrequencyInSeconds());
100+
return List.of(permissiveTask);
101+
}
102+
}

core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.netflix.conductor.model.TaskModel;
2525
import com.netflix.conductor.model.WorkflowModel;
2626

27+
import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
2728
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_EXCLUSIVE_JOIN;
2829

2930
@Component(TASK_TYPE_EXCLUSIVE_JOIN)
@@ -65,9 +66,20 @@ public boolean execute(
6566
}
6667
taskStatus = exclusiveTask.getStatus();
6768
foundExlusiveJoinOnTask = taskStatus.isTerminal();
68-
hasFailures = !taskStatus.isSuccessful();
69+
hasFailures =
70+
!taskStatus.isSuccessful()
71+
&& (!PERMISSIVE.name().equals(exclusiveTask.getWorkflowTask().getType())
72+
|| joinOn.stream()
73+
.map(workflow::getTaskByRefName)
74+
.allMatch(t -> t.getStatus().isTerminal()));
6975
if (hasFailures) {
70-
failureReason.append(exclusiveTask.getReasonForIncompletion()).append(" ");
76+
final String failureReasons =
77+
joinOn.stream()
78+
.map(workflow::getTaskByRefName)
79+
.filter(t -> !t.getStatus().isSuccessful())
80+
.map(TaskModel::getReasonForIncompletion)
81+
.collect(Collectors.joining(" "));
82+
failureReason.append(failureReasons);
7183
}
7284

7385
break;

core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.netflix.conductor.model.TaskModel;
2424
import com.netflix.conductor.model.WorkflowModel;
2525

26+
import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
2627
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN;
2728

2829
@Component(TASK_TYPE_JOIN)
@@ -57,9 +58,21 @@ public boolean execute(
5758
break;
5859
}
5960
TaskModel.Status taskStatus = forkedTask.getStatus();
60-
hasFailures = !taskStatus.isSuccessful() && !forkedTask.getWorkflowTask().isOptional();
61+
hasFailures =
62+
!taskStatus.isSuccessful()
63+
&& !forkedTask.getWorkflowTask().isOptional()
64+
&& (!PERMISSIVE.name().equals(forkedTask.getWorkflowTask().getType())
65+
|| joinOn.stream()
66+
.map(workflow::getTaskByRefName)
67+
.allMatch(t -> t.getStatus().isTerminal()));
6168
if (hasFailures) {
62-
failureReason.append(forkedTask.getReasonForIncompletion()).append(" ");
69+
final String failureReasons =
70+
joinOn.stream()
71+
.map(workflow::getTaskByRefName)
72+
.filter(t -> !t.getStatus().isSuccessful())
73+
.map(TaskModel::getReasonForIncompletion)
74+
.collect(Collectors.joining(" "));
75+
failureReason.append(failureReasons);
6376
}
6477
// Only add to task output if it's not empty
6578
if (!forkedTask.getOutputData().isEmpty()) {

core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,80 @@ public void testOptional() {
440440
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
441441
}
442442

443+
/** Similar to {@link #testOptional} */
444+
@Test
445+
public void testPermissive() {
446+
WorkflowDef def = new WorkflowDef();
447+
def.setName("test-permissive");
448+
449+
WorkflowTask task1 = new WorkflowTask();
450+
task1.setName("task0");
451+
task1.setType("PERMISSIVE");
452+
task1.setTaskReferenceName("t0");
453+
task1.getInputParameters().put("taskId", "${CPEWF_TASK_ID}");
454+
task1.setTaskDefinition(new TaskDef("task0"));
455+
456+
WorkflowTask task2 = new WorkflowTask();
457+
task2.setName("task1");
458+
task2.setType("PERMISSIVE");
459+
task2.setTaskReferenceName("t1");
460+
task2.setTaskDefinition(new TaskDef("task1"));
461+
462+
def.getTasks().add(task1);
463+
def.getTasks().add(task2);
464+
def.setSchemaVersion(2);
465+
466+
WorkflowModel workflow = new WorkflowModel();
467+
workflow.setWorkflowDefinition(def);
468+
workflow.setCreateTime(System.currentTimeMillis());
469+
DeciderOutcome outcome = deciderService.decide(workflow);
470+
assertNotNull(outcome);
471+
assertEquals(1, outcome.tasksToBeScheduled.size());
472+
assertEquals(
473+
task1.getTaskReferenceName(),
474+
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
475+
476+
for (int i = 0; i < 3; i++) {
477+
String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId();
478+
assertEquals(task1Id, outcome.tasksToBeScheduled.get(0).getInputData().get("taskId"));
479+
480+
workflow.getTasks().clear();
481+
workflow.getTasks().addAll(outcome.tasksToBeScheduled);
482+
workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED);
483+
484+
outcome = deciderService.decide(workflow);
485+
486+
assertNotNull(outcome);
487+
assertEquals(1, outcome.tasksToBeUpdated.size());
488+
assertEquals(1, outcome.tasksToBeScheduled.size());
489+
490+
assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus());
491+
assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId());
492+
assertEquals(
493+
task1.getTaskReferenceName(),
494+
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
495+
assertEquals(i + 1, outcome.tasksToBeScheduled.get(0).getRetryCount());
496+
}
497+
498+
String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId();
499+
500+
workflow.getTasks().clear();
501+
workflow.getTasks().addAll(outcome.tasksToBeScheduled);
502+
workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED);
503+
504+
outcome = deciderService.decide(workflow);
505+
506+
assertNotNull(outcome);
507+
assertEquals(1, outcome.tasksToBeUpdated.size());
508+
assertEquals(1, outcome.tasksToBeScheduled.size());
509+
510+
assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus());
511+
assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId());
512+
assertEquals(
513+
task2.getTaskReferenceName(),
514+
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
515+
}
516+
443517
@Test
444518
public void testOptionalWithDynamicFork() {
445519
WorkflowDef def = new WorkflowDef();

0 commit comments

Comments
 (0)