Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions core/src/main/java/io/kestra/core/queues/QueueInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.models.Pauseable;
import io.kestra.core.utils.Either;
import org.apache.commons.lang3.NotImplementedException;

import java.io.Closeable;
import java.util.List;
Expand Down Expand Up @@ -68,4 +69,20 @@ default Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Eith
}

Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);

/**
 * Deletes all queue messages stored under the given key.
 *
 * @param key the queue key (for example an execution id) whose messages must be removed
 * @throws QueueException if the deletion fails at the queue level
 * @throws NotImplementedException if the underlying queue implementation does not support deletion
 */
default void deleteByKey(String key) throws QueueException {
    // Include the concrete class in the message so unsupported calls are diagnosable from logs.
    throw new NotImplementedException("deleteByKey is not implemented by " + getClass().getName());
}

/**
 * Deletes all queue messages stored under any of the given keys.
 *
 * @param keys the queue keys (for example task run ids) whose messages must be removed
 * @throws QueueException if the deletion fails at the queue level
 * @throws NotImplementedException if the underlying queue implementation does not support deletion
 */
default void deleteByKeys(List<String> keys) throws QueueException {
    // Include the concrete class in the message so unsupported calls are diagnosable from logs.
    throw new NotImplementedException("deleteByKeys is not implemented by " + getClass().getName());
}

/**
 * Emits a message for the given consumer group without any additional processing.
 *
 * @param consumerGroup the consumer group the message is emitted for (may be null for the default group — TODO confirm with implementations)
 * @param message the message to emit
 * @throws QueueException if the emission fails at the queue level
 * @throws NotImplementedException if the underlying queue implementation does not support this operation
 */
default void emitOnly(String consumerGroup, T message) throws QueueException {
    // Include the concrete class in the message so unsupported calls are diagnosable from logs.
    throw new NotImplementedException("emitOnly is not implemented by " + getClass().getName());
}

/**
 * Receives messages in batches, passing each batch of deserialization results to the consumer.
 *
 * @param queueType the class used to discriminate the queue to consume from
 * @param consumer called with each received batch; each element is either a message or a deserialization error
 * @return a {@link Runnable} that cancels the subscription when run
 * @throws NotImplementedException if the underlying queue implementation does not support batch consumption
 */
default Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
    // Include the concrete class in the message so unsupported calls are diagnosable from logs.
    throw new NotImplementedException("receiveBatch is not implemented by " + getClass().getName());
}
}
5 changes: 3 additions & 2 deletions core/src/main/java/io/kestra/core/runners/FlowListeners.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand All @@ -30,8 +31,8 @@ public class FlowListeners implements FlowListenersInterface {
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final QueueInterface<FlowInterface> flowQueue;
private final List<FlowWithSource> flows;
private final List<Consumer<List<FlowWithSource>>> consumers = new ArrayList<>();
private final List<BiConsumer<FlowWithSource, FlowWithSource>> consumersEach = new ArrayList<>();
private final List<Consumer<List<FlowWithSource>>> consumers = new CopyOnWriteArrayList<>();
private final List<BiConsumer<FlowWithSource, FlowWithSource>> consumersEach = new CopyOnWriteArrayList<>();

private final PluginDefaultService pluginDefaultService;

Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/io/kestra/core/utils/ListUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

public class ListUtils {
public static <T> List<T> emptyOnNull(List<T> list) {
Expand Down Expand Up @@ -78,4 +82,13 @@ public static <T> List<List<T>> partition(List<T> list, int size) {
}
return parts;
}

/**
 * Builds a stateful predicate that accepts an element only the first time its
 * extracted key is seen. The predicate instance tracks seen keys internally,
 * so a fresh predicate must be created per stream pipeline.
 */
private static <T> Predicate<T> distinctByKeyPredicate(Function<? super T, Object> keyExtractor) {
    // Concurrent set of keys already encountered; add() returns true only on first insertion.
    var seen = ConcurrentHashMap.<Object>newKeySet();
    return element -> seen.add(keyExtractor.apply(element));
}

/**
 * Returns the elements of the given list deduplicated by the provided key,
 * keeping the first occurrence of each key and preserving encounter order.
 *
 * @param withDuplicates the list possibly containing key duplicates
 * @param keyExtractor extracts the deduplication key from an element
 * @return an unmodifiable list with one element per distinct key
 */
public static <T> List<T> distinctByKey(List<T> withDuplicates, Function<? super T, Object> keyExtractor) {
    Predicate<T> firstOccurrence = distinctByKeyPredicate(keyExtractor);
    return withDuplicates.stream()
        .filter(firstOccurrence)
        .toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@
import java.util.Map;

import jakarta.inject.Inject;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;

import static org.assertj.core.api.Assertions.assertThat;

@KestraTest
@TestInstance(Lifecycle.PER_CLASS)
class DateFilterTest {
public static final ZonedDateTime NOW = ZonedDateTime.parse("2013-09-08T16:19:12.123456+01");

Expand Down
21 changes: 20 additions & 1 deletion core/src/test/java/io/kestra/core/utils/ListUtilsTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package io.kestra.core.utils;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -52,4 +56,19 @@ void convertToListString(){

assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString("not a list"));
}
}

@Test
void distinctByKey() {
    // Two executions share id "1"; deduplication must keep the first occurrence (CREATED state).
    Execution first = Execution.builder().id("1").flowId("flow1").state(State.of(State.Type.CREATED, Collections.emptyList())).build();
    Execution duplicate = Execution.builder().id("1").flowId("flow1").state(State.of(State.Type.RUNNING, List.of(new State.History(State.Type.CREATED, Instant.now().minus(Duration.ofSeconds(2)))))).build();
    Execution other = Execution.builder().id("2").flowId("flow2").build();

    List<Execution> deduplicated = ListUtils.distinctByKey(List.of(first, duplicate, other), Execution::getId);

    assertThat(deduplicated).hasSize(2);
    assertThat(deduplicated.stream().map(Execution::getId)).containsExactlyInAnyOrder("1", "2");
    // The retained "1" must be the first one seen, i.e. still in CREATED state.
    Execution retained = deduplicated.stream().filter(e -> e.getId().equals("1")).findFirst().orElseThrow();
    assertThat(retained.getState().getCurrent()).isEqualTo(State.Type.CREATED);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package io.kestra.runner.mysql;

import io.kestra.jdbc.runner.JdbcRunnerRetryTest;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;

@TestInstance(Lifecycle.PER_CLASS)
// Runs the shared JDBC runner retry test suite against the MySQL backend.
// No MySQL-specific overrides: every test case is inherited from JdbcRunnerRetryTest.
public class MysqlRunnerRetryTest extends JdbcRunnerRetryTest {

}
10 changes: 5 additions & 5 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public void run() {

Await.until(() -> this.allFlows != null, Duration.ofMillis(100), Duration.ofMinutes(5));

this.receiveCancellations.addFirst(((JdbcQueue<Execution>) this.executionQueue).receiveBatch(
this.receiveCancellations.addFirst(this.executionQueue.receiveBatch(
Executor.class,
executions -> {
// process execution message grouped by executionId to avoid concurrency as the execution level as it would
Expand All @@ -318,7 +318,7 @@ public void run() {
CompletableFuture.allOf(perExecutionFutures.toArray(CompletableFuture[]::new)).join();
}
));
this.receiveCancellations.addFirst(((JdbcQueue<WorkerTaskResult>) this.workerTaskResultQueue).receiveBatch(
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receiveBatch(
Executor.class,
workerTaskResults -> {
List<CompletableFuture<Void>> futures = workerTaskResults.stream()
Expand Down Expand Up @@ -1181,7 +1181,7 @@ private void toExecution(Executor executor, boolean ignoreFailure) {
// IMPORTANT: this must be done before emitting the last execution message so that all consumers are notified that the execution ends.
// NOTE: we may also purge ExecutionKilled events, but as there may not be a lot of them, it may not be worth it.
if (cleanExecutionQueue && isTerminated) {
((JdbcQueue<Execution>) executionQueue).deleteByKey(executor.getExecution().getId());
executionQueue.deleteByKey(executor.getExecution().getId());
}

// emit for other consumers than the executor if no failure
Expand Down Expand Up @@ -1270,8 +1270,8 @@ private void toExecution(Executor executor, boolean ignoreFailure) {
List<String> taskRunKeys = executor.getExecution().getTaskRunList().stream()
.map(taskRun -> taskRun.getId())
.toList();
((JdbcQueue<WorkerTaskResult>) workerTaskResultQueue).deleteByKeys(taskRunKeys);
((JdbcQueue<WorkerJob>) workerJobQueue).deleteByKeys(taskRunKeys);
workerTaskResultQueue.deleteByKeys(taskRunKeys);
workerJobQueue.deleteByKeys(taskRunKeys);
}
}
} catch (QueueException | FlowNotFoundException e) {
Expand Down
10 changes: 5 additions & 5 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcIndexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
@JdbcRunnerEnabled
public class JdbcIndexer implements Indexer {
private final LogRepositoryInterface logRepository;
private final JdbcQueue<LogEntry> logQueue;
private final QueueInterface<LogEntry> logQueue;

private final MetricRepositoryInterface metricRepository;
private final JdbcQueue<MetricEntry> metricQueue;
private final QueueInterface<MetricEntry> metricQueue;
private final MetricRegistry metricRegistry;
private final List<Runnable> receiveCancellations = new ArrayList<>();

Expand All @@ -67,9 +67,9 @@ public JdbcIndexer(
QueueService queueService
) {
this.logRepository = logRepository;
this.logQueue = (JdbcQueue<LogEntry>) logQueue;
this.logQueue = logQueue;
this.metricRepository = metricRepositor;
this.metricQueue = (JdbcQueue<MetricEntry>) metricQueue;
this.metricQueue = metricQueue;
this.metricRegistry = metricRegistry;
this.eventPublisher = eventPublisher;
this.ignoreExecutionService = ignoreExecutionService;
Expand All @@ -91,7 +91,7 @@ protected void startQueues() {
this.sendBatch(metricQueue, metricRepository);
}

protected <T> void sendBatch(JdbcQueue<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
protected <T> void sendBatch(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
this.receiveCancellations.addFirst(queueInterface.receiveBatch(Indexer.class, eithers -> {
// first, log all deserialization issues
eithers.stream().filter(either -> either.isRight()).forEach(either -> log.error("unable to deserialize an item: {}", either.getRight().getMessage()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.util.Map;
import java.util.Optional;

@KestraTest(rebuildContext = true)
@KestraTest
abstract public class AbstractSchedulerTest {
public final static String TENANT_ID = "main";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.*;

@TestInstance(Lifecycle.PER_CLASS)
public class SchedulerScheduleTest extends AbstractSchedulerTest {
@Inject
protected FlowListeners flowListenersService;
Expand Down Expand Up @@ -741,4 +740,4 @@ void recoverNONELongRunningExecution() throws Exception {
Await.until(() -> this.triggerState.findLast(trigger).map(t -> t.getNextExecutionDate().isAfter(lastTrigger.getNextExecutionDate().plusSeconds(10))).orElse(false).booleanValue(), Duration.ofMillis(100), Duration.ofSeconds(20));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
package io.kestra.core.junit.extensions;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledExecution;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.TestQueueFactory;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.TestRunner;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.test.annotation.MicronautTestValue;
import io.micronaut.test.context.TestContext;
import io.micronaut.test.extensions.junit5.MicronautJunit5Extension;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.platform.commons.support.AnnotationSupport;

import java.util.Set;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeoutException;

@Slf4j
public class KestraTestExtension extends MicronautJunit5Extension {
@Override
protected MicronautTestValue buildMicronautTestValue(Class<?> testClass) {
Expand Down Expand Up @@ -47,14 +63,18 @@ protected boolean hasExpectedAnnotations(Class<?> testClass) {
}

@Override
public void beforeAll(ExtensionContext extensionContext) throws Exception {
super.beforeAll(extensionContext);
KestraTest kestraTest = extensionContext.getTestClass()
public void beforeTestExecution(ExtensionContext context) throws Exception {
super.beforeTestExecution(context);

TestQueueFactory.testExecutions.set(new ArrayList<>());

KestraTest kestraTest = context.getTestClass()
.orElseThrow()
.getAnnotation(KestraTest.class);
if (kestraTest.startRunner()){

if (kestraTest.startRunner()) {
TestRunner runner = applicationContext.getBean(TestRunner.class);
if (!runner.isRunning()){
if (!runner.isRunning()) {
runner.setSchedulerEnabled(kestraTest.startScheduler());
runner.setWorkerEnabled(kestraTest.startWorker());
runner.run();
Expand All @@ -67,5 +87,82 @@ public void afterTestExecution(ExtensionContext context) throws Exception {
super.afterTestExecution(context);

TestsUtils.queueConsumersCleanup();

List<Execution> executionsToKill = TestQueueFactory.testExecutions.get();
if (!executionsToKill.isEmpty()
&& applicationContext.containsBean(ExecutionRepositoryInterface.class)
&& applicationContext.containsBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED))) {
ExecutionRepositoryInterface executionRepository = applicationContext.getBean(ExecutionRepositoryInterface.class);
QueueInterface<ExecutionKilled> killQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED));

KestraTest kestraTest = context.getTestClass()
.orElseThrow()
.getAnnotation(KestraTest.class);
// We only wait for KILLED state if the runner is started, otherwise we just emit the kill event and it may be processed upon starting a test with a runner
List<Execution> killedExecutions = retryingExecutionKill(executionsToKill, executionRepository, killQueue, 10, kestraTest.startRunner());

executionsToKill.removeIf(execution -> killedExecutions.stream().anyMatch(killedExecution ->
Objects.equals(execution.getTenantId(), killedExecution.getTenantId())
&& Objects.equals(execution.getId(), killedExecution.getId())
));
}
}


/**
 * Kills every execution from {@code testExecutions} that is still running in the repository,
 * retrying the whole pass when the (intentionally non-concurrent) list is modified concurrently.
 *
 * @param testExecutions executions launched during the test; may be mutated by other threads while iterated
 * @param executionRepository repository used to read the current execution state
 * @param killQueue queue on which kill requests are emitted
 * @param retriesLeft remaining retries on ConcurrentModificationException; gives up with a warning at 0
 * @param shouldWaitForKill when true, blocks until the killed executions reach a terminated state
 * @return the executions that were still running and for which a kill was emitted
 *         (empty when retries are exhausted)
 * @throws InterruptedException if the backoff sleep between retries is interrupted
 */
private List<Execution> retryingExecutionKill(List<Execution> testExecutions, ExecutionRepositoryInterface executionRepository, QueueInterface<ExecutionKilled> killQueue, int retriesLeft, boolean shouldWaitForKill) throws InterruptedException {
    try {
        // Re-read each execution from the repository, dedupe by id (the same execution may have
        // been recorded more than once), and keep only those not yet in a terminal state.
        List<Execution> runningExecutions = ListUtils.distinctByKey(
            testExecutions.stream().flatMap(launchedExecution -> executionRepository.findById(launchedExecution.getTenantId(), launchedExecution.getId()).stream()).toList(),
            Execution::getId
        ).stream().filter(inRepository -> !inRepository.getState().isTerminated()).toList();

        runningExecutions.forEach(inRepository -> emitKillMessage(killQueue, inRepository));

        if (shouldWaitForKill) {
            try {
                waitForKilled(executionRepository, runningExecutions);
            } catch (TimeoutException e) {
                // Best effort: log and carry on so one stuck execution doesn't fail teardown.
                log.warn("Some executions remained in KILLING", e);
            }
        }
        return runningExecutions;
    } catch (ConcurrentModificationException e) {
        // We intentionally don't use a CopyOnWriteArrayList to retry on concurrent modification exceptions to make sure to get rid of flakiness due to overflowing executions
        if (retriesLeft <= 0) {
            log.warn("Couldn't kill executions after test execution, due to concurrent modifications, this could impact further tests", e);
            return Collections.emptyList();
        }
        // Brief backoff before re-running the whole pass recursively with one fewer retry.
        Thread.sleep(100);
        return retryingExecutionKill(testExecutions, executionRepository, killQueue, retriesLeft - 1, shouldWaitForKill);
    }
}

/**
 * Emits a cascading kill request for an execution that is still running after a test,
 * logging (rather than propagating) any queue failure.
 */
private void emitKillMessage(QueueInterface<ExecutionKilled> killQueue, Execution inRepository) {
    log.warn("Execution {} is still running after test execution, killing it", inRepository.getId());

    ExecutionKilledExecution killRequest = ExecutionKilledExecution.builder()
        .tenantId(inRepository.getTenantId())
        .executionId(inRepository.getId())
        .state(ExecutionKilled.State.REQUESTED)
        .isOnKillCascade(true)
        .build();

    try {
        killQueue.emit(killRequest);
    } catch (QueueException e) {
        // Best effort during teardown: a failed kill is logged, not rethrown.
        log.warn("Couldn't kill execution {} after test execution", inRepository.getId(), e);
    }
}

/**
 * Blocks until every given execution has reached a terminated state in the repository
 * (or has disappeared from it), polling every 50ms for up to 10 seconds.
 *
 * @throws TimeoutException if some executions are still not terminated after the timeout
 */
private void waitForKilled(ExecutionRepositoryInterface executionRepository, List<Execution> runningExecutions) throws TimeoutException {
    Await.until(
        () -> runningExecutions.stream().allMatch(launched -> {
            Optional<Execution> current = executionRepository.findById(launched.getTenantId(), launched.getId());
            if (current.isEmpty()) {
                // No longer in the repository: nothing left to wait for.
                return true;
            }
            Execution inRepository = current.get();
            if (!inRepository.getState().isTerminated()) {
                log.warn("Execution {} has a pending KILL request but is still in state {} ", inRepository.getId(), inRepository.getState().getCurrent());
                return false;
            }
            return true;
        }),
        Duration.ofMillis(50),
        Duration.ofSeconds(10)
    );
}
}
Loading
Loading