Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@ToString
@EqualsAndHashCode
Expand Down Expand Up @@ -319,8 +320,14 @@ public TaskRun incrementIteration() {
}

public TaskRun resetAttempts() {
State.Type lastCreationState = this.state.getHistories()
.reversed()
.stream()
.filter(history -> history.getState().isCreated())
.findFirst().get()
.getState();
return this.toBuilder()
.state(new State(State.Type.CREATED, List.of(this.state.getHistories().getFirst())))
.state(new State(lastCreationState, this.state.getHistories()))
.attempts(null)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.flow.LoopUntil;
import io.kestra.plugin.core.flow.Pause;
import io.kestra.plugin.core.flow.WorkingDirectory;
import io.micronaut.context.event.ApplicationEventPublisher;
Expand Down Expand Up @@ -163,8 +164,6 @@ public Execution retryWaitFor(Execution execution, String flowableTaskRunId) {
.stream()
.map(taskRun -> {
if (taskRun.getId().equals(flowableTaskRunId)) {
// Keep only CREATED/RUNNING
// To avoid having large history
return taskRun.resetAttempts().incrementIteration();
}

Expand Down Expand Up @@ -838,7 +837,7 @@ private TaskRun mapTaskRun(
alterState = originalTaskRun.withState(newStateType).getState();
} else {
Task task = flow.findTaskByTaskId(originalTaskRun.getTaskId());
if (!task.isFlowable() || task instanceof WorkingDirectory) {
if (!task.isFlowable() || task instanceof WorkingDirectory || task instanceof LoopUntil) {
Copy link
Contributor Author

@MTarek165 MTarek165 Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brian-mulier-p Before merging this PR I have opinion on the change here beyond the context of the PR, related to Flowable transition to Running directly for restart/replay except for WorkingDirectory and LoopUntil tasks here as I see if there is no specific reason for that it is better to give all flowables a RESTARTED state as it is more natural and may have benefits like we have for LoopUntil.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idk, is there an impact on the resolution to remove the LoopUntil exception here? I can see why WorkingDirectory is there because if a subtask is restarted then the WorkingDirectory is also restarted because it's a special wrapper task that forces any subtask to be performed under the same worker so it means if any subtask or the WorkingDirectory task itself is restarted, the whole is restarted so it makes more sense to see the WorkingDirectory task also as restarted

Copy link
Contributor Author

@MTarek165 MTarek165 Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping LoopUntil as Restarted is essential step as it is a reference for the max duration when checking reaching maximum at LoopUntil between iterations.
I was wondering if we should make all flowables being Restarted, but since we don't know, let's keep aligned with the scope of the PR only.
You can merge it now 😀

// The current task run is the reference task run, its default state will be newState
alterState = originalTaskRun.withState(newStateType).getState();
} else {
Expand Down
9 changes: 7 additions & 2 deletions core/src/main/java/io/kestra/plugin/core/flow/LoopUntil.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,13 @@ private boolean reachedMaximums(RunContext runContext, Execution execution, Task
if (printLog) {logger.warn("Max iterations reached");}
return true;
}

Instant creationDate = parentTaskRun.getState().getHistories().getFirst().getDate();
Instant creationDate = parentTaskRun.getState()
.getHistories()
.reversed()
.stream()
.filter(history -> history.getState().isCreated())
.findFirst().get()
.getDate();
Optional<Duration> maxDuration = runContext.render(this.getCheckFrequency().getMaxDuration()).as(Duration.class);
if (maxDuration.isPresent()
&& creationDate != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ protected void restartFailedWithAfterExecution() throws Exception {
restartCaseTest.restartFailedWithAfterExecution();
}

@Test
@LoadFlows({"flows/valids/loop-until-restart.yaml"})
protected void restartOrReplayLoopUntil() throws Exception{
restartCaseTest.restartOrReplayLoopUntil();
}

@Test
@LoadFlows(value = {"flows/valids/trigger-flow-listener-no-inputs.yaml",
"flows/valids/trigger-flow-listener.yaml",
Expand Down
51 changes: 51 additions & 0 deletions core/src/test/java/io/kestra/core/runners/RestartCaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.kestra.core.services.ExecutionService;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -283,4 +284,54 @@ public void restartFailedWithAfterExecution() throws Exception {
.map(TaskRun::getState)
.forEach(state -> assertThat(state.getCurrent()).isIn(State.Type.SUCCESS, State.Type.SKIPPED));
}


public void restartOrReplayLoopUntil() throws Exception{
Flow flow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests", "loop-until-restart").orElseThrow();

Execution firstExecution = runnerUtils.runOne(MAIN_TENANT, flow.getNamespace(), flow.getId(), Duration.ofSeconds(60));

assertThat(firstExecution.getState().getCurrent()).isEqualTo(Type.FAILED);
// restarting case
Execution restartedExecution = executionService.restart(firstExecution, null);
assertThat(restartedExecution).isNotNull();
assertThat(restartedExecution.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExecution.getState().getCurrent()).isEqualTo(Type.RESTARTED);

Execution finalRestartedExecution = runnerUtils.restartExecution( execution -> execution.getState().isFailed(), restartedExecution);
assertThat(finalRestartedExecution.getState().getCurrent()).isEqualTo(Type.FAILED);

Optional<TaskRun> parentTaskRun1 = finalRestartedExecution.findTaskRunsByTaskId("loop_test").stream().findFirst();
assertThat(parentTaskRun1.isPresent());

State.History lastFailed1 = parentTaskRun1.get().getState().getHistories().getLast();
State.History lastRestarted1 = parentTaskRun1.get().getState().getHistories().reversed().stream()
.filter(history -> history.getState() == Type.RESTARTED).findFirst().get();
assertThat(lastRestarted1).isNotNull();
assertThat(lastRestarted1.getDate().plus(10, ChronoUnit.SECONDS).isBefore(lastFailed1.getDate()));

// replaying case
Execution replayedExecution = executionService.replay(firstExecution, firstExecution.findTaskRunByTaskIdAndValue("loop_test", List.of()).getId(), null);
assertThat(replayedExecution.getState().getCurrent()).isEqualTo(Type.RESTARTED);
assertThat(replayedExecution.getId()).isNotEqualTo(firstExecution.getId());

Execution finalReplayedExecution = runnerUtils.awaitChildExecution(
flow,
firstExecution,
replayedExecution,
Duration.ofSeconds(60)
);
assertThat(finalReplayedExecution.getState().getCurrent()).isEqualTo(Type.FAILED);

Optional<TaskRun> parentTaskRun2 = finalReplayedExecution.findTaskRunsByTaskId("loop_test").stream().findFirst();
assertThat(parentTaskRun2.isPresent());

State.History lastFailed2 = parentTaskRun2.get().getState().getHistories().getLast();
State.History lastRestarted2 = parentTaskRun2.get().getState().getHistories().reversed().stream()
.filter(history -> history.getState() == Type.RESTARTED).findFirst().get();
assertThat(lastRestarted2).isNotNull();
assertThat(lastRestarted2.getDate().plus(10, ChronoUnit.SECONDS).isBefore(lastFailed2.getDate()));

}

}
15 changes: 15 additions & 0 deletions core/src/test/resources/flows/valids/loop-until-restart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
id: loop-until-restart
namespace: io.kestra.tests

tasks:
- id: loop_test
type: io.kestra.plugin.core.flow.LoopUntil
condition: "{{ false }}"
failOnMaxReached: true
checkFrequency:
interval: PT1S
maxDuration: PT10S
tasks:
- id: return_test
type: io.kestra.core.tasks.debugs.Return
format: "{{outputs.loop_test.iterationCount}}"
Loading