Skip to content

Commit fdf5f5e

Browse files
committed
WorkflowTask permissive property added, so various task types
can be permissive.
1 parent 92f06bf commit fdf5f5e

File tree

18 files changed

+52
-293
lines changed

18 files changed

+52
-293
lines changed

common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskType.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ public enum TaskType {
2323
DYNAMIC,
2424
FORK_JOIN,
2525
FORK_JOIN_DYNAMIC,
26-
PERMISSIVE,
2726
DECISION,
2827
SWITCH,
2928
JOIN,
@@ -71,7 +70,6 @@ public enum TaskType {
7170
public static final String TASK_TYPE_JSON_JQ_TRANSFORM = "JSON_JQ_TRANSFORM";
7271
public static final String TASK_TYPE_SET_VARIABLE = "SET_VARIABLE";
7372
public static final String TASK_TYPE_FORK = "FORK";
74-
public static final String TASK_TYPE_PERMISSIVE = "PERMISSIVE";
7573
public static final String TASK_TYPE_NOOP = "NOOP";
7674

7775
private static final Set<String> BUILT_IN_TASKS = new HashSet<>();

common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,9 @@ public void setTasks(List<WorkflowTask> tasks) {
153153
@ProtoField(id = 28)
154154
private String expression;
155155

156+
@ProtoField(id = 29)
157+
private boolean permissive = false;
158+
156159
/**
157160
* @return the name
158161
*/
@@ -548,6 +551,22 @@ public void setExpression(String expression) {
548551
this.expression = expression;
549552
}
550553

554+
/**
555+
* @return If the task is permissive. When set to true, and the task is in failed status,
556+
* fail-fast does not occur. The workflow execution continues until reaching join or end of
557+
* workflow, allowing idempotent execution of other tasks.
558+
*/
559+
public boolean isPermissive() {
560+
return this.permissive;
561+
}
562+
563+
/**
564+
* @param permissive when set to true, the task is marked as permissive
565+
*/
566+
public void setPermissive(boolean permissive) {
567+
this.permissive = permissive;
568+
}
569+
551570
private Collection<List<WorkflowTask>> children() {
552571
Collection<List<WorkflowTask>> workflowTaskLists = new LinkedList<>();
553572

core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import com.netflix.conductor.model.TaskModel;
4545
import com.netflix.conductor.model.WorkflowModel;
4646

47-
import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
4847
import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE;
4948
import static com.netflix.conductor.common.metadata.tasks.TaskType.USER_DEFINED;
5049
import static com.netflix.conductor.model.TaskModel.Status.*;
@@ -209,9 +208,7 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
209208
executedTaskRefNames.remove(retryTask.get().getReferenceTaskName());
210209
outcome.tasksToBeUpdated.add(pendingTask);
211210
} else if (!(pendingTask.getWorkflowTask() != null
212-
&& TaskType.PERMISSIVE
213-
.name()
214-
.equals(pendingTask.getWorkflowTask().getType())
211+
&& pendingTask.getWorkflowTask().isPermissive()
215212
&& !pendingTask.getWorkflowTask().isOptional())) {
216213
pendingTask.setStatus(COMPLETED_WITH_ERRORS);
217214
}
@@ -262,7 +259,7 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
262259
List<TaskModel> permissiveTasksTerminalNonSuccessful =
263260
workflow.getTasks().stream()
264261
.filter(t -> t.getWorkflowTask() != null)
265-
.filter(t -> PERMISSIVE.name().equals(t.getWorkflowTask().getType()))
262+
.filter(t -> t.getWorkflowTask().isPermissive())
266263
.filter(t -> !t.getWorkflowTask().isOptional())
267264
.collect(
268265
Collectors.toMap(
@@ -563,8 +560,7 @@ Optional<TaskModel> retry(
563560
|| TaskType.isBuiltIn(task.getTaskType())
564561
|| expectedRetryCount <= retryCount) {
565562
if (workflowTask != null
566-
&& (workflowTask.isOptional()
567-
|| TaskType.PERMISSIVE.name().equals(workflowTask.getType()))) {
563+
&& (workflowTask.isOptional() || workflowTask.isPermissive())) {
568564
return Optional.empty();
569565
}
570566
WorkflowModel.Status status;

core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1832,7 +1832,7 @@ private static boolean isJoinOnFailedPermissive(List<String> joinOn, WorkflowMod
18321832
.map(workflow::getTaskByRefName)
18331833
.anyMatch(
18341834
t ->
1835-
TaskType.PERMISSIVE.name().equals(t.getWorkflowTask().getType())
1835+
t.getWorkflowTask().isPermissive()
18361836
&& !t.getWorkflowTask().isOptional()
18371837
&& t.getStatus().equals(FAILED));
18381838
}

core/src/main/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapper.java

Lines changed: 0 additions & 102 deletions
This file was deleted.

core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.netflix.conductor.model.TaskModel;
2525
import com.netflix.conductor.model.WorkflowModel;
2626

27-
import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
2827
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_EXCLUSIVE_JOIN;
2928

3029
@Component(TASK_TYPE_EXCLUSIVE_JOIN)
@@ -68,7 +67,7 @@ public boolean execute(
6867
foundExlusiveJoinOnTask = taskStatus.isTerminal();
6968
hasFailures =
7069
!taskStatus.isSuccessful()
71-
&& (!PERMISSIVE.name().equals(exclusiveTask.getWorkflowTask().getType())
70+
&& (!exclusiveTask.getWorkflowTask().isPermissive()
7271
|| joinOn.stream()
7372
.map(workflow::getTaskByRefName)
7473
.allMatch(t -> t.getStatus().isTerminal()));

core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.netflix.conductor.model.TaskModel;
2424
import com.netflix.conductor.model.WorkflowModel;
2525

26-
import static com.netflix.conductor.common.metadata.tasks.TaskType.PERMISSIVE;
2726
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOIN;
2827

2928
@Component(TASK_TYPE_JOIN)
@@ -61,7 +60,7 @@ public boolean execute(
6160
hasFailures =
6261
!taskStatus.isSuccessful()
6362
&& !forkedTask.getWorkflowTask().isOptional()
64-
&& (!PERMISSIVE.name().equals(forkedTask.getWorkflowTask().getType())
63+
&& (!forkedTask.getWorkflowTask().isPermissive()
6564
|| joinOn.stream()
6665
.map(workflow::getTaskByRefName)
6766
.allMatch(t -> t.getStatus().isTerminal()));

core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -440,22 +440,21 @@ public void testOptional() {
440440
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
441441
}
442442

443-
/** Similar to {@link #testOptional} */
444443
@Test
445444
public void testPermissive() {
446445
WorkflowDef def = new WorkflowDef();
447446
def.setName("test-permissive");
448447

449448
WorkflowTask task1 = new WorkflowTask();
450449
task1.setName("task0");
451-
task1.setType("PERMISSIVE");
450+
task1.setPermissive(true);
452451
task1.setTaskReferenceName("t0");
453452
task1.getInputParameters().put("taskId", "${CPEWF_TASK_ID}");
454453
task1.setTaskDefinition(new TaskDef("task0"));
455454

456455
WorkflowTask task2 = new WorkflowTask();
457456
task2.setName("task1");
458-
task2.setType("PERMISSIVE");
457+
task2.setPermissive(true);
459458
task2.setTaskReferenceName("t1");
460459
task2.setTaskDefinition(new TaskDef("task1"));
461460

core/src/test/java/com/netflix/conductor/core/execution/mapper/PermissiveTaskMapperTest.java

Lines changed: 0 additions & 114 deletions
This file was deleted.

grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1351,6 +1351,7 @@ public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) {
13511351
if (from.getExpression() != null) {
13521352
to.setExpression( from.getExpression() );
13531353
}
1354+
to.setPermissive( from.isPermissive() );
13541355
return to.build();
13551356
}
13561357

@@ -1396,6 +1397,7 @@ public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) {
13961397
to.setRetryCount( from.getRetryCount() );
13971398
to.setEvaluatorType( from.getEvaluatorType() );
13981399
to.setExpression( from.getExpression() );
1400+
to.setPermissive( from.getPermissive() );
13991401
return to;
14001402
}
14011403

0 commit comments

Comments
 (0)