Skip to content

Commit 849a872

Browse files
fix(tests): wait for KILL in TestExecution cleaner
1 parent 94fa411 commit 849a872

File tree

2 files changed

+80
-43
lines changed

2 files changed

+80
-43
lines changed

tests/src/main/java/io/kestra/core/junit/extensions/KestraTestExtension.java

Lines changed: 72 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,20 @@
1010
import io.kestra.core.queues.TestQueueFactory;
1111
import io.kestra.core.repositories.ExecutionRepositoryInterface;
1212
import io.kestra.core.runners.TestRunner;
13+
import io.kestra.core.utils.Await;
1314
import io.kestra.core.utils.ListUtils;
1415
import io.kestra.core.utils.TestsUtils;
1516
import io.micronaut.inject.qualifiers.Qualifiers;
1617
import io.micronaut.test.annotation.MicronautTestValue;
18+
import io.micronaut.test.context.TestContext;
1719
import io.micronaut.test.extensions.junit5.MicronautJunit5Extension;
1820
import lombok.extern.slf4j.Slf4j;
1921
import org.junit.jupiter.api.extension.ExtensionContext;
2022
import org.junit.platform.commons.support.AnnotationSupport;
2123

22-
import java.util.Collections;
23-
import java.util.ConcurrentModificationException;
24-
import java.util.List;
25-
import java.util.Optional;
24+
import java.time.Duration;
25+
import java.util.*;
26+
import java.util.concurrent.TimeoutException;
2627

2728
@Slf4j
2829
public class KestraTestExtension extends MicronautJunit5Extension {
@@ -75,58 +76,96 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception {
7576
}
7677
}
7778

79+
@Override
80+
public void beforeTestExecution(ExtensionContext context) throws Exception {
81+
super.beforeTestExecution(context);
82+
83+
log.error("THREAD NAME IN BEFOREEACH: {}", Thread.currentThread().getName());
84+
TestQueueFactory.testExecutions.set(new ArrayList<>());
85+
}
86+
7887
@Override
7988
public void afterTestExecution(ExtensionContext context) throws Exception {
8089
super.afterTestExecution(context);
8190

8291
TestsUtils.queueConsumersCleanup();
8392

84-
KestraTest kestraTest = context.getTestClass()
85-
.orElseThrow()
86-
.getAnnotation(KestraTest.class);
87-
Optional<TestQueueFactory> testQueueFactory = Optional.of(applicationContext.containsBean(TestQueueFactory.class)).flatMap(contains -> contains ? Optional.of(applicationContext.getBean(TestQueueFactory.class)) : Optional.empty());
88-
List<Execution> testExecutions = testQueueFactory.map(TestQueueFactory::getTestExecutions).orElse(Collections.emptyList());
89-
if (!testExecutions.isEmpty()
93+
log.error("THREAD NAME IN AFTEREACH: {}", Thread.currentThread().getName());
94+
List<Execution> executionsToKill = TestQueueFactory.testExecutions.get();
95+
if (!executionsToKill.isEmpty()
9096
&& applicationContext.containsBean(ExecutionRepositoryInterface.class)
9197
&& applicationContext.containsBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED))) {
9298
ExecutionRepositoryInterface executionRepository = applicationContext.getBean(ExecutionRepositoryInterface.class);
9399
QueueInterface<ExecutionKilled> killQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED));
94100

95-
retryingExecutionKill(testExecutions, executionRepository, killQueue, 10);
101+
KestraTest kestraTest = context.getTestClass()
102+
.orElseThrow()
103+
.getAnnotation(KestraTest.class);
104+
// We only wait for KILLED state if the runner is started, otherwise we just emit the kill event and it may be processed upon starting a test with a runner
105+
List<Execution> killedExecutions = retryingExecutionKill(executionsToKill, executionRepository, killQueue, 10, kestraTest.startRunner());
96106

97-
testExecutions.clear();
107+
executionsToKill.removeIf(execution -> killedExecutions.stream().anyMatch(killedExecution ->
108+
Objects.equals(execution.getTenantId(), killedExecution.getTenantId())
109+
&& Objects.equals(execution.getId(), killedExecution.getId())
110+
));
98111
}
99112
}
100113

101114

102-
private void retryingExecutionKill(List<Execution> testExecutions, ExecutionRepositoryInterface executionRepository, QueueInterface<ExecutionKilled> killQueue, int retriesLeft) throws InterruptedException {
115+
private List<Execution> retryingExecutionKill(List<Execution> testExecutions, ExecutionRepositoryInterface executionRepository, QueueInterface<ExecutionKilled> killQueue, int retriesLeft, boolean shouldWaitForKill) throws InterruptedException {
103116
try {
104-
ListUtils.distinctByKey(
105-
testExecutions.stream().flatMap(launchedExecution -> executionRepository.findById(launchedExecution.getTenantId(), launchedExecution.getId()).stream()).toList(),
106-
Execution::getId
107-
).stream().filter(inRepository -> inRepository.getState().isRunning() || inRepository.getState().isPaused() || inRepository.getState().isQueued())
108-
.forEach(inRepository -> {
109-
log.warn("Execution {} is still running after test execution, killing it", inRepository.getId());
110-
try {
111-
killQueue.emit(ExecutionKilledExecution.builder()
112-
.tenantId(inRepository.getTenantId())
113-
.executionId(inRepository.getId())
114-
.state(ExecutionKilled.State.REQUESTED)
115-
.isOnKillCascade(true)
116-
.build()
117-
);
118-
} catch (QueueException e) {
119-
log.warn("Couldn't kill execution {} after test execution", inRepository.getId(), e);
120-
}
121-
});
117+
List<Execution> runningExecutions = ListUtils.distinctByKey(
118+
testExecutions.stream().flatMap(launchedExecution -> executionRepository.findById(launchedExecution.getTenantId(), launchedExecution.getId()).stream()).toList(),
119+
Execution::getId
120+
).stream().filter(inRepository -> !inRepository.getState().isTerminated()).toList();
121+
122+
runningExecutions.forEach(inRepository -> emitKillMessage(killQueue, inRepository));
123+
124+
if (shouldWaitForKill) {
125+
try {
126+
waitForKilled(executionRepository, runningExecutions);
127+
} catch (TimeoutException e) {
128+
log.warn("Some executions remained in KILLING", e);
129+
}
130+
}
131+
return runningExecutions;
122132
} catch (ConcurrentModificationException e) {
123133
// We intentionally don't use a CopyOnWriteArrayList to retry on concurrent modification exceptions to make sure to get rid of flakiness due to overflowing executions
124134
if (retriesLeft <= 0) {
125135
log.warn("Couldn't kill executions after test execution, due to concurrent modifications, this could impact further tests", e);
126-
return;
136+
return Collections.emptyList();
127137
}
128138
Thread.sleep(100);
129-
retryingExecutionKill(testExecutions, executionRepository, killQueue, retriesLeft - 1);
139+
return retryingExecutionKill(testExecutions, executionRepository, killQueue, retriesLeft - 1, shouldWaitForKill);
130140
}
131141
}
142+
143+
private void emitKillMessage(QueueInterface<ExecutionKilled> killQueue, Execution inRepository) {
144+
log.warn("Execution {} is still running after test execution, killing it", inRepository.getId());
145+
try {
146+
killQueue.emit(ExecutionKilledExecution.builder()
147+
.tenantId(inRepository.getTenantId())
148+
.executionId(inRepository.getId())
149+
.state(ExecutionKilled.State.REQUESTED)
150+
.isOnKillCascade(true)
151+
.build()
152+
);
153+
} catch (QueueException e) {
154+
log.warn("Couldn't kill execution {} after test execution", inRepository.getId(), e);
155+
}
156+
}
157+
158+
private void waitForKilled(ExecutionRepositoryInterface executionRepository, List<Execution> runningExecutions) throws TimeoutException {
159+
Await.until(() -> runningExecutions.stream()
160+
.map(execution -> executionRepository.findById(execution.getTenantId(), execution.getId()))
161+
.allMatch(maybeExecution -> maybeExecution.map(inRepository -> {
162+
boolean terminated = inRepository.getState().isTerminated();
163+
if (!terminated) {
164+
log.warn("Execution {} has a pending KILL request but is still in state {} ", inRepository.getId(), inRepository.getState().getCurrent());
165+
}
166+
return terminated;
167+
})
168+
.orElse(true))
169+
, Duration.ofMillis(50), Duration.ofSeconds(10));
170+
}
132171
}

tests/src/main/java/io/kestra/core/queues/TestQueueFactory.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,18 @@
22

33
import io.kestra.core.models.executions.Execution;
44
import io.micronaut.context.annotation.*;
5-
import io.micronaut.core.annotation.Introspected;
65
import jakarta.inject.Named;
76
import jakarta.inject.Singleton;
8-
import lombok.Getter;
9-
import lombok.Setter;
107

118
import java.lang.reflect.Proxy;
12-
import java.util.ArrayList;
13-
import java.util.Arrays;
14-
import java.util.List;
15-
import java.util.Optional;
9+
import java.util.*;
1610

1711
@Factory
1812
@Requires(bean = QueueFactoryInterface.class)
1913
public class TestQueueFactory {
14+
public static final InheritableThreadLocal<List<Execution>> testExecutions = new InheritableThreadLocal<>();
15+
2016
private QueueInterface<Execution> delegate;
21-
@Getter
22-
private List<Execution> testExecutions = new ArrayList<>();
2317

2418
public TestQueueFactory(QueueFactoryInterface queueFactoryInterface) {
2519
this.delegate = queueFactoryInterface.execution();
@@ -34,7 +28,11 @@ public QueueInterface<Execution> execution() {
3428
return (QueueInterface<Execution>) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{QueueInterface.class}, (proxy, method, args) -> {
3529
try {
3630
if (method.getName().contains("emit")) {
37-
Arrays.stream(args).filter(arg -> arg instanceof Execution).forEach(arg -> testExecutions.add((Execution) arg));
31+
Arrays.stream(args).filter(arg -> arg instanceof Execution).forEach(arg -> {
32+
synchronized (testExecutions.get()) {
33+
testExecutions.get().add((Execution) arg);
34+
}
35+
});
3836
}
3937
return method.invoke(this.delegate, args);
4038
} catch (Exception e) {

0 commit comments

Comments
 (0)