Skip to content
This repository was archived by the owner on Dec 13, 2023. It is now read-only.

Commit ef57cd8

Browse files
JOIN task is made async (#3284)
1 parent fd2816d commit ef57cd8

File tree

15 files changed

+295
-52
lines changed

15 files changed

+295
-52
lines changed

cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/util/Statements.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@
101101
* <li>UPDATE conductor.workflows SET payload=? WHERE workflow_id=? AND shard_id=1 AND
102102
* entity='workflow' AND task_id='';
103103
* <li>UPDATE conductor.workflows SET total_tasks=? WHERE workflow_id=? AND shard_id=?;
104-
* <li>UPDATE conductor.workflows SET * total_partitions=?,total_tasks=? WHERE workflow_id=? AND
104+
* <li>UPDATE conductor.workflows SET total_partitions=?,total_tasks=? WHERE workflow_id=? AND
105105
* shard_id=1;
106106
* <li>UPDATE conductor.task_lookup SET workflow_id=? WHERE task_id=?;
107107
* <li>UPDATE conductor.task_def_limit SET workflow_id=? WHERE task_def_name=? AND task_id=?;

core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,13 @@ public class WorkflowExecutor {
6666

6767
private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowExecutor.class);
6868
private static final int EXPEDITED_PRIORITY = 10;
69-
69+
private static final String CLASS_NAME = WorkflowExecutor.class.getSimpleName();
70+
private static final Predicate<TaskModel> UNSUCCESSFUL_TERMINAL_TASK =
71+
task -> !task.getStatus().isSuccessful() && task.getStatus().isTerminal();
72+
private static final Predicate<TaskModel> UNSUCCESSFUL_JOIN_TASK =
73+
UNSUCCESSFUL_TERMINAL_TASK.and(t -> TaskType.TASK_TYPE_JOIN.equals(t.getTaskType()));
74+
private static final Predicate<TaskModel> NON_TERMINAL_TASK =
75+
task -> !task.getStatus().isTerminal();
7076
private final MetadataDAO metadataDAO;
7177
private final QueueDAO queueDAO;
7278
private final DeciderService deciderService;
@@ -78,20 +84,9 @@ public class WorkflowExecutor {
7884
private final WorkflowStatusListener workflowStatusListener;
7985
private final SystemTaskRegistry systemTaskRegistry;
8086
private final ApplicationEventPublisher eventPublisher;
81-
8287
private long activeWorkerLastPollMs;
83-
private static final String CLASS_NAME = WorkflowExecutor.class.getSimpleName();
8488
private final ExecutionLockService executionLockService;
8589

86-
private static final Predicate<TaskModel> UNSUCCESSFUL_TERMINAL_TASK =
87-
task -> !task.getStatus().isSuccessful() && task.getStatus().isTerminal();
88-
89-
private static final Predicate<TaskModel> UNSUCCESSFUL_JOIN_TASK =
90-
UNSUCCESSFUL_TERMINAL_TASK.and(t -> TaskType.TASK_TYPE_JOIN.equals(t.getTaskType()));
91-
92-
private static final Predicate<TaskModel> NON_TERMINAL_TASK =
93-
task -> !task.getStatus().isTerminal();
94-
9590
private final Predicate<PollData> validateLastPolledTime =
9691
pollData ->
9792
pollData.getLastPollTime()
@@ -352,6 +347,7 @@ private void retry(WorkflowModel workflow) {
352347
if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString())
353348
|| task.getTaskType().equalsIgnoreCase(TaskType.DO_WHILE.toString())) {
354349
task.setStatus(IN_PROGRESS);
350+
addTaskToQueue(task);
355351
// Task doesn't have to be updated yet. Will be updated along with other
356352
// Workflow tasks downstream.
357353
} else {
@@ -870,10 +866,7 @@ public void updateTask(TaskResult taskResult) {
870866
task.getTaskDefName(), lastDuration, false, task.getStatus());
871867
}
872868

873-
// sync evaluate workflow only if the task is not within a forked branch
874-
if (isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) {
875-
expediteLazyWorkflowEvaluation(workflowId);
876-
} else {
869+
if (!isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) {
877870
decide(workflowId);
878871
}
879872
}
@@ -1094,7 +1087,11 @@ private void adjustStateIfSubWorkflowChanged(WorkflowModel workflow) {
10941087
// and the JOIN task(s) needs to be evaluated again, set them to IN_PROGRESS
10951088
workflow.getTasks().stream()
10961089
.filter(UNSUCCESSFUL_JOIN_TASK)
1097-
.peek(t -> t.setStatus(TaskModel.Status.IN_PROGRESS))
1090+
.peek(
1091+
task -> {
1092+
task.setStatus(TaskModel.Status.IN_PROGRESS);
1093+
addTaskToQueue(task);
1094+
})
10981095
.forEach(executionDAOFacade::updateTask);
10991096
}
11001097
}
@@ -1305,6 +1302,7 @@ public void skipTaskFromWorkflow(
13051302
taskToBeSkipped.setWorkflowInstanceId(workflowId);
13061303
taskToBeSkipped.setWorkflowPriority(workflow.getPriority());
13071304
taskToBeSkipped.setStatus(SKIPPED);
1305+
taskToBeSkipped.setEndTime(System.currentTimeMillis());
13081306
taskToBeSkipped.setTaskType(workflowTask.getName());
13091307
taskToBeSkipped.setCorrelationId(workflow.getCorrelationId());
13101308
if (skipTaskRequest != null) {

core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,8 @@ public boolean execute(
9797
}
9898
return false;
9999
}
100+
101+
public boolean isAsync() {
102+
return true;
103+
}
100104
}

core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,8 @@ public <T> void verifyAndUpload(T entity, PayloadType payloadType) {
190190
break;
191191
}
192192
}
193-
} catch (TransientException te) {
193+
} catch (TransientException | TerminateWorkflowException te) {
194194
throw te;
195-
} catch (TerminateWorkflowException twe) {
196-
throw twe;
197195
} catch (Exception e) {
198196
LOGGER.error(
199197
"Unable to upload payload to external storage for workflow: {}", workflowId, e);
@@ -223,7 +221,6 @@ void failTask(TaskModel task, PayloadType payloadType, String errorMsg) {
223221
} else {
224222
task.setOutputData(new HashMap<>());
225223
}
226-
throw new TerminateWorkflowException(errorMsg, WorkflowModel.Status.FAILED, task);
227224
}
228225

229226
@VisibleForTesting

core/src/test/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtilsTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141

4242
import com.fasterxml.jackson.databind.ObjectMapper;
4343

44+
import static com.netflix.conductor.model.TaskModel.Status.FAILED_WITH_TERMINAL_ERROR;
45+
4446
import static org.junit.Assert.*;
4547
import static org.mockito.ArgumentMatchers.*;
4648
import static org.mockito.Mockito.*;
@@ -199,23 +201,23 @@ public void testFailTaskWithInputPayload() {
199201
TaskModel task = new TaskModel();
200202
task.setInputData(new HashMap<>());
201203

202-
expectedException.expect(TerminateWorkflowException.class);
203204
externalPayloadStorageUtils.failTask(
204205
task, ExternalPayloadStorage.PayloadType.TASK_INPUT, "error");
205206
assertNotNull(task);
206207
assertTrue(task.getInputData().isEmpty());
208+
assertEquals(FAILED_WITH_TERMINAL_ERROR, task.getStatus());
207209
}
208210

209211
@Test
210212
public void testFailTaskWithOutputPayload() {
211213
TaskModel task = new TaskModel();
212214
task.setOutputData(new HashMap<>());
213215

214-
expectedException.expect(TerminateWorkflowException.class);
215216
externalPayloadStorageUtils.failTask(
216217
task, ExternalPayloadStorage.PayloadType.TASK_OUTPUT, "error");
217218
assertNotNull(task);
218219
assertTrue(task.getOutputData().isEmpty());
220+
assertEquals(FAILED_WITH_TERMINAL_ERROR, task.getStatus());
219221
}
220222

221223
@Test

test-harness/src/test/groovy/com/netflix/conductor/test/integration/DecisionTaskSpec.groovy

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212
*/
1313
package com.netflix.conductor.test.integration
1414

15+
import org.springframework.beans.factory.annotation.Autowired
16+
1517
import com.netflix.conductor.common.metadata.tasks.Task
1618
import com.netflix.conductor.common.run.Workflow
19+
import com.netflix.conductor.core.execution.tasks.Join
20+
import com.netflix.conductor.dao.QueueDAO
1721
import com.netflix.conductor.test.base.AbstractSpecification
1822

1923
import spock.lang.Shared
@@ -23,6 +27,9 @@ import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAc
2327

2428
class DecisionTaskSpec extends AbstractSpecification {
2529

30+
@Autowired
31+
Join joinTask
32+
2633
@Shared
2734
def DECISION_WF = "DecisionWorkflow"
2835

@@ -139,6 +146,7 @@ class DecisionTaskSpec extends AbstractSpecification {
139146
}
140147

141148
when: "the tasks 'integration_task_1' and 'integration_task_10' are polled and completed"
149+
def joinTaskId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("joinTask").taskId
142150
def polledAndCompletedTask1Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker')
143151
def polledAndCompletedTask10Try1 = workflowTestUtil.pollAndCompleteTask('integration_task_10', 'task1.integration.worker')
144152

@@ -189,7 +197,10 @@ class DecisionTaskSpec extends AbstractSpecification {
189197
then: "verify that the task is completed and acknowledged"
190198
verifyPolledAndAcknowledgedTask(polledAndCompletedTask20Try1)
191199

192-
and: "verify that the 'integration_task_2' is COMPLETED and the workflow has progressed"
200+
when: "JOIN task is polled and executed"
201+
asyncSystemTaskExecutor.execute(joinTask, joinTaskId)
202+
203+
then: "verify that JOIN is COMPLETED and the workflow has progressed"
193204
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
194205
status == Workflow.WorkflowStatus.COMPLETED
195206
tasks.size() == 7

test-harness/src/test/groovy/com/netflix/conductor/test/integration/DoWhileSpec.groovy

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,30 @@ import com.netflix.conductor.common.metadata.tasks.Task
1818
import com.netflix.conductor.common.metadata.tasks.TaskDef
1919
import com.netflix.conductor.common.run.Workflow
2020
import com.netflix.conductor.common.utils.TaskUtils
21+
import com.netflix.conductor.core.execution.tasks.Join
2122
import com.netflix.conductor.core.execution.tasks.SubWorkflow
2223
import com.netflix.conductor.test.base.AbstractSpecification
2324

2425
import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask
2526

2627
class DoWhileSpec extends AbstractSpecification {
2728

29+
@Autowired
30+
Join joinTask
31+
2832
@Autowired
2933
SubWorkflow subWorkflowTask
3034

3135
def setup() {
32-
workflowTestUtil.registerWorkflows("do_while_integration_test.json",
33-
"do_while_multiple_integration_test.json",
34-
"do_while_as_subtask_integration_test.json",
36+
workflowTestUtil.registerWorkflows('do_while_integration_test.json',
37+
'do_while_multiple_integration_test.json',
38+
'do_while_as_subtask_integration_test.json',
3539
'simple_one_task_sub_workflow_integration_test.json',
3640
'do_while_iteration_fix_test.json',
37-
"do_while_sub_workflow_integration_test.json",
38-
"do_while_five_loop_over_integration_test.json",
39-
"do_while_system_tasks.json",
40-
"do_while_set_variable_fix.json")
41+
'do_while_sub_workflow_integration_test.json',
42+
'do_while_five_loop_over_integration_test.json',
43+
'do_while_system_tasks.json',
44+
'do_while_set_variable_fix.json')
4145
}
4246

4347
def "Test workflow with 2 iterations of five tasks"() {
@@ -275,6 +279,7 @@ class DoWhileSpec extends AbstractSpecification {
275279
}
276280

277281
when: "Polling and completing second task"
282+
def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId
278283
Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker')
279284

280285
then: "Verify that the task was polled and acknowledged and workflow is in running state"
@@ -300,6 +305,9 @@ class DoWhileSpec extends AbstractSpecification {
300305
when: "Polling and completing third task"
301306
Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker')
302307

308+
and: "JOIN task is executed"
309+
asyncSystemTaskExecutor.execute(joinTask, joinId)
310+
303311
then: "Verify that the task was polled and acknowledged and workflow is in completed state"
304312
verifyPolledAndAcknowledgedTask(polledAndCompletedTask2)
305313
verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1)
@@ -363,6 +371,7 @@ class DoWhileSpec extends AbstractSpecification {
363371
}
364372

365373
when: "Polling and completing second task"
374+
def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId
366375
Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker')
367376

368377
then: "Verify that the task was polled and acknowledged and workflow is in running state"
@@ -388,6 +397,9 @@ class DoWhileSpec extends AbstractSpecification {
388397
when: "Polling and completing third task"
389398
Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker')
390399

400+
and: "JOIN task is executed"
401+
asyncSystemTaskExecutor.execute(joinTask, joinId)
402+
391403
then: "Verify that the task was polled and acknowledged and workflow is in completed state"
392404
verifyPolledAndAcknowledgedTask(polledAndCompletedTask2)
393405
verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1)
@@ -528,6 +540,7 @@ class DoWhileSpec extends AbstractSpecification {
528540
}
529541

530542
when: "Polling and completing second task"
543+
def join1Id = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId
531544
Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker')
532545

533546
then: "Verify that the task was polled and acknowledged and workflow is in running state"
@@ -553,6 +566,9 @@ class DoWhileSpec extends AbstractSpecification {
553566
when: "Polling and completing third task"
554567
Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker')
555568

569+
and: "JOIN task is executed"
570+
asyncSystemTaskExecutor.execute(joinTask, join1Id)
571+
556572
then: "Verify that the task was polled and acknowledged and workflow is in running state"
557573
verifyPolledAndAcknowledgedTask(polledAndCompletedTask2)
558574
verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1)
@@ -609,6 +625,7 @@ class DoWhileSpec extends AbstractSpecification {
609625
}
610626

611627
when: "Polling and completing second iteration of second task"
628+
def join2Id = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__2").taskId
612629
Tuple polledAndCompletedSecondIterationTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker')
613630

614631
then: "Verify that the task was polled and acknowledged and workflow is in running state"
@@ -644,6 +661,9 @@ class DoWhileSpec extends AbstractSpecification {
644661
when: "Polling and completing second iteration of third task"
645662
Tuple polledAndCompletedSecondIterationTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker')
646663

664+
and: "JOIN task is executed"
665+
asyncSystemTaskExecutor.execute(joinTask, join2Id)
666+
647667
then: "Verify that the task was polled and acknowledged and workflow is in running state"
648668
verifyPolledAndAcknowledgedTask(polledAndCompletedSecondIterationTask2)
649669
verifyTaskIteration(polledAndCompletedSecondIterationTask2[0] as Task, 2)
@@ -796,6 +816,7 @@ class DoWhileSpec extends AbstractSpecification {
796816
}
797817

798818
when: "Polling and completing second task"
819+
def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId
799820
Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker')
800821

801822
then: "Verify that the task was polled and acknowledged and workflow is in running state"
@@ -823,6 +844,9 @@ class DoWhileSpec extends AbstractSpecification {
823844
when: "Polling and completing third task"
824845
Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker')
825846

847+
and: "JOIN task is executed"
848+
asyncSystemTaskExecutor.execute(joinTask, joinId)
849+
826850
then: "Verify that the task was polled and acknowledged and workflow is in completed state"
827851
verifyPolledAndAcknowledgedTask(polledAndCompletedTask2)
828852
verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1)
@@ -920,6 +944,7 @@ class DoWhileSpec extends AbstractSpecification {
920944
}
921945

922946
when: "Polling and completing second task"
947+
def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join__1").taskId
923948
Tuple polledAndCompletedTask1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'integration.test.worker')
924949

925950
then: "Verify that the task was polled and acknowledged and workflow is in running state"
@@ -947,6 +972,9 @@ class DoWhileSpec extends AbstractSpecification {
947972
when: "Polling and completing third task"
948973
Tuple polledAndCompletedTask2 = workflowTestUtil.pollAndCompleteTask('integration_task_2', 'integration.test.worker')
949974

975+
and: "JOIN task is executed"
976+
asyncSystemTaskExecutor.execute(joinTask, joinId)
977+
950978
then: "Verify that the task was polled and acknowledged and workflow is in completed state"
951979
verifyPolledAndAcknowledgedTask(polledAndCompletedTask2)
952980
verifyTaskIteration(polledAndCompletedTask2[0] as Task, 1)
@@ -998,6 +1026,7 @@ class DoWhileSpec extends AbstractSpecification {
9981026
}
9991027

10001028
when: "Polling and completing first task in DO While"
1029+
def joinId = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName("join").taskId
10011030
Tuple polledAndCompletedTask0 = workflowTestUtil.pollAndCompleteTask('integration_task_0', 'integration.test.worker')
10021031

10031032
then: "Verify that the task was polled and acknowledged and workflow is in running state"
@@ -1049,6 +1078,9 @@ class DoWhileSpec extends AbstractSpecification {
10491078
and: "the workflow is evaluated"
10501079
sweep(workflowInstanceId)
10511080

1081+
and: "JOIN task is executed"
1082+
asyncSystemTaskExecutor.execute(joinTask, joinId)
1083+
10521084
then: "Verify that the task was polled and acknowledged and workflow is in completed state"
10531085
verifyPolledAndAcknowledgedTask(polledAndCompletedTask2)
10541086
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {

0 commit comments

Comments
 (0)