Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions scheduler/src/test/java/io/kestra/scheduler/Fixtures.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,28 @@
import java.util.function.Function;

public interface Fixtures {

String TEST_TENANT = "tenant";
String TEST_NAMESPACE = "io.kestra.unittest";
String TEST_FLOW_ID = "test";
String TEST_TRIGGER_ID = "trigger";

static TriggerId triggerId() {
return triggerId(TEST_TRIGGER_ID);
}

static TriggerId triggerId(String triggerId) {
return TriggerId.of(TEST_TENANT, TEST_NAMESPACE, TEST_FLOW_ID, triggerId);
}

static FlowWithSource defaultFlow() {
return defaultFlow(Schedule.ScheduleBuilder::build);
}

static FlowWithSource defaultFlow(Function<Schedule.ScheduleBuilder<?, ?>, Schedule> builder) {
return flowWithSchedulePT15M("Europe/Paris", builder);
}

static FlowWithSource flowWithTrigger(AbstractTrigger trigger) {
return FlowWithSource.builder()
.tenantId(TEST_TENANT)
Expand All @@ -45,34 +45,45 @@ static FlowWithSource flowWithTrigger(AbstractTrigger trigger) {
.triggers(List.of(trigger))
.build();
}


static FlowWithSource flowWithTrigger(AbstractTrigger trigger, String flowId) {
return FlowWithSource.builder()
.tenantId(TEST_TENANT)
.id(flowId)
.revision(0)
.namespace(TEST_NAMESPACE)
.tasks(List.of(Return.builder().id("return").type(Return.class.getName()).build()))
.triggers(List.of(trigger))
.build();
}

static FlowWithSource flowWithSchedulePT15M(String timeZone) {
return flowWithSchedulePT15M(timeZone, Schedule.ScheduleBuilder::build);
}

static FlowWithSource flowWithScheduleOnDate(String timeZone, List<ZonedDateTime> dates) {
return flowWithScheduleOnDate(timeZone, dates, ScheduleOnDates.ScheduleOnDatesBuilder::build);
}

static FlowWithSource flowWithScheduleOnDate(String timeZone, List<ZonedDateTime> dates, Function<ScheduleOnDates.ScheduleOnDatesBuilder<?, ?>, ScheduleOnDates> builder) {

ScheduleOnDates.ScheduleOnDatesBuilder<?, ?> schedule = ScheduleOnDates.builder()
.id(TEST_TRIGGER_ID)
.type(ScheduleOnDates.class.getName())
.dates(Property.ofValue(dates))
.timezone(timeZone);

return flowWithTrigger(builder.apply(schedule));
}

static FlowWithSource flowWithSchedulePT15M(String timeZone, Function<Schedule.ScheduleBuilder<?, ?>, Schedule> builder) {

Schedule.ScheduleBuilder<?, ?> schedule = Schedule.builder()
.id(TEST_TRIGGER_ID)
.type(Schedule.class.getName())
.cron("*/15 * * * *")
.timezone(timeZone);

return flowWithTrigger(builder.apply(schedule));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

boolean startWorker() default true;

boolean startWorkerController() default true;

Class<?> application() default void.class;

String[] environments() default {};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception {
if (!runner.isRunning()){
runner.setSchedulerEnabled(kestraTest.startScheduler());
runner.setWorkerEnabled(kestraTest.startWorker());
runner.setWorkerControllerEnabled(kestraTest.startWorkerController());
runner.run();
}
}
Expand Down
7 changes: 5 additions & 2 deletions tests/src/main/java/io/kestra/core/runners/TestRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class TestRunner implements Runnable, AutoCloseable {
@Setter private int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors()) * 4;
@Setter private boolean schedulerEnabled = true;
@Setter private boolean workerEnabled = true;
@Setter private boolean workerControllerEnabled = true;

@Inject
private ExecutorsUtils executorsUtils;
Expand All @@ -52,11 +53,13 @@ public void run() {
servers.add(executor);
poolExecutor.execute(executor);

if (workerEnabled) {
if (workerControllerEnabled) {
Controller controller = applicationContext.getBean(Controller.class);
poolExecutor.execute(controller::start);
servers.add(controller);

}

if (workerEnabled) {
Worker worker = applicationContext.getBean(Worker.class);
poolExecutor.execute(() -> worker.start(workerThread, null));
servers.add(worker);
Expand Down
Loading