Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 1 addition & 130 deletions core/src/main/java/io/kestra/core/models/executions/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import io.kestra.core.debug.Breakpoint;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.SoftDeletable;
Expand All @@ -27,7 +28,6 @@
import io.kestra.core.test.flow.TaskFixture;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
Expand Down Expand Up @@ -975,106 +975,6 @@ public static ILoggingEvent loggingEventFromException(Throwable e) {
return loggingEvent;
}

public Map<String, Object> outputs() {
if (this.taskRunList == null) {
return ImmutableMap.of();
}

// we pre-compute the map of taskrun by id to avoid traversing the list of all taskrun for each taskrun
Map<String, TaskRun> byIds = this.taskRunList.stream().collect(Collectors.toMap(
taskRun -> taskRun.getId(),
taskRun -> taskRun
));

Map<String, Object> result = new HashMap<>();
this.taskRunList.stream()
.filter(taskRun -> taskRun.getOutputs() != null)
.collect(Collectors.groupingBy(taskRun -> taskRun.getTaskId()))
.forEach((taskId, taskRuns) -> {
Map<String, Object> taskOutputs = new HashMap<>();
for (TaskRun current : taskRuns) {
if (!MapUtils.isEmpty(current.getOutputs())) {
if (current.getIteration() != null) {
Map<String, Object> merged = MapUtils.merge(taskOutputs, outputs(current, byIds));
// If one of two of the map is null in the merge() method, we just return the other
// And if the not null map is a Variables (= read only), we cast it back to a simple
// hashmap to avoid taskOutputs becoming read-only
// i.e this happen in nested loopUntil tasks
if (merged instanceof Variables) {
merged = new HashMap<>(merged);
}
taskOutputs = merged;
} else {
taskOutputs.putAll(outputs(current, byIds));
}
}
}
result.put(taskId, taskOutputs);
});

return result;
}

private Map<String, Object> outputs(TaskRun taskRun, Map<String, TaskRun> byIds) {
List<TaskRun> parents = findParents(taskRun, byIds)
.stream()
.filter(r -> r.getValue() != null)
.toList();

if (parents.isEmpty()) {
if (taskRun.getValue() == null) {
return taskRun.getOutputs();
} else {
return Map.of(taskRun.getValue(), taskRun.getOutputs());
}
}

Map<String, Object> result = HashMap.newHashMap(1);
Map<String, Object> current = result;

for (TaskRun t : parents) {
HashMap<String, Object> item = HashMap.newHashMap(1);
current.put(t.getValue(), item);
current = item;
}

if (taskRun.getOutputs() != null) {
if (taskRun.getValue() != null) {
current.put(taskRun.getValue(), taskRun.getOutputs());
} else {
current.putAll(taskRun.getOutputs());
}
}

return result;
}


public List<Map<String, Object>> parents(TaskRun taskRun) {
List<Map<String, Object>> result = new ArrayList<>();

List<TaskRun> parents = findParents(taskRun);
Collections.reverse(parents);

for (TaskRun childTaskRun : parents) {
Map<String, Object> current = HashMap.newHashMap(2);

if (childTaskRun.getValue() != null) {
current.put("taskrun", Map.of("value", childTaskRun.getValue()));
}

if (childTaskRun.getOutputs() != null && !childTaskRun.getOutputs().isEmpty()) {
current.put("outputs", childTaskRun.getOutputs());
}

if (!current.isEmpty()) {
result.add(current);
}
}

return result;
}

/**
* Find all parents from this {@link TaskRun}. The list is starting from deeper parent and end
* on the closest parent, so the first element is the task that starts first. This method
Expand Down Expand Up @@ -1110,35 +1010,6 @@ public List<TaskRun> findParents(TaskRun taskRun) {
return result;
}

/**
* Find all parents from this {@link TaskRun}. This method does the same as #findParents(TaskRun
* taskRun) but for performance reason, as it's called a lot, we pre-compute the map of taskrun
* by ID and use it here.
*/
private List<TaskRun> findParents(TaskRun taskRun, Map<String, TaskRun> taskRunById) {
if (taskRun.getParentTaskRunId() == null || taskRunById.isEmpty()) {
return Collections.emptyList();
}

List<TaskRun> result = new ArrayList<>();
boolean ended = false;
while (!ended) {
final TaskRun finalTaskRun = taskRun;
TaskRun find = taskRunById.get(finalTaskRun.getParentTaskRunId());

if (find != null) {
result.add(find);
taskRun = find;
} else {
ended = true;
}
}

Collections.reverse(result);

return result;
}

/**
* Find all children of this {@link TaskRun}.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.kestra.core.models.executions;

import io.kestra.core.models.HasUID;

public record TaskOutput(String taskRunId, String tenantId, String executionId, byte[] value, String uri) implements HasUID {
@Override
public String uid() {
return taskRunId;
}
}
13 changes: 0 additions & 13 deletions core/src/main/java/io/kestra/core/models/executions/TaskRun.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package io.kestra.core.models.executions;

import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.assets.AssetsInOut;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
Expand Down Expand Up @@ -52,12 +50,6 @@ public class TaskRun implements TenantInterface {
@With
List<TaskRunAttempt> attempts;

@With
@JsonInclude(JsonInclude.Include.ALWAYS)
@Nullable
@Schema(implementation = Object.class)
Variables outputs;

@With
@Nullable
AssetsInOut assets;
Expand Down Expand Up @@ -86,7 +78,6 @@ public TaskRun withState(State.Type state) {
this.parentTaskRunId,
this.value,
this.attempts,
this.outputs,
this.assets,
this.state.withState(state),
this.iteration,
Expand Down Expand Up @@ -114,7 +105,6 @@ public TaskRun withStateAndAttempt(State.Type state) {
this.parentTaskRunId,
this.value,
newAttempts,
this.outputs,
this.assets,
this.state.withState(state),
this.iteration,
Expand All @@ -138,7 +128,6 @@ public TaskRun fail() {
this.parentTaskRunId,
this.value,
newAttempts,
this.outputs,
this.assets,
this.state.withState(State.Type.FAILED),
this.iteration,
Expand All @@ -158,7 +147,6 @@ public TaskRun forChildExecution(Map<String, String> remapTaskRunId, String exec
.parentTaskRunId(this.getParentTaskRunId() != null ? remapTaskRunId.get(this.getParentTaskRunId()) : null)
.value(this.getValue())
.attempts(this.getAttempts())
.outputs(this.getOutputs())
.assets(this.getAssets())
.state(state == null ? this.getState() : state)
.iteration(this.getIteration())
Expand Down Expand Up @@ -241,7 +229,6 @@ public String toString(boolean pretty) {
", value=" + this.getValue() +
", parentTaskRunId=" + this.getParentTaskRunId() +
", state=" + this.getState().getCurrent().toString() +
", outputs=" + this.getOutputs() +
", assets=" + this.getAssets() +
", attempts=" + this.getAttempts() +
")";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.tasks.*;
import io.kestra.core.runners.FlowMetaStoreInterface;
Expand All @@ -14,6 +13,7 @@
import lombok.Getter;

import java.util.List;
import java.util.Map;
import java.util.Optional;

@Getter
Expand Down Expand Up @@ -53,8 +53,8 @@ public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
}

@Override
public Optional<SubflowExecutionResult> createSubflowExecutionResult(RunContext runContext, TaskRun taskRun, FlowInterface flow, Execution execution) {
return subflowTask.createSubflowExecutionResult(runContext, taskRun, flow, execution);
public Optional<SubflowExecutionResult> createSubflowExecutionResult(RunContext runContext, TaskRun taskRun, FlowInterface flow, Execution execution, Map<String, Object> outputs) {
return subflowTask.createSubflowExecutionResult(runContext, taskRun, flow, execution, outputs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.runners.FlowMetaStoreInterface;
Expand All @@ -12,6 +11,7 @@
import io.kestra.core.runners.SubflowExecutionResult;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
Expand All @@ -33,7 +33,8 @@ List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
Optional<SubflowExecutionResult> createSubflowExecutionResult(RunContext runContext,
TaskRun taskRun,
FlowInterface flow,
Execution execution);
Execution execution,
Map<String, Object> outputs);

/**
* Whether to wait for the execution(s) of the subflow before terminating this tasks
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.kestra.core.repositories;

import io.kestra.core.models.executions.TaskOutput;

import java.util.List;
import java.util.Optional;

public interface TaskOutputRepositoryInterface {
Optional<TaskOutput> findById(String tenantId, String taskRunId);

TaskOutput save(TaskOutput taskOutput);

List<TaskOutput> findByExecution(String tenantId, String executionId);
}
35 changes: 29 additions & 6 deletions core/src/main/java/io/kestra/core/runners/DefaultRunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.kestra.core.utils.VersionProvider;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Introspected;
Expand Down Expand Up @@ -524,13 +525,15 @@ public void cleanup() {
public TaskRunInfo taskRunInfo() {
Optional<Map<String, Object>> maybeTaskRunMap = Optional.ofNullable(this.getVariables().get("taskrun"))
.map(Map.class::cast);
Optional<Map<String, Object>> maybeTaskMap = Optional.ofNullable(this.getVariables().get("task"))
.map(Map.class::cast);
Optional<Map<String, Object>> maybeExecutionMap = Optional.ofNullable(this.getVariables().get("execution"))
.map(Map.class::cast);
return new TaskRunInfo(
(String) this.getVariables().get("executionId"),
(String) this.getVariables().get("taskId"),
maybeTaskRunMap.map(m -> (String) m.get("id"))
.orElse(null),
maybeTaskRunMap.map(m -> (String) m.get("value"))
.orElse(null)
maybeExecutionMap.map(m -> (String) m.get("id")).orElse(null),
maybeTaskMap.map(m -> (String) m.get("id")).orElse(null),
maybeTaskRunMap.map(m -> (String) m.get("id")).orElse(null),
maybeTaskRunMap.map(m -> (String) m.get("value")).orElse(null)
);
}

Expand Down Expand Up @@ -614,6 +617,26 @@ public SDK sdk() {
return this.sdk;
}

@SuppressWarnings("unchecked")
@Override
public Map<String, Object> currentOutput() {
Map<?, ?> allOutputs = (Map<?, ?>) variables.get("outputs");
Map<?, ?> outputs = (Map<?, ?>) allOutputs.get(taskRunInfo().taskId());
List<Map<?, ?>> parents = (List<Map<?, ?>>) variables.get("parents");
if (!ListUtils.isEmpty(parents) && !MapUtils.isEmpty(outputs)) {
Collections.reverse(parents);
for (Map<?, ?> parent : parents) {
Map<?, ?> taskrun = (Map<?, ?>) parent.get("taskrun");
if (taskrun != null) {
outputs = (Map<?, ?>) outputs.get(taskrun.get("value"));
}
}
}
Map<?, ?> taskrun = (Map<?, ?>) variables.get("taskrun");

return (Map<String, Object>) outputs.get(taskrun.get("value"));
}

/**
* Get access to Kestra internal services.
* WARNING: this should only be used for very specific needs, plugins should try to avoid using an Kestra internal service.
Expand Down
Loading
Loading