Skip to content

Commit 75642f7

Browse files
committed
feat(system): store task run outputs out of the execution
1 parent 23e9fb5 commit 75642f7

File tree

71 files changed

+1034
-531
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+1034
-531
lines changed

core/src/main/java/io/kestra/core/models/executions/Execution.java

Lines changed: 1 addition & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
99
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
1010
import com.google.common.collect.ImmutableMap;
11+
import com.google.common.collect.Streams;
1112
import io.kestra.core.debug.Breakpoint;
1213
import io.kestra.core.exceptions.InternalException;
1314
import io.kestra.core.models.SoftDeletable;
@@ -27,7 +28,6 @@
2728
import io.kestra.core.test.flow.TaskFixture;
2829
import io.kestra.core.utils.IdUtils;
2930
import io.kestra.core.utils.ListUtils;
30-
import io.kestra.core.utils.MapUtils;
3131
import io.swagger.v3.oas.annotations.Hidden;
3232
import io.swagger.v3.oas.annotations.media.Schema;
3333
import jakarta.annotation.Nullable;
@@ -975,106 +975,6 @@ public static ILoggingEvent loggingEventFromException(Throwable e) {
975975
return loggingEvent;
976976
}
977977

978-
public Map<String, Object> outputs() {
979-
if (this.taskRunList == null) {
980-
return ImmutableMap.of();
981-
}
982-
983-
// we pre-compute the map of taskrun by id to avoid traversing the list of all taskrun for each taskrun
984-
Map<String, TaskRun> byIds = this.taskRunList.stream().collect(Collectors.toMap(
985-
taskRun -> taskRun.getId(),
986-
taskRun -> taskRun
987-
));
988-
989-
Map<String, Object> result = new HashMap<>();
990-
this.taskRunList.stream()
991-
.filter(taskRun -> taskRun.getOutputs() != null)
992-
.collect(Collectors.groupingBy(taskRun -> taskRun.getTaskId()))
993-
.forEach((taskId, taskRuns) -> {
994-
Map<String, Object> taskOutputs = new HashMap<>();
995-
for (TaskRun current : taskRuns) {
996-
if (!MapUtils.isEmpty(current.getOutputs())) {
997-
if (current.getIteration() != null) {
998-
Map<String, Object> merged = MapUtils.merge(taskOutputs, outputs(current, byIds));
999-
// If one of two of the map is null in the merge() method, we just return the other
1000-
// And if the not null map is a Variables (= read only), we cast it back to a simple
1001-
// hashmap to avoid taskOutputs becoming read-only
1002-
// i.e this happen in nested loopUntil tasks
1003-
if (merged instanceof Variables) {
1004-
merged = new HashMap<>(merged);
1005-
}
1006-
taskOutputs = merged;
1007-
} else {
1008-
taskOutputs.putAll(outputs(current, byIds));
1009-
}
1010-
}
1011-
}
1012-
result.put(taskId, taskOutputs);
1013-
});
1014-
1015-
return result;
1016-
}
1017-
1018-
private Map<String, Object> outputs(TaskRun taskRun, Map<String, TaskRun> byIds) {
1019-
List<TaskRun> parents = findParents(taskRun, byIds)
1020-
.stream()
1021-
.filter(r -> r.getValue() != null)
1022-
.toList();
1023-
1024-
if (parents.isEmpty()) {
1025-
if (taskRun.getValue() == null) {
1026-
return taskRun.getOutputs();
1027-
} else {
1028-
return Map.of(taskRun.getValue(), taskRun.getOutputs());
1029-
}
1030-
}
1031-
1032-
Map<String, Object> result = HashMap.newHashMap(1);
1033-
Map<String, Object> current = result;
1034-
1035-
for (TaskRun t : parents) {
1036-
HashMap<String, Object> item = HashMap.newHashMap(1);
1037-
current.put(t.getValue(), item);
1038-
current = item;
1039-
}
1040-
1041-
if (taskRun.getOutputs() != null) {
1042-
if (taskRun.getValue() != null) {
1043-
current.put(taskRun.getValue(), taskRun.getOutputs());
1044-
} else {
1045-
current.putAll(taskRun.getOutputs());
1046-
}
1047-
}
1048-
1049-
return result;
1050-
}
1051-
1052-
1053-
public List<Map<String, Object>> parents(TaskRun taskRun) {
1054-
List<Map<String, Object>> result = new ArrayList<>();
1055-
1056-
List<TaskRun> parents = findParents(taskRun);
1057-
Collections.reverse(parents);
1058-
1059-
for (TaskRun childTaskRun : parents) {
1060-
Map<String, Object> current = HashMap.newHashMap(2);
1061-
1062-
if (childTaskRun.getValue() != null) {
1063-
current.put("taskrun", Map.of("value", childTaskRun.getValue()));
1064-
}
1065-
1066-
if (childTaskRun.getOutputs() != null && !childTaskRun.getOutputs().isEmpty()) {
1067-
current.put("outputs", childTaskRun.getOutputs());
1068-
}
1069-
1070-
if (!current.isEmpty()) {
1071-
result.add(current);
1072-
}
1073-
}
1074-
1075-
return result;
1076-
}
1077-
1078978
/**
1079979
* Find all parents from this {@link TaskRun}. The list is starting from deeper parent and end
1080980
* on the closest parent, so the first element is the task that starts first. This method
@@ -1110,35 +1010,6 @@ public List<TaskRun> findParents(TaskRun taskRun) {
11101010
return result;
11111011
}
11121012

1113-
/**
1114-
* Find all parents from this {@link TaskRun}. This method does the same as #findParents(TaskRun
1115-
* taskRun) but for performance reason, as it's called a lot, we pre-compute the map of taskrun
1116-
* by ID and use it here.
1117-
*/
1118-
private List<TaskRun> findParents(TaskRun taskRun, Map<String, TaskRun> taskRunById) {
1119-
if (taskRun.getParentTaskRunId() == null || taskRunById.isEmpty()) {
1120-
return Collections.emptyList();
1121-
}
1122-
1123-
List<TaskRun> result = new ArrayList<>();
1124-
boolean ended = false;
1125-
while (!ended) {
1126-
final TaskRun finalTaskRun = taskRun;
1127-
TaskRun find = taskRunById.get(finalTaskRun.getParentTaskRunId());
1128-
1129-
if (find != null) {
1130-
result.add(find);
1131-
taskRun = find;
1132-
} else {
1133-
ended = true;
1134-
}
1135-
}
1136-
1137-
Collections.reverse(result);
1138-
1139-
return result;
1140-
}
1141-
11421013
/**
11431014
* Find all children of this {@link TaskRun}.
11441015
*/
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.kestra.core.models.executions;
2+
3+
import io.kestra.core.models.HasUID;
4+
5+
public record TaskOutput(String taskRunId, String tenantId, String executionId, byte[] value, String uri) implements HasUID {
6+
@Override
7+
public String uid() {
8+
return taskRunId;
9+
}
10+
}

core/src/main/java/io/kestra/core/models/executions/TaskRun.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
package io.kestra.core.models.executions;
22

3-
import com.fasterxml.jackson.annotation.JsonInclude;
43
import io.kestra.core.models.TenantInterface;
54
import io.kestra.core.models.assets.AssetsInOut;
65
import io.kestra.core.models.flows.State;
76
import io.kestra.core.models.tasks.ResolvedTask;
87
import io.kestra.core.models.tasks.retrys.AbstractRetry;
98
import io.kestra.core.utils.IdUtils;
109
import io.swagger.v3.oas.annotations.Hidden;
11-
import io.swagger.v3.oas.annotations.media.Schema;
1210
import jakarta.annotation.Nullable;
1311
import jakarta.validation.constraints.NotNull;
1412
import jakarta.validation.constraints.Pattern;
@@ -52,12 +50,6 @@ public class TaskRun implements TenantInterface {
5250
@With
5351
List<TaskRunAttempt> attempts;
5452

55-
@With
56-
@JsonInclude(JsonInclude.Include.ALWAYS)
57-
@Nullable
58-
@Schema(implementation = Object.class)
59-
Variables outputs;
60-
6153
@With
6254
@Nullable
6355
AssetsInOut assets;
@@ -86,7 +78,6 @@ public TaskRun withState(State.Type state) {
8678
this.parentTaskRunId,
8779
this.value,
8880
this.attempts,
89-
this.outputs,
9081
this.assets,
9182
this.state.withState(state),
9283
this.iteration,
@@ -114,7 +105,6 @@ public TaskRun withStateAndAttempt(State.Type state) {
114105
this.parentTaskRunId,
115106
this.value,
116107
newAttempts,
117-
this.outputs,
118108
this.assets,
119109
this.state.withState(state),
120110
this.iteration,
@@ -138,7 +128,6 @@ public TaskRun fail() {
138128
this.parentTaskRunId,
139129
this.value,
140130
newAttempts,
141-
this.outputs,
142131
this.assets,
143132
this.state.withState(State.Type.FAILED),
144133
this.iteration,
@@ -158,7 +147,6 @@ public TaskRun forChildExecution(Map<String, String> remapTaskRunId, String exec
158147
.parentTaskRunId(this.getParentTaskRunId() != null ? remapTaskRunId.get(this.getParentTaskRunId()) : null)
159148
.value(this.getValue())
160149
.attempts(this.getAttempts())
161-
.outputs(this.getOutputs())
162150
.assets(this.getAssets())
163151
.state(state == null ? this.getState() : state)
164152
.iteration(this.getIteration())
@@ -241,7 +229,6 @@ public String toString(boolean pretty) {
241229
", value=" + this.getValue() +
242230
", parentTaskRunId=" + this.getParentTaskRunId() +
243231
", state=" + this.getState().getCurrent().toString() +
244-
", outputs=" + this.getOutputs() +
245232
", assets=" + this.getAssets() +
246233
", attempts=" + this.getAttempts() +
247234
")";

core/src/main/java/io/kestra/core/models/hierarchies/SubflowGraphTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import io.kestra.core.exceptions.InternalException;
55
import io.kestra.core.models.executions.Execution;
66
import io.kestra.core.models.executions.TaskRun;
7-
import io.kestra.core.models.flows.Flow;
87
import io.kestra.core.models.flows.FlowInterface;
98
import io.kestra.core.models.tasks.*;
109
import io.kestra.core.runners.FlowMetaStoreInterface;
@@ -14,6 +13,7 @@
1413
import lombok.Getter;
1514

1615
import java.util.List;
16+
import java.util.Map;
1717
import java.util.Optional;
1818

1919
@Getter
@@ -53,8 +53,8 @@ public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
5353
}
5454

5555
@Override
56-
public Optional<SubflowExecutionResult> createSubflowExecutionResult(RunContext runContext, TaskRun taskRun, FlowInterface flow, Execution execution) {
57-
return subflowTask.createSubflowExecutionResult(runContext, taskRun, flow, execution);
56+
public Optional<SubflowExecutionResult> createSubflowExecutionResult(RunContext runContext, TaskRun taskRun, FlowInterface flow, Execution execution, Map<String, Object> outputs) {
57+
return subflowTask.createSubflowExecutionResult(runContext, taskRun, flow, execution, outputs);
5858
}
5959

6060
@Override

core/src/main/java/io/kestra/core/models/tasks/ExecutableTask.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import io.kestra.core.exceptions.InternalException;
44
import io.kestra.core.models.executions.Execution;
55
import io.kestra.core.models.executions.TaskRun;
6-
import io.kestra.core.models.flows.Flow;
76
import io.kestra.core.models.flows.FlowId;
87
import io.kestra.core.models.flows.FlowInterface;
98
import io.kestra.core.runners.FlowMetaStoreInterface;
@@ -12,6 +11,7 @@
1211
import io.kestra.core.runners.SubflowExecutionResult;
1312

1413
import java.util.List;
14+
import java.util.Map;
1515
import java.util.Optional;
1616

1717
/**
@@ -33,7 +33,8 @@ List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
3333
Optional<SubflowExecutionResult> createSubflowExecutionResult(RunContext runContext,
3434
TaskRun taskRun,
3535
FlowInterface flow,
36-
Execution execution);
36+
Execution execution,
37+
Map<String, Object> outputs);
3738

3839
/**
3940
* Whether to wait for the execution(s) of the subflow before terminating this tasks
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.kestra.core.repositories;
2+
3+
import io.kestra.core.models.executions.TaskOutput;
4+
5+
import java.util.List;
6+
import java.util.Optional;
7+
8+
public interface TaskOutputRepositoryInterface {
9+
Optional<TaskOutput> findById(String tenantId, String taskRunId);
10+
11+
TaskOutput save(TaskOutput taskOutput);
12+
13+
List<TaskOutput> findByExecution(String tenantId, String executionId);
14+
}

core/src/main/java/io/kestra/core/runners/DefaultRunContext.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.kestra.core.storages.StorageInterface;
1919
import io.kestra.core.storages.kv.KVStore;
2020
import io.kestra.core.utils.ListUtils;
21+
import io.kestra.core.utils.MapUtils;
2122
import io.kestra.core.utils.VersionProvider;
2223
import io.micronaut.context.ApplicationContext;
2324
import io.micronaut.core.annotation.Introspected;
@@ -524,13 +525,15 @@ public void cleanup() {
524525
public TaskRunInfo taskRunInfo() {
525526
Optional<Map<String, Object>> maybeTaskRunMap = Optional.ofNullable(this.getVariables().get("taskrun"))
526527
.map(Map.class::cast);
528+
Optional<Map<String, Object>> maybeTaskMap = Optional.ofNullable(this.getVariables().get("task"))
529+
.map(Map.class::cast);
530+
Optional<Map<String, Object>> maybeExecutionMap = Optional.ofNullable(this.getVariables().get("execution"))
531+
.map(Map.class::cast);
527532
return new TaskRunInfo(
528-
(String) this.getVariables().get("executionId"),
529-
(String) this.getVariables().get("taskId"),
530-
maybeTaskRunMap.map(m -> (String) m.get("id"))
531-
.orElse(null),
532-
maybeTaskRunMap.map(m -> (String) m.get("value"))
533-
.orElse(null)
533+
maybeExecutionMap.map(m -> (String) m.get("id")).orElse(null),
534+
maybeTaskMap.map(m -> (String) m.get("id")).orElse(null),
535+
maybeTaskRunMap.map(m -> (String) m.get("id")).orElse(null),
536+
maybeTaskRunMap.map(m -> (String) m.get("value")).orElse(null)
534537
);
535538
}
536539

@@ -614,6 +617,26 @@ public SDK sdk() {
614617
return this.sdk;
615618
}
616619

620+
@SuppressWarnings("unchecked")
621+
@Override
622+
public Map<String, Object> currentOutput() {
623+
Map<?, ?> allOutputs = (Map<?, ?>) variables.get("outputs");
624+
Map<?, ?> outputs = (Map<?, ?>) allOutputs.get(taskRunInfo().taskId());
625+
List<Map<?, ?>> parents = (List<Map<?, ?>>) variables.get("parents");
626+
if (!ListUtils.isEmpty(parents) && !MapUtils.isEmpty(outputs)) {
627+
Collections.reverse(parents);
628+
for (Map<?, ?> parent : parents) {
629+
Map<?, ?> taskrun = (Map<?, ?>) parent.get("taskrun");
630+
if (taskrun != null) {
631+
outputs = (Map<?, ?>) outputs.get(taskrun.get("value"));
632+
}
633+
}
634+
}
635+
Map<?, ?> taskrun = (Map<?, ?>) variables.get("taskrun");
636+
637+
return (Map<String, Object>) outputs.get(taskrun.get("value"));
638+
}
639+
617640
/**
618641
* Get access to Kestra internal services.
619642
* WARNING: this should only be used for very specific needs, plugins should try to avoid using an Kestra internal service.

0 commit comments

Comments
 (0)