Skip to content

Commit a74f4be

Browse files
feat(tests): intercept created executions through queue proxy & kill them if running after test
1 parent d167934 commit a74f4be

File tree

6 files changed

+138
-13
lines changed

6 files changed

+138
-13
lines changed

core/src/main/java/io/kestra/core/queues/QueueInterface.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.kestra.core.exceptions.DeserializationException;
44
import io.kestra.core.models.Pauseable;
55
import io.kestra.core.utils.Either;
6+
import org.apache.commons.lang3.NotImplementedException;
67

78
import java.io.Closeable;
89
import java.util.List;
@@ -54,4 +55,20 @@ default Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Eith
5455
}
5556

5657
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
58+
59+
default void deleteByKey(String key) throws QueueException {
60+
throw new NotImplementedException();
61+
}
62+
63+
default void deleteByKeys(List<String> keys) throws QueueException {
64+
throw new NotImplementedException();
65+
}
66+
67+
default void emitOnly(String consumerGroup, T message) throws QueueException {
68+
throw new NotImplementedException();
69+
}
70+
71+
default Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
72+
throw new NotImplementedException();
73+
}
5774
}

jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ public void run() {
293293

294294
Await.until(() -> this.allFlows != null, Duration.ofMillis(100), Duration.ofMinutes(5));
295295

296-
this.receiveCancellations.addFirst(((JdbcQueue<Execution>) this.executionQueue).receiveBatch(
296+
this.receiveCancellations.addFirst(this.executionQueue.receiveBatch(
297297
Executor.class,
298298
executions -> {
299299
List<CompletableFuture<Void>> futures = executions.stream()
@@ -302,7 +302,7 @@ public void run() {
302302
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
303303
}
304304
));
305-
this.receiveCancellations.addFirst(((JdbcQueue<WorkerTaskResult>) this.workerTaskResultQueue).receiveBatch(
305+
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receiveBatch(
306306
Executor.class,
307307
workerTaskResults -> {
308308
List<CompletableFuture<Void>> futures = workerTaskResults.stream()
@@ -1121,14 +1121,14 @@ private void toExecution(Executor executor, boolean ignoreFailure) {
11211121
// IMPORTANT: this must be done before emitting the last execution message so that all consumers are notified that the execution ends.
11221122
// NOTE: we may also purge ExecutionKilled events, but as there may not be a lot of them, it may not be worth it.
11231123
if (cleanExecutionQueue && isTerminated) {
1124-
((JdbcQueue<Execution>) executionQueue).deleteByKey(executor.getExecution().getId());
1124+
executionQueue.deleteByKey(executor.getExecution().getId());
11251125
}
11261126

11271127
// emit for other consumers than the executor if no failure
11281128
if (hasFailure) {
11291129
this.executionQueue.emit(executor.getExecution());
11301130
} else {
1131-
((JdbcQueue<Execution>) this.executionQueue).emitOnly(null, executor.getExecution());
1131+
this.executionQueue.emitOnly(null, executor.getExecution());
11321132
}
11331133

11341134
Execution execution = executor.getExecution();
@@ -1206,8 +1206,8 @@ private void toExecution(Executor executor, boolean ignoreFailure) {
12061206
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
12071207
.map(taskRun -> taskRun.getId())
12081208
.toList();
1209-
((JdbcQueue<WorkerTaskResult>) workerTaskResultQueue).deleteByKeys(taskRunKeys);
1210-
((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
1209+
workerTaskResultQueue.deleteByKeys(taskRunKeys);
1210+
workerJobQueue.deleteByKeys(taskRunKeys);
12111211
}
12121212
}
12131213
} catch (QueueException e) {

jdbc/src/main/java/io/kestra/jdbc/runner/JdbcIndexer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@
3939
@JdbcRunnerEnabled
4040
public class JdbcIndexer implements Indexer {
4141
private final LogRepositoryInterface logRepository;
42-
private final JdbcQueue<LogEntry> logQueue;
42+
private final QueueInterface<LogEntry> logQueue;
4343

4444
private final MetricRepositoryInterface metricRepository;
45-
private final JdbcQueue<MetricEntry> metricQueue;
45+
private final QueueInterface<MetricEntry> metricQueue;
4646
private final MetricRegistry metricRegistry;
4747
private final List<Runnable> receiveCancellations = new ArrayList<>();
4848

@@ -67,9 +67,9 @@ public JdbcIndexer(
6767
QueueService queueService
6868
) {
6969
this.logRepository = logRepository;
70-
this.logQueue = (JdbcQueue<LogEntry>) logQueue;
70+
this.logQueue = logQueue;
7171
this.metricRepository = metricRepositor;
72-
this.metricQueue = (JdbcQueue<MetricEntry>) metricQueue;
72+
this.metricQueue = metricQueue;
7373
this.metricRegistry = metricRegistry;
7474
this.eventPublisher = eventPublisher;
7575
this.skipExecutionService = skipExecutionService;
@@ -91,7 +91,7 @@ protected void startQueues() {
9191
this.sendBatch(metricQueue, metricRepository);
9292
}
9393

94-
protected <T> void sendBatch(JdbcQueue<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
94+
protected <T> void sendBatch(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
9595
this.receiveCancellations.addFirst(queueInterface.receiveBatch(Indexer.class, eithers -> {
9696
// first, log all deserialization issues
9797
eithers.stream().filter(either -> either.isRight()).forEach(either -> log.error("unable to deserialize an item: {}", either.getRight().getMessage()));

tests/src/main/java/io/kestra/core/junit/extensions/KestraTestExtension.java

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
package io.kestra.core.junit.extensions;
22

33
import io.kestra.core.junit.annotations.KestraTest;
4+
import io.kestra.core.models.executions.ExecutionKilled;
5+
import io.kestra.core.models.executions.ExecutionKilledExecution;
6+
import io.kestra.core.queues.QueueException;
7+
import io.kestra.core.queues.QueueFactoryInterface;
8+
import io.kestra.core.queues.QueueInterface;
9+
import io.kestra.core.repositories.ExecutionRepositoryInterface;
410
import io.kestra.core.runners.TestRunner;
511
import io.kestra.core.utils.TestsUtils;
12+
import io.micronaut.inject.qualifiers.Qualifiers;
613
import io.micronaut.test.annotation.MicronautTestValue;
714
import io.micronaut.test.extensions.junit5.MicronautJunit5Extension;
15+
import lombok.extern.slf4j.Slf4j;
816
import org.junit.jupiter.api.extension.ExtensionContext;
917
import org.junit.platform.commons.support.AnnotationSupport;
1018

19+
import java.util.ConcurrentModificationException;
20+
21+
@Slf4j
1122
public class KestraTestExtension extends MicronautJunit5Extension {
1223
private static final ExtensionContext.Namespace NAMESPACE = ExtensionContext.Namespace.create(KestraTestExtension.class);
1324

@@ -47,9 +58,9 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception {
4758
KestraTest kestraTest = extensionContext.getTestClass()
4859
.orElseThrow()
4960
.getAnnotation(KestraTest.class);
50-
if (kestraTest.startRunner()){
61+
if (kestraTest.startRunner()) {
5162
TestRunner runner = applicationContext.getBean(TestRunner.class);
52-
if (!runner.isRunning()){
63+
if (!runner.isRunning()) {
5364
runner.setSchedulerEnabled(kestraTest.startScheduler());
5465
runner.setWorkerEnabled(kestraTest.startWorker());
5566
runner.run();
@@ -62,5 +73,49 @@ public void afterTestExecution(ExtensionContext context) throws Exception {
6273
super.afterTestExecution(context);
6374

6475
TestsUtils.queueConsumersCleanup();
76+
77+
KestraTest kestraTest = context.getTestClass()
78+
.orElseThrow()
79+
.getAnnotation(KestraTest.class);
80+
if (kestraTest.startRunner()
81+
&& applicationContext.containsBean(ExecutionRepositoryInterface.class)
82+
&& applicationContext.containsBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED))) {
83+
ExecutionRepositoryInterface executionRepository = applicationContext.getBean(ExecutionRepositoryInterface.class);
84+
QueueInterface<ExecutionKilled> killQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED));
85+
86+
retryingExecutionKill(executionRepository, killQueue, 10);
87+
88+
TestRunner.testExecutions.get().clear();
89+
}
90+
}
91+
92+
private void retryingExecutionKill(ExecutionRepositoryInterface executionRepository, QueueInterface<ExecutionKilled> killQueue, int retriesLeft) throws InterruptedException {
93+
try {
94+
TestRunner.testExecutions.get().stream()
95+
.flatMap(launchedExecution -> executionRepository.findById(launchedExecution.getTenantId(), launchedExecution.getId()).stream())
96+
.filter(inRepository -> inRepository.getState().isRunning())
97+
.forEach(inRepository -> {
98+
log.warn("Execution {} is still running after test execution, killing it", inRepository.getId());
99+
try {
100+
killQueue.emit(ExecutionKilledExecution.builder()
101+
.tenantId(inRepository.getTenantId())
102+
.executionId(inRepository.getId())
103+
.state(ExecutionKilled.State.REQUESTED)
104+
.isOnKillCascade(true)
105+
.build()
106+
);
107+
} catch (QueueException e) {
108+
log.warn("Couldn't kill execution {} after test execution", inRepository.getId(), e);
109+
}
110+
});
111+
} catch (ConcurrentModificationException e) {
112+
// We intentionally don't use a CopyOnWriteArrayList to retry on concurrent modification exceptions to make sure to get rid of flakiness due to overflowing executions
113+
if (retriesLeft <= 0) {
114+
log.warn("Couldn't kill executions after test execution, due to concurrent modifications, this could impact further tests", e);
115+
return;
116+
}
117+
Thread.sleep(100);
118+
retryingExecutionKill(executionRepository, killQueue, retriesLeft - 1);
119+
}
65120
}
66121
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package io.kestra.core.queues;
2+
3+
import io.kestra.core.models.executions.Execution;
4+
import io.micronaut.context.annotation.Bean;
5+
import io.micronaut.context.annotation.Factory;
6+
import io.micronaut.context.annotation.Replaces;
7+
import jakarta.inject.Named;
8+
import jakarta.inject.Singleton;
9+
10+
import java.lang.reflect.Proxy;
11+
import java.util.Arrays;
12+
import java.util.List;
13+
import java.util.Optional;
14+
15+
@Factory
16+
public class TestQueueFactory {
17+
private QueueInterface<Execution> delegate;
18+
private List<Execution> testExecutions;
19+
20+
public TestQueueFactory(QueueFactoryInterface queueFactoryInterface) {
21+
this.delegate = queueFactoryInterface.execution();
22+
}
23+
24+
public void setTestExecutionsList(List<Execution> testExecutions) {
25+
this.testExecutions = testExecutions;
26+
}
27+
28+
@Singleton
29+
@Replaces(named = QueueFactoryInterface.EXECUTION_NAMED)
30+
@Named(QueueFactoryInterface.EXECUTION_NAMED)
31+
@Bean(preDestroy = "close")
32+
public QueueInterface<Execution> execution() {
33+
return (QueueInterface<Execution>) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{QueueInterface.class}, (proxy, method, args) -> {
34+
try {
35+
if (method.getName().contains("emit")) {
36+
Arrays.stream(args).filter(arg -> arg instanceof Execution).forEach(arg -> testExecutions.add((Execution) arg));
37+
}
38+
return method.invoke(this.delegate, args);
39+
} catch (Exception e) {
40+
throw Optional.ofNullable(e.getCause()).orElse(e);
41+
}
42+
});
43+
}
44+
}

tests/src/main/java/io/kestra/core/runners/TestRunner.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package io.kestra.core.runners;
22

3+
import io.kestra.core.models.executions.Execution;
4+
import io.kestra.core.queues.TestQueueFactory;
5+
import io.kestra.core.queues.QueueFactoryInterface;
36
import io.kestra.core.server.Service;
47
import io.kestra.core.utils.Await;
58
import io.kestra.core.utils.ExecutorsUtils;
69
import io.kestra.worker.DefaultWorker;
710
import io.micronaut.context.ApplicationContext;
811
import io.micronaut.context.annotation.Value;
12+
import io.micronaut.inject.qualifiers.Qualifiers;
913
import jakarta.annotation.PreDestroy;
1014
import jakarta.inject.Inject;
1115
import jakarta.inject.Singleton;
@@ -25,6 +29,8 @@
2529
@Slf4j
2630
@Singleton
2731
public class TestRunner implements Runnable, AutoCloseable {
32+
public static final ThreadLocal<List<Execution>> testExecutions = ThreadLocal.withInitial(ArrayList::new);
33+
2834
@Setter private int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors()) * 16;
2935
@Setter private boolean schedulerEnabled = true;
3036
@Setter private boolean workerEnabled = true;
@@ -46,6 +52,9 @@ public class TestRunner implements Runnable, AutoCloseable {
4652

4753
@Override
4854
public void run() {
55+
TestQueueFactory executionQueue = applicationContext.getBean(TestQueueFactory.class);
56+
executionQueue.setTestExecutionsList(testExecutions.get());
57+
4958
running.set(true);
5059

5160
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");

0 commit comments

Comments
 (0)