diff --git a/core/src/main/java/io/kestra/core/models/executions/TaskRun.java b/core/src/main/java/io/kestra/core/models/executions/TaskRun.java index 9b5931ff18e..5efcdfe0b29 100644 --- a/core/src/main/java/io/kestra/core/models/executions/TaskRun.java +++ b/core/src/main/java/io/kestra/core/models/executions/TaskRun.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; @ToString @EqualsAndHashCode @@ -319,8 +320,14 @@ public TaskRun incrementIteration() { } public TaskRun resetAttempts() { + State.Type lastCreationState = this.state.getHistories() + .reversed() + .stream() + .filter(history -> history.getState().isCreated()) + .findFirst().get() + .getState(); return this.toBuilder() - .state(new State(State.Type.CREATED, List.of(this.state.getHistories().getFirst()))) + .state(new State(lastCreationState, this.state.getHistories())) .attempts(null) .build(); } diff --git a/core/src/main/java/io/kestra/core/services/ExecutionService.java b/core/src/main/java/io/kestra/core/services/ExecutionService.java index f4c72716217..ead63c0982f 100644 --- a/core/src/main/java/io/kestra/core/services/ExecutionService.java +++ b/core/src/main/java/io/kestra/core/services/ExecutionService.java @@ -31,6 +31,7 @@ import io.kestra.core.utils.GraphUtils; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.ListUtils; +import io.kestra.plugin.core.flow.LoopUntil; import io.kestra.plugin.core.flow.Pause; import io.kestra.plugin.core.flow.WorkingDirectory; import io.micronaut.context.event.ApplicationEventPublisher; @@ -163,8 +164,6 @@ public Execution retryWaitFor(Execution execution, String flowableTaskRunId) { .stream() .map(taskRun -> { if (taskRun.getId().equals(flowableTaskRunId)) { - // Keep only CREATED/RUNNING - // To avoid having large history return taskRun.resetAttempts().incrementIteration(); } @@ -838,7 +837,7 @@ private TaskRun mapTaskRun( alterState = originalTaskRun.withState(newStateType).getState(); } else { Task task = flow.findTaskByTaskId(originalTaskRun.getTaskId()); - if (!task.isFlowable() || task instanceof WorkingDirectory) { + if (!task.isFlowable() || task instanceof WorkingDirectory || task instanceof LoopUntil) { // The current task run is the reference task run, its default state will be newState alterState = originalTaskRun.withState(newStateType).getState(); } else { diff --git a/core/src/main/java/io/kestra/plugin/core/flow/LoopUntil.java b/core/src/main/java/io/kestra/plugin/core/flow/LoopUntil.java index f5f799526c6..6b4f6a005f4 100644 --- a/core/src/main/java/io/kestra/plugin/core/flow/LoopUntil.java +++ b/core/src/main/java/io/kestra/plugin/core/flow/LoopUntil.java @@ -181,8 +181,13 @@ private boolean reachedMaximums(RunContext runContext, Execution execution, Task if (printLog) {logger.warn("Max iterations reached");} return true; } - - Instant creationDate = parentTaskRun.getState().getHistories().getFirst().getDate(); + Instant creationDate = parentTaskRun.getState() + .getHistories() + .reversed() + .stream() + .filter(history -> history.getState().isCreated()) + .findFirst().get() + .getDate(); Optional maxDuration = runContext.render(this.getCheckFrequency().getMaxDuration()).as(Duration.class); if (maxDuration.isPresent() && creationDate != null diff --git a/core/src/test/java/io/kestra/core/runners/AbstractRunnerTest.java b/core/src/test/java/io/kestra/core/runners/AbstractRunnerTest.java index ee1d997375d..966484fbfd2 100644 --- a/core/src/test/java/io/kestra/core/runners/AbstractRunnerTest.java +++ b/core/src/test/java/io/kestra/core/runners/AbstractRunnerTest.java @@ -191,6 +191,12 @@ protected void restartFailedWithAfterExecution() throws Exception { restartCaseTest.restartFailedWithAfterExecution(); } + @Test + @LoadFlows({"flows/valids/loop-until-restart.yaml"}) + protected void restartOrReplayLoopUntil() throws Exception{ + restartCaseTest.restartOrReplayLoopUntil(); + } + @Test @LoadFlows(value = {"flows/valids/trigger-flow-listener-no-inputs.yaml", "flows/valids/trigger-flow-listener.yaml", diff --git a/core/src/test/java/io/kestra/core/runners/RestartCaseTest.java b/core/src/test/java/io/kestra/core/runners/RestartCaseTest.java index b3fe04e5874..bf5ce1ac428 100644 --- a/core/src/test/java/io/kestra/core/runners/RestartCaseTest.java +++ b/core/src/test/java/io/kestra/core/runners/RestartCaseTest.java @@ -9,6 +9,7 @@ import io.kestra.core.services.ExecutionService; import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Optional; @@ -283,4 +284,54 @@ public void restartFailedWithAfterExecution() throws Exception { .map(TaskRun::getState) .forEach(state -> assertThat(state.getCurrent()).isIn(State.Type.SUCCESS, State.Type.SKIPPED)); } + + + public void restartOrReplayLoopUntil() throws Exception{ + Flow flow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests", "loop-until-restart").orElseThrow(); + + Execution firstExecution = runnerUtils.runOne(MAIN_TENANT, flow.getNamespace(), flow.getId(), Duration.ofSeconds(60)); + + assertThat(firstExecution.getState().getCurrent()).isEqualTo(Type.FAILED); + // restarting case + Execution restartedExecution = executionService.restart(firstExecution, null); + assertThat(restartedExecution).isNotNull(); + assertThat(restartedExecution.getId()).isEqualTo(firstExecution.getId()); + assertThat(restartedExecution.getState().getCurrent()).isEqualTo(Type.RESTARTED); + + Execution finalRestartedExecution = runnerUtils.restartExecution( execution -> execution.getState().isFailed(), restartedExecution); + assertThat(finalRestartedExecution.getState().getCurrent()).isEqualTo(Type.FAILED); + + Optional parentTaskRun1 = finalRestartedExecution.findTaskRunsByTaskId("loop_test").stream().findFirst(); + assertThat(parentTaskRun1.isPresent()); + + State.History lastFailed1 = parentTaskRun1.get().getState().getHistories().getLast(); + State.History lastRestarted1 = parentTaskRun1.get().getState().getHistories().reversed().stream() + .filter(history -> history.getState() == Type.RESTARTED).findFirst().get(); + assertThat(lastRestarted1).isNotNull(); + assertThat(lastRestarted1.getDate().plus(10, ChronoUnit.SECONDS).isBefore(lastFailed1.getDate())); + + // replaying case + Execution replayedExecution = executionService.replay(firstExecution, firstExecution.findTaskRunByTaskIdAndValue("loop_test", List.of()).getId(), null); + assertThat(replayedExecution.getState().getCurrent()).isEqualTo(Type.RESTARTED); + assertThat(replayedExecution.getId()).isNotEqualTo(firstExecution.getId()); + + Execution finalReplayedExecution = runnerUtils.awaitChildExecution( + flow, + firstExecution, + replayedExecution, + Duration.ofSeconds(60) + ); + assertThat(finalReplayedExecution.getState().getCurrent()).isEqualTo(Type.FAILED); + + Optional parentTaskRun2 = finalReplayedExecution.findTaskRunsByTaskId("loop_test").stream().findFirst(); + assertThat(parentTaskRun2.isPresent()); + + State.History lastFailed2 = parentTaskRun2.get().getState().getHistories().getLast(); + State.History lastRestarted2 = parentTaskRun2.get().getState().getHistories().reversed().stream() + .filter(history -> history.getState() == Type.RESTARTED).findFirst().get(); + assertThat(lastRestarted2).isNotNull(); + assertThat(lastRestarted2.getDate().plus(10, ChronoUnit.SECONDS).isBefore(lastFailed2.getDate())); + + } + } diff --git a/core/src/test/resources/flows/valids/loop-until-restart.yaml b/core/src/test/resources/flows/valids/loop-until-restart.yaml new file mode 100644 index 00000000000..bb922fa7999 --- /dev/null +++ b/core/src/test/resources/flows/valids/loop-until-restart.yaml @@ -0,0 +1,15 @@ +id: loop-until-restart +namespace: io.kestra.tests + +tasks: + - id: loop_test + type: io.kestra.plugin.core.flow.LoopUntil + condition: "{{ false }}" + failOnMaxReached: true + checkFrequency: + interval: PT1S + maxDuration: PT10S + tasks: + - id: return_test + type: io.kestra.core.tasks.debugs.Return + format: "{{outputs.loop_test.iterationCount}}" \ No newline at end of file