Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.kestra.jdbc.runner;
package io.kestra.core.runners;

import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.FlakyTest;
Expand All @@ -9,12 +9,11 @@
import io.kestra.plugin.core.flow.RetryCaseTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.util.concurrent.TimeoutException;

@KestraTest(startRunner = true)
public abstract class JdbcRunnerRetryTest {
public abstract class AbstractRunnerRetryTest {

@Inject
private RetryCaseTest retryCaseTest;
Expand Down
71 changes: 66 additions & 5 deletions core/src/test/java/io/kestra/core/runners/AbstractRunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,24 @@
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junitpioneer.jupiter.RetryingTest;
import org.slf4j.event.Level;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@KestraTest(startRunner = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
//@org.junit.jupiter.api.parallel.Execution(org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT)
// must be per-class to allow calling once init() which took a lot of time
public abstract class AbstractRunnerTest {

public static final String NAMESPACE = "io.kestra.tests";
public static final String TENANT_1 = "tenant1";
public static final String TENANT_2 = "tenant2";
@Inject
Expand Down Expand Up @@ -90,6 +89,12 @@ public abstract class AbstractRunnerTest {
@Inject
private TaskOutputService taskOutputService;

@Inject
protected DispatchQueueInterface<Execution> executionQueue;

@Inject
protected DispatchQueueInterface<ExecutionEvent> executionEventQueue;

@Test
@ExecuteFlow("flows/valids/full.yaml")
void full(Execution execution) throws Exception {
Expand Down Expand Up @@ -598,4 +603,60 @@ protected void workerTaskResultTooLarge() throws Exception {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(execution.getTaskRunList().size()).isEqualTo(1);
}

@Test
void avoidInfiniteExecutionLoop() throws QueueException, InterruptedException {
CopyOnWriteArrayList<ExecutionEvent > executions = new CopyOnWriteArrayList<>();
executionEventQueue.addListener(e -> executions.add(e));

Execution execution = Execution.newExecution(TestsUtils.mockFlow(), Collections.emptyList());
executionQueue.emit(execution);

// We expect the initial execution message + the failed due to missing flow
await()
.during(Duration.ofMillis(500)) // Wait some time to ensure no infinite loop occurs
.atMost(Duration.ofSeconds(10))
.until(() -> executions.size() == 2);
}

@Test
@LoadFlows(value = {"flows/valids/waitfor-child-task-warning.yaml"}, tenantId = "waitforchildtaskwarning")
void waitForChildTaskWarning() throws Exception {
loopUntilTestCaseTest.waitForChildTaskWarning("waitforchildtaskwarning");
}


@Test
@LoadFlows("flows/valids/errors.yaml")
void errors() throws Exception {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
logsQueue.addListener(l -> logs.add(l));

Execution execution = runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "errors", null, null,
Duration.ofSeconds(60));

assertThat(execution.getTaskRunList()).hasSize(7);

LogEntry logEntry = TestsUtils.awaitLog(logs,
log -> log.getMessage().contains("- task: failed, message: Task failure"));
assertThat(logEntry).isNotNull();
assertThat(logEntry.getMessage()).isEqualTo("- task: failed, message: Task failure");
}

@RetryingTest(5)
@LoadFlows({"flows/valids/execution.yaml"})
void executionDate() throws Exception {
Execution execution = runnerUtils.runOne(MAIN_TENANT, NAMESPACE,
"execution-start-date", null, null, Duration.ofSeconds(60));

Map<String, Object> outputs = taskOutputService.getOutputs(execution.getTaskRunList().getFirst());
assertThat((String) outputs.get("value")).matches("^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z");
}

@RetryingTest(5)
@LoadFlows(value = {"flows/valids/for-each-item-subflow-sleep.yaml",
"flows/valids/for-each-item-no-wait.yaml"}, tenantId = "foreachitemnowait")
protected void forEachItemNoWait() throws Exception {
forEachItemCaseTest.forEachItemNoWait("foreachitemnowait");
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.kestra.runner.h2;

import io.kestra.jdbc.runner.JdbcRunnerRetryTest;
import io.kestra.core.runners.AbstractRunnerRetryTest;

public class H2RunnerRetryTest extends JdbcRunnerRetryTest {
public class H2RunnerRetryTest extends AbstractRunnerRetryTest {

}
4 changes: 2 additions & 2 deletions jdbc-h2/src/test/java/io/kestra/runner/h2/H2RunnerTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.kestra.runner.h2;

import io.kestra.jdbc.runner.JdbcRunnerTest;
import io.kestra.core.runners.AbstractRunnerTest;

public class H2RunnerTest extends JdbcRunnerTest {
public class H2RunnerTest extends AbstractRunnerTest {
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.kestra.runner.mysql;

import io.kestra.jdbc.runner.JdbcRunnerRetryTest;
import io.kestra.core.runners.AbstractRunnerRetryTest;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;

@TestInstance(Lifecycle.PER_CLASS)
public class MysqlRunnerRetryTest extends JdbcRunnerRetryTest {
public class MysqlRunnerRetryTest extends AbstractRunnerRetryTest {

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.kestra.runner.mysql;

import io.kestra.jdbc.runner.JdbcRunnerTest;
import io.kestra.core.runners.AbstractRunnerTest;

public class MysqlRunnerTest extends JdbcRunnerTest {
public class MysqlRunnerTest extends AbstractRunnerTest {

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.kestra.runner.postgres;

import io.kestra.jdbc.runner.JdbcRunnerRetryTest;
import io.kestra.core.runners.AbstractRunnerRetryTest;

public class PostgresRunnerRetryTest extends JdbcRunnerRetryTest {
public class PostgresRunnerRetryTest extends AbstractRunnerRetryTest {

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.kestra.runner.postgres;

import io.kestra.jdbc.runner.JdbcRunnerTest;
import io.kestra.core.runners.AbstractRunnerTest;

public class PostgresRunnerTest extends JdbcRunnerTest {
public class PostgresRunnerTest extends AbstractRunnerTest {

}
92 changes: 0 additions & 92 deletions jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java

This file was deleted.

Loading