Skip to content

Commit 508c9fb

Browse files
committed
Add the task update API by reference name
1 parent 2978701 commit 508c9fb

File tree

5 files changed

+69
-7
lines changed

5 files changed

+69
-7
lines changed

core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,11 +439,15 @@ public List<TaskModel> createTasks(List<TaskModel> tasks) {
439439
}
440440

441441
public List<Task> getTasksForWorkflow(String workflowId) {
442-
return executionDAO.getTasksForWorkflow(workflowId).stream()
442+
return getTaskModelsForWorkflow(workflowId).stream()
443443
.map(TaskModel::toTask)
444444
.collect(Collectors.toList());
445445
}
446446

447+
public List<TaskModel> getTaskModelsForWorkflow(String workflowId) {
448+
return executionDAO.getTasksForWorkflow(workflowId);
449+
}
450+
447451
public TaskModel getTaskModel(String taskId) {
448452
TaskModel taskModel = getTaskFromDatastore(taskId);
449453
if (taskModel != null) {

core/src/main/java/com/netflix/conductor/service/ExecutionService.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import java.util.*;
1616
import java.util.stream.Collectors;
17+
import java.util.stream.Stream;
1718

1819
import org.apache.commons.lang3.StringUtils;
1920
import org.slf4j.Logger;
@@ -27,6 +28,7 @@
2728
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
2829
import com.netflix.conductor.common.utils.ExternalPayloadStorage.Operation;
2930
import com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType;
31+
import com.netflix.conductor.common.utils.TaskUtils;
3032
import com.netflix.conductor.core.config.ConductorProperties;
3133
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
3234
import com.netflix.conductor.core.events.queue.Message;
@@ -252,12 +254,28 @@ public Task getTask(String taskId) {
252254
}
253255

254256
public Task getPendingTaskForWorkflow(String taskReferenceName, String workflowId) {
255-
return executionDAOFacade.getTasksForWorkflow(workflowId).stream()
256-
.filter(task -> !task.getStatus().isTerminal())
257-
.filter(task -> task.getReferenceTaskName().equals(taskReferenceName))
258-
.findFirst() // There can only be one task by a given reference name running at a
259-
// time.
260-
.orElse(null);
257+
List<TaskModel> tasks = executionDAOFacade.getTaskModelsForWorkflow(workflowId);
258+
Stream<TaskModel> taskStream =
259+
tasks.stream().filter(task -> !task.getStatus().isTerminal());
260+
Optional<TaskModel> found =
261+
taskStream
262+
.filter(task -> task.getReferenceTaskName().equals(taskReferenceName))
263+
.findFirst();
264+
if (found.isPresent()) {
265+
return found.get().toTask();
266+
}
267+
// If no task is found, let's check if there is one inside an iteration
268+
found =
269+
tasks.stream()
270+
.filter(task -> !task.getStatus().isTerminal())
271+
.filter(
272+
task ->
273+
TaskUtils.removeIterationFromTaskRefName(
274+
task.getReferenceTaskName())
275+
.equals(taskReferenceName))
276+
.findFirst();
277+
278+
return found.map(TaskModel::toTask).orElse(null);
261279
}
262280

263281
/**

core/src/main/java/com/netflix/conductor/service/TaskService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,4 +250,11 @@ SearchResult<TaskSummary> search(
250250
*/
251251
ExternalStorageLocation getExternalStorageLocation(
252252
String path, String operation, String payloadType);
253+
254+
String updateTask(
255+
String workflowId,
256+
String taskRefName,
257+
TaskResult.Status status,
258+
String workerId,
259+
Map<String, Object> output);
253260
}

core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,27 @@ public String updateTask(TaskResult taskResult) {
139139
return taskResult.getTaskId();
140140
}
141141

142+
@Override
143+
public String updateTask(
144+
String workflowId,
145+
String taskRefName,
146+
TaskResult.Status status,
147+
String workerId,
148+
Map<String, Object> output) {
149+
Task pending = getPendingTaskForWorkflow(workflowId, taskRefName);
150+
if (pending == null) {
151+
return null;
152+
}
153+
154+
TaskResult taskResult = new TaskResult(pending);
155+
taskResult.setStatus(status);
156+
taskResult.getOutputData().putAll(output);
157+
if (StringUtils.isNotBlank(workerId)) {
158+
taskResult.setWorkerId(workerId);
159+
}
160+
return updateTask(taskResult);
161+
}
162+
142163
/**
143164
* Ack Task is received.
144165
*

rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,18 @@ public String updateTask(@RequestBody TaskResult taskResult) {
8383
return taskService.updateTask(taskResult);
8484
}
8585

86+
@PostMapping(value = "/{workflowId}/{taskRefName}/{status}", produces = TEXT_PLAIN_VALUE)
87+
@Operation(summary = "Update a task By Ref Name")
88+
public String updateTask(
89+
@PathVariable("workflowId") String workflowId,
90+
@PathVariable("taskRefName") String taskRefName,
91+
@PathVariable("status") TaskResult.Status status,
92+
@RequestParam(value = "workerid", required = false) String workerId,
93+
@RequestBody Map<String, Object> output) {
94+
95+
return taskService.updateTask(workflowId, taskRefName, status, workerId, output);
96+
}
97+
8698
@PostMapping("/{taskId}/log")
8799
@Operation(summary = "Log Task Execution Details")
88100
public void log(@PathVariable("taskId") String taskId, @RequestBody String log) {

0 commit comments

Comments
 (0)