Skip to content

Commit 94fa411

Browse files
fix(tests): remove ThreadLocal + add distinct on TestExecution cleaner
1 parent 9148616 commit 94fa411

File tree

4 files changed

+53
-28
lines changed

4 files changed

+53
-28
lines changed

core/src/main/java/io/kestra/core/utils/ListUtils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
import java.util.ArrayList;
44
import java.util.List;
5+
import java.util.Map;
6+
import java.util.concurrent.ConcurrentHashMap;
7+
import java.util.function.Function;
8+
import java.util.function.Predicate;
59

610
public class ListUtils {
711
public static <T> List<T> emptyOnNull(List<T> list) {
@@ -71,4 +75,13 @@ public static List<String> convertToListString(Object object){
7175
throw new IllegalArgumentException("%s in not an instance of List of String".formatted(object));
7276
}
7377
}
78+
79+
private static <T> Predicate<T> distinctByKeyPredicate(Function<? super T,Object> keyExtractor) {
80+
Map<Object,Boolean> seen = new ConcurrentHashMap<>();
81+
return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
82+
}
83+
84+
public static <T> List<T> distinctByKey(List<T> withDuplicates, Function<? super T, Object> keyExtractor) {
85+
return withDuplicates.stream().filter(distinctByKeyPredicate(keyExtractor)).toList();
86+
}
7487
}

core/src/test/java/io/kestra/core/utils/ListUtilsTest.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package io.kestra.core.utils;
22

3+
import io.kestra.core.models.executions.Execution;
4+
import io.kestra.core.models.flows.State;
35
import org.junit.jupiter.api.Test;
46

7+
import java.time.Duration;
8+
import java.time.Instant;
59
import java.util.Collections;
610
import java.util.List;
711

@@ -52,4 +56,19 @@ void convertToListString(){
5256
assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString("not a list"));
5357
assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString(List.of(1, 2, 3)));
5458
}
55-
}
59+
60+
@Test
61+
void distinctByKey() {
62+
List<Execution> executions = List.of(
63+
Execution.builder().id("1").flowId("flow1").state(State.of(State.Type.CREATED, Collections.emptyList())).build(),
64+
Execution.builder().id("1").flowId("flow1").state(State.of(State.Type.RUNNING, List.of(new State.History(State.Type.CREATED, Instant.now().minus(Duration.ofSeconds(2)))))).build(),
65+
Execution.builder().id("2").flowId("flow2").build()
66+
);
67+
68+
List<Execution> distinctExecutions = ListUtils.distinctByKey(executions, Execution::getId);
69+
70+
assertThat(distinctExecutions.size()).isEqualTo(2);
71+
assertThat(distinctExecutions.stream().map(Execution::getId)).containsExactlyInAnyOrder("1", "2");
72+
assertThat(distinctExecutions.stream().filter(e -> e.getId().equals("1")).findFirst().get().getState().getCurrent()).isEqualTo(State.Type.CREATED);
73+
}
74+
}

tests/src/main/java/io/kestra/core/junit/extensions/KestraTestExtension.java

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,22 @@
1010
import io.kestra.core.queues.TestQueueFactory;
1111
import io.kestra.core.repositories.ExecutionRepositoryInterface;
1212
import io.kestra.core.runners.TestRunner;
13+
import io.kestra.core.utils.ListUtils;
1314
import io.kestra.core.utils.TestsUtils;
14-
import io.micronaut.core.annotation.NonNull;
15-
import io.micronaut.inject.BeanDefinition;
1615
import io.micronaut.inject.qualifiers.Qualifiers;
1716
import io.micronaut.test.annotation.MicronautTestValue;
1817
import io.micronaut.test.extensions.junit5.MicronautJunit5Extension;
1918
import lombok.extern.slf4j.Slf4j;
2019
import org.junit.jupiter.api.extension.ExtensionContext;
2120
import org.junit.platform.commons.support.AnnotationSupport;
2221

23-
import java.util.ArrayList;
22+
import java.util.Collections;
2423
import java.util.ConcurrentModificationException;
2524
import java.util.List;
25+
import java.util.Optional;
2626

2727
@Slf4j
2828
public class KestraTestExtension extends MicronautJunit5Extension {
29-
public static final ThreadLocal<List<Execution>> testExecutions = ThreadLocal.withInitial(ArrayList::new);
30-
3129
private static final ExtensionContext.Namespace NAMESPACE = ExtensionContext.Namespace.create(KestraTestExtension.class);
3230

3331
@Override
@@ -77,14 +75,6 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception {
7775
}
7876
}
7977

80-
@Override
81-
public void beforeTestExecution(ExtensionContext context) {
82-
if (applicationContext.containsBean(TestQueueFactory.class)) {
83-
TestQueueFactory testQueueFactory = applicationContext.getBean(TestQueueFactory.class);
84-
testQueueFactory.setTestExecutionsList(testExecutions.get());
85-
}
86-
}
87-
8878
@Override
8979
public void afterTestExecution(ExtensionContext context) throws Exception {
9080
super.afterTestExecution(context);
@@ -94,24 +84,27 @@ public void afterTestExecution(ExtensionContext context) throws Exception {
9484
KestraTest kestraTest = context.getTestClass()
9585
.orElseThrow()
9686
.getAnnotation(KestraTest.class);
97-
if (!testExecutions.get().isEmpty() &&
98-
kestraTest.startRunner()
87+
Optional<TestQueueFactory> testQueueFactory = Optional.of(applicationContext.containsBean(TestQueueFactory.class)).flatMap(contains -> contains ? Optional.of(applicationContext.getBean(TestQueueFactory.class)) : Optional.empty());
88+
List<Execution> testExecutions = testQueueFactory.map(TestQueueFactory::getTestExecutions).orElse(Collections.emptyList());
89+
if (!testExecutions.isEmpty()
9990
&& applicationContext.containsBean(ExecutionRepositoryInterface.class)
10091
&& applicationContext.containsBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED))) {
10192
ExecutionRepositoryInterface executionRepository = applicationContext.getBean(ExecutionRepositoryInterface.class);
10293
QueueInterface<ExecutionKilled> killQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED));
10394

104-
retryingExecutionKill(executionRepository, killQueue, 10);
95+
retryingExecutionKill(testExecutions, executionRepository, killQueue, 10);
10596

106-
testExecutions.get().clear();
97+
testExecutions.clear();
10798
}
10899
}
109100

110-
private void retryingExecutionKill(ExecutionRepositoryInterface executionRepository, QueueInterface<ExecutionKilled> killQueue, int retriesLeft) throws InterruptedException {
101+
102+
private void retryingExecutionKill(List<Execution> testExecutions, ExecutionRepositoryInterface executionRepository, QueueInterface<ExecutionKilled> killQueue, int retriesLeft) throws InterruptedException {
111103
try {
112-
testExecutions.get().stream()
113-
.flatMap(launchedExecution -> executionRepository.findById(launchedExecution.getTenantId(), launchedExecution.getId()).stream())
114-
.filter(inRepository -> inRepository.getState().isRunning() || inRepository.getState().isPaused() || inRepository.getState().isQueued())
104+
ListUtils.distinctByKey(
105+
testExecutions.stream().flatMap(launchedExecution -> executionRepository.findById(launchedExecution.getTenantId(), launchedExecution.getId()).stream()).toList(),
106+
Execution::getId
107+
).stream().filter(inRepository -> inRepository.getState().isRunning() || inRepository.getState().isPaused() || inRepository.getState().isQueued())
115108
.forEach(inRepository -> {
116109
log.warn("Execution {} is still running after test execution, killing it", inRepository.getId());
117110
try {
@@ -133,7 +126,7 @@ private void retryingExecutionKill(ExecutionRepositoryInterface executionReposit
133126
return;
134127
}
135128
Thread.sleep(100);
136-
retryingExecutionKill(executionRepository, killQueue, retriesLeft - 1);
129+
retryingExecutionKill(testExecutions, executionRepository, killQueue, retriesLeft - 1);
137130
}
138131
}
139132
}

tests/src/main/java/io/kestra/core/queues/TestQueueFactory.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55
import io.micronaut.core.annotation.Introspected;
66
import jakarta.inject.Named;
77
import jakarta.inject.Singleton;
8+
import lombok.Getter;
9+
import lombok.Setter;
810

911
import java.lang.reflect.Proxy;
12+
import java.util.ArrayList;
1013
import java.util.Arrays;
1114
import java.util.List;
1215
import java.util.Optional;
@@ -15,16 +18,13 @@
1518
@Requires(bean = QueueFactoryInterface.class)
1619
public class TestQueueFactory {
1720
private QueueInterface<Execution> delegate;
18-
private List<Execution> testExecutions;
21+
@Getter
22+
private List<Execution> testExecutions = new ArrayList<>();
1923

2024
public TestQueueFactory(QueueFactoryInterface queueFactoryInterface) {
2125
this.delegate = queueFactoryInterface.execution();
2226
}
2327

24-
public void setTestExecutionsList(List<Execution> testExecutions) {
25-
this.testExecutions = testExecutions;
26-
}
27-
2828
@SuppressWarnings("unchecked")
2929
@Singleton
3030
@Replaces(named = QueueFactoryInterface.EXECUTION_NAMED)

0 commit comments

Comments
 (0)