Skip to content

Commit e60b0e6

Browse files
authored
Merge pull request #3 from ivakoleva/permissive-task-feature
Permissive task capability
2 parents 88f6e18 + fdf5f5e commit e60b0e6

File tree

14 files changed

+1070
-14
lines changed

14 files changed

+1070
-14
lines changed

common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,9 @@ public void setTasks(List<WorkflowTask> tasks) {
152152
@ProtoField(id = 28)
153153
private String expression;
154154

155+
@ProtoField(id = 29)
156+
private boolean permissive = false;
157+
155158
/**
156159
* @return the name
157160
*/
@@ -547,6 +550,22 @@ public void setExpression(String expression) {
547550
this.expression = expression;
548551
}
549552

553+
/**
554+
* @return If the task is permissive. When set to true, and the task is in failed status,
555+
* fail-fast does not occur. The workflow execution continues until reaching join or end of
556+
* workflow, allowing idempotent execution of other tasks.
557+
*/
558+
public boolean isPermissive() {
559+
return this.permissive;
560+
}
561+
562+
/**
563+
* @param permissive when set to true, the task is marked as permissive
564+
*/
565+
public void setPermissive(boolean permissive) {
566+
this.permissive = permissive;
567+
}
568+
550569
private Collection<List<WorkflowTask>> children() {
551570
Collection<List<WorkflowTask>> workflowTaskLists = new LinkedList<>();
552571

core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,9 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
207207
tasksToBeScheduled.put(retryTask.get().getReferenceTaskName(), retryTask.get());
208208
executedTaskRefNames.remove(retryTask.get().getReferenceTaskName());
209209
outcome.tasksToBeUpdated.add(pendingTask);
210-
} else {
210+
} else if (!(pendingTask.getWorkflowTask() != null
211+
&& pendingTask.getWorkflowTask().isPermissive()
212+
&& !pendingTask.getWorkflowTask().isOptional())) {
211213
pendingTask.setStatus(COMPLETED_WITH_ERRORS);
212214
}
213215
}
@@ -254,6 +256,39 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
254256
if (hasSuccessfulTerminateTask
255257
|| (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflow))) {
256258
LOGGER.debug("Marking workflow: {} as complete.", workflow);
259+
List<TaskModel> permissiveTasksTerminalNonSuccessful =
260+
workflow.getTasks().stream()
261+
.filter(t -> t.getWorkflowTask() != null)
262+
.filter(t -> t.getWorkflowTask().isPermissive())
263+
.filter(t -> !t.getWorkflowTask().isOptional())
264+
.collect(
265+
Collectors.toMap(
266+
TaskModel::getReferenceTaskName,
267+
t -> t,
268+
(t1, t2) ->
269+
t1.getRetryCount() > t2.getRetryCount()
270+
? t1
271+
: t2))
272+
.values()
273+
.stream()
274+
.filter(
275+
t ->
276+
t.getStatus().isTerminal()
277+
&& !t.getStatus().isSuccessful())
278+
.toList();
279+
if (!permissiveTasksTerminalNonSuccessful.isEmpty()) {
280+
final String errMsg =
281+
permissiveTasksTerminalNonSuccessful.stream()
282+
.map(
283+
t ->
284+
String.format(
285+
"Task %s failed with status: %s and reason: '%s'",
286+
t.getTaskId(),
287+
t.getStatus(),
288+
t.getReasonForIncompletion()))
289+
.collect(Collectors.joining(". "));
290+
throw new TerminateWorkflowException(errMsg);
291+
}
257292
outcome.isComplete = true;
258293
}
259294

@@ -437,11 +472,6 @@ public boolean checkForWorkflowCompletion(final WorkflowModel workflow)
437472
if (status == null || !status.isTerminal()) {
438473
return false;
439474
}
440-
// if we reach here, the task has been completed.
441-
// Was the task successful in completion?
442-
if (!status.isSuccessful()) {
443-
return false;
444-
}
445475
}
446476

447477
boolean noPendingSchedule =
@@ -529,7 +559,8 @@ Optional<TaskModel> retry(
529559
if (!task.getStatus().isRetriable()
530560
|| TaskType.isBuiltIn(task.getTaskType())
531561
|| expectedRetryCount <= retryCount) {
532-
if (workflowTask != null && workflowTask.isOptional()) {
562+
if (workflowTask != null
563+
&& (workflowTask.isOptional() || workflowTask.isPermissive())) {
533564
return Optional.empty();
534565
}
535566
WorkflowModel.Status status;

core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,18 @@ private void retry(WorkflowModel workflow) {
336336
for (TaskModel task : workflow.getTasks()) {
337337
switch (task.getStatus()) {
338338
case FAILED:
339+
if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString())
340+
|| task.getTaskType()
341+
.equalsIgnoreCase(TaskType.EXCLUSIVE_JOIN.toString())) {
342+
@SuppressWarnings("unchecked")
343+
List<String> joinOn = (List<String>) task.getInputData().get("joinOn");
344+
boolean joinOnFailedPermissive = isJoinOnFailedPermissive(joinOn, workflow);
345+
if (joinOnFailedPermissive) {
346+
task.setStatus(IN_PROGRESS);
347+
addTaskToQueue(task);
348+
break;
349+
}
350+
}
339351
case FAILED_WITH_TERMINAL_ERROR:
340352
case TIMED_OUT:
341353
retriableMap.put(task.getReferenceTaskName(), task);
@@ -1814,4 +1826,14 @@ private void expediteLazyWorkflowEvaluation(String workflowId) {
18141826

18151827
LOGGER.info("Pushed workflow {} to {} for expedited evaluation", workflowId, DECIDER_QUEUE);
18161828
}
1829+
1830+
private static boolean isJoinOnFailedPermissive(List<String> joinOn, WorkflowModel workflow) {
1831+
return joinOn.stream()
1832+
.map(workflow::getTaskByRefName)
1833+
.anyMatch(
1834+
t ->
1835+
t.getWorkflowTask().isPermissive()
1836+
&& !t.getWorkflowTask().isOptional()
1837+
&& t.getStatus().equals(FAILED));
1838+
}
18171839
}

core/src/main/java/com/netflix/conductor/core/execution/tasks/ExclusiveJoin.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,20 @@ public boolean execute(
6565
}
6666
taskStatus = exclusiveTask.getStatus();
6767
foundExlusiveJoinOnTask = taskStatus.isTerminal();
68-
hasFailures = !taskStatus.isSuccessful();
68+
hasFailures =
69+
!taskStatus.isSuccessful()
70+
&& (!exclusiveTask.getWorkflowTask().isPermissive()
71+
|| joinOn.stream()
72+
.map(workflow::getTaskByRefName)
73+
.allMatch(t -> t.getStatus().isTerminal()));
6974
if (hasFailures) {
70-
failureReason.append(exclusiveTask.getReasonForIncompletion()).append(" ");
75+
final String failureReasons =
76+
joinOn.stream()
77+
.map(workflow::getTaskByRefName)
78+
.filter(t -> !t.getStatus().isSuccessful())
79+
.map(TaskModel::getReasonForIncompletion)
80+
.collect(Collectors.joining(" "));
81+
failureReason.append(failureReasons);
7182
}
7283

7384
break;

core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,21 @@ public boolean execute(
5757
break;
5858
}
5959
TaskModel.Status taskStatus = forkedTask.getStatus();
60-
hasFailures = !taskStatus.isSuccessful() && !forkedTask.getWorkflowTask().isOptional();
60+
hasFailures =
61+
!taskStatus.isSuccessful()
62+
&& !forkedTask.getWorkflowTask().isOptional()
63+
&& (!forkedTask.getWorkflowTask().isPermissive()
64+
|| joinOn.stream()
65+
.map(workflow::getTaskByRefName)
66+
.allMatch(t -> t.getStatus().isTerminal()));
6167
if (hasFailures) {
62-
failureReason.append(forkedTask.getReasonForIncompletion()).append(" ");
68+
final String failureReasons =
69+
joinOn.stream()
70+
.map(workflow::getTaskByRefName)
71+
.filter(t -> !t.getStatus().isSuccessful())
72+
.map(TaskModel::getReasonForIncompletion)
73+
.collect(Collectors.joining(" "));
74+
failureReason.append(failureReasons);
6375
}
6476
// Only add to task output if it's not empty
6577
if (!forkedTask.getOutputData().isEmpty()) {

core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,79 @@ public void testOptional() {
440440
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
441441
}
442442

443+
@Test
444+
public void testPermissive() {
445+
WorkflowDef def = new WorkflowDef();
446+
def.setName("test-permissive");
447+
448+
WorkflowTask task1 = new WorkflowTask();
449+
task1.setName("task0");
450+
task1.setPermissive(true);
451+
task1.setTaskReferenceName("t0");
452+
task1.getInputParameters().put("taskId", "${CPEWF_TASK_ID}");
453+
task1.setTaskDefinition(new TaskDef("task0"));
454+
455+
WorkflowTask task2 = new WorkflowTask();
456+
task2.setName("task1");
457+
task2.setPermissive(true);
458+
task2.setTaskReferenceName("t1");
459+
task2.setTaskDefinition(new TaskDef("task1"));
460+
461+
def.getTasks().add(task1);
462+
def.getTasks().add(task2);
463+
def.setSchemaVersion(2);
464+
465+
WorkflowModel workflow = new WorkflowModel();
466+
workflow.setWorkflowDefinition(def);
467+
workflow.setCreateTime(System.currentTimeMillis());
468+
DeciderOutcome outcome = deciderService.decide(workflow);
469+
assertNotNull(outcome);
470+
assertEquals(1, outcome.tasksToBeScheduled.size());
471+
assertEquals(
472+
task1.getTaskReferenceName(),
473+
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
474+
475+
for (int i = 0; i < 3; i++) {
476+
String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId();
477+
assertEquals(task1Id, outcome.tasksToBeScheduled.get(0).getInputData().get("taskId"));
478+
479+
workflow.getTasks().clear();
480+
workflow.getTasks().addAll(outcome.tasksToBeScheduled);
481+
workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED);
482+
483+
outcome = deciderService.decide(workflow);
484+
485+
assertNotNull(outcome);
486+
assertEquals(1, outcome.tasksToBeUpdated.size());
487+
assertEquals(1, outcome.tasksToBeScheduled.size());
488+
489+
assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus());
490+
assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId());
491+
assertEquals(
492+
task1.getTaskReferenceName(),
493+
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
494+
assertEquals(i + 1, outcome.tasksToBeScheduled.get(0).getRetryCount());
495+
}
496+
497+
String task1Id = outcome.tasksToBeScheduled.get(0).getTaskId();
498+
499+
workflow.getTasks().clear();
500+
workflow.getTasks().addAll(outcome.tasksToBeScheduled);
501+
workflow.getTasks().get(0).setStatus(TaskModel.Status.FAILED);
502+
503+
outcome = deciderService.decide(workflow);
504+
505+
assertNotNull(outcome);
506+
assertEquals(1, outcome.tasksToBeUpdated.size());
507+
assertEquals(1, outcome.tasksToBeScheduled.size());
508+
509+
assertEquals(TaskModel.Status.FAILED, workflow.getTasks().get(0).getStatus());
510+
assertEquals(task1Id, outcome.tasksToBeUpdated.get(0).getTaskId());
511+
assertEquals(
512+
task2.getTaskReferenceName(),
513+
outcome.tasksToBeScheduled.get(0).getReferenceTaskName());
514+
}
515+
443516
@Test
444517
public void testOptionalWithDynamicFork() {
445518
WorkflowDef def = new WorkflowDef();

grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1351,6 +1351,7 @@ public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) {
13511351
if (from.getExpression() != null) {
13521352
to.setExpression( from.getExpression() );
13531353
}
1354+
to.setPermissive( from.isPermissive() );
13541355
return to.build();
13551356
}
13561357

@@ -1396,6 +1397,7 @@ public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) {
13961397
to.setRetryCount( from.getRetryCount() );
13971398
to.setEvaluatorType( from.getEvaluatorType() );
13981399
to.setExpression( from.getExpression() );
1400+
to.setPermissive( from.getPermissive() );
13991401
return to;
14001402
}
14011403

grpc/src/main/proto/model/workflowtask.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,5 @@ message WorkflowTask {
4141
int32 retry_count = 26;
4242
string evaluator_type = 27;
4343
string expression = 28;
44+
bool permissive = 29;
4445
}

0 commit comments

Comments
 (0)