Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,20 @@ if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_17)) {
[name: '9.9.0', flyway: '9.9.0', 'flyway-test': '9.5.0', spring: '6.0.14', 'spring-boot': '3.0.13', 'zonky-postgres': 'default'],
[name: '9.16.3', flyway: '9.16.3', 'flyway-test': '9.5.0', spring: '6.0.21', 'spring-boot': '3.1.12', 'zonky-postgres': 'default'],
[name: '9.22.3', flyway: '9.22.3', 'flyway-test': '9.5.0', spring: '6.1.15', 'spring-boot': '3.2.12', 'zonky-postgres': 'default'],
[name: '10.10.0', flyway: '10.10.0', 'flyway-test': '10.0.0', spring: '6.1.15', 'spring-boot': '3.3.6', 'zonky-postgres': 'default'],
[name: '10.20.1', flyway: '10.20.1', 'flyway-test': '10.0.0', spring: '6.2.0', 'spring-boot': '3.4.0', 'zonky-postgres': 'default'],
[name: '11.0.0', flyway: '11.0.0', 'flyway-test': '10.0.0', spring: '6.2.0', 'spring-boot': '3.4.0', 'zonky-postgres': 'default']
[name: '10.10.0', flyway: '10.10.0', 'flyway-test': '10.0.0', spring: '6.1.15', 'spring-boot': '3.3.6', 'zonky-postgres': 'default'],
[name: '10.20.1', flyway: '10.20.1', 'flyway-test': '10.0.0', spring: '6.2.12', 'spring-boot': '3.4.11', 'zonky-postgres': 'default'],
[name: '11.7.2', flyway: '11.7.2', 'flyway-test': '10.0.0', spring: '6.2.12', 'spring-boot': '3.5.7', 'zonky-postgres': 'default'],
[name: '11.14.1', flyway: '11.14.1', 'flyway-test': '10.0.0', spring: '7.0.0-RC3', 'spring-boot': '4.0.0-RC2', 'zonky-postgres': 'default']
]

testSuites.find { it.name == 'liquibase' }.versions += [
[name: '4.17.2', liquibase: '4.17.2', spring: '6.0.14', 'spring-boot': '3.0.13'],
[name: '4.20.0', liquibase: '4.20.0', spring: '6.0.21', 'spring-boot': '3.1.12'],
[name: '4.24.0', liquibase: '4.24.0', spring: '6.1.15', 'spring-boot': '3.2.12'],
[name: '4.27.0', liquibase: '4.27.0', spring: '6.1.15', 'spring-boot': '3.3.6'],
[name: '4.29.2', liquibase: '4.29.2', spring: '6.2.0', 'spring-boot': '3.4.0']
[name: '4.29.2', liquibase: '4.29.2', spring: '6.2.12', 'spring-boot': '3.4.11'],
[name: '4.31.1', liquibase: '4.31.1', spring: '6.2.12', 'spring-boot': '3.5.7'],
[name: '5.0.1', liquibase: '5.0.1', spring: '7.0.0-RC3', 'spring-boot': '4.0.0-RC2']
]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.test.context.transaction.TestTransaction;
import org.springframework.util.concurrent.SettableListenableFuture;

import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -283,8 +283,8 @@ private EmbeddedDatabase awaitDatabase() {
}

private Future<EmbeddedDatabase> databaseFuture(Object database) {
SettableListenableFuture<EmbeddedDatabase> future = new SettableListenableFuture<>();
future.set((EmbeddedDatabase) database);
CompletableFuture<EmbeddedDatabase> future = new CompletableFuture<>();
future.complete((EmbeddedDatabase) database);
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@
import io.zonky.test.db.preparer.DatabasePreparer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

import javax.sql.DataSource;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public abstract class FlywayDatabasePreparer implements DatabasePreparer {

protected final Logger logger = LoggerFactory.getLogger(getClass());

protected final SettableListenableFuture<Object> result = new SettableListenableFuture<>();
protected final CompletableFuture<Object> result = new CompletableFuture<>();
protected final FlywayDescriptor descriptor;

public FlywayDatabasePreparer(FlywayDescriptor descriptor) {
Expand All @@ -44,7 +43,7 @@ public FlywayDescriptor getDescriptor() {
return descriptor;
}

public ListenableFuture<Object> getResult() {
public CompletableFuture<Object> getResult() {
return result;
}

Expand All @@ -59,10 +58,10 @@ public void prepare(DataSource dataSource) {
wrapper.setDataSource(dataSource);

try {
result.set(doOperation(wrapper));
result.complete(doOperation(wrapper));
logger.trace("Database has been successfully prepared in {}", stopwatch);
} catch (RuntimeException e) {
result.setException(e);
result.completeExceptionally(e);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.ListenableFutureTask;

import java.util.Comparator;
import java.util.List;
Expand All @@ -43,6 +41,8 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -167,15 +167,15 @@ protected PrefetchingTask prepareNewDatabase(PipelineKey key, int priority) {
databaseCount.decrementAndGet();

if (databaseToRemove.getKey().equals(key)) {
return executeTask(key, PrefetchingTask.withDatabase(databaseToRemove.getValue(), priority));
return executeTask(PrefetchingTask.withDatabase(key, databaseToRemove.getValue(), priority));
} else {
databaseToRemove.getValue().close();
DatabasePipeline pipeline = pipelines.get(databaseToRemove.getKey());
logger.trace("Prepared database has been cleaned: {}", pipeline.key);
}
}

return executeTask(key, PrefetchingTask.forPreparer(key.provider, key.preparer, priority));
return executeTask(PrefetchingTask.forPreparer(key, key.provider, key.preparer, priority));
}

protected Optional<PrefetchingTask> prepareExistingDatabase(PipelineKey key, int priority) {
Expand All @@ -197,7 +197,7 @@ protected Optional<PrefetchingTask> prepareExistingDatabase(PipelineKey key, int
if (result != null) {
CompositeDatabasePreparer complementaryPreparer = new CompositeDatabasePreparer(preparers.subList(i, preparers.size()));
logger.trace("Preparing existing database from {} pipeline by using the complementary preparer {}", existingPipeline.key, complementaryPreparer);
PrefetchingTask task = executeTask(key, PrefetchingTask.withDatabase(result.get(), complementaryPreparer, priority));
PrefetchingTask task = executeTask(PrefetchingTask.withDatabase(key, result.get(), complementaryPreparer, priority));

prepareDatabase(pipelineKey, LOWEST_PRECEDENCE);
reschedulePipeline(pipelineKey);
Expand All @@ -223,33 +223,13 @@ protected void reschedulePipeline(PipelineKey key) {

for (int i = 0; i < cancelledTasks.size(); i++) {
int priority = -1 * (int) (invocationCount / cancelledTasks.size() * (i + 1));
executeTask(key, PrefetchingTask.fromTask(cancelledTasks.get(i), priority));
executeTask(PrefetchingTask.fromTask(key, cancelledTasks.get(i), priority));
}
}
}

protected PrefetchingTask executeTask(PipelineKey key, PrefetchingTask task) {
DatabasePipeline pipeline = pipelines.get(key);

task.addCallback(new ListenableFutureCallback<EmbeddedDatabase>() {
@Override
public void onSuccess(EmbeddedDatabase result) {
if (task.type == NEW_DATABASE) {
pipeline.state.set(INITIALIZED);
}
pipeline.tasks.remove(task);
pipeline.results.offer(PreparedResult.success(result));
}

@Override
public void onFailure(Throwable error) {
pipeline.tasks.remove(task);
if (!(error instanceof CancellationException)) {
pipeline.results.offer(PreparedResult.failure(error));
}
}
});

protected PrefetchingTask executeTask(PrefetchingTask task) {
DatabasePipeline pipeline = pipelines.get(task.key);
pipeline.tasks.add(task);
taskExecutor.execute(task);
return task;
Expand Down Expand Up @@ -379,36 +359,38 @@ protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
}
}

protected static class PrefetchingTask extends ListenableFutureTask<EmbeddedDatabase> implements Comparable<PrefetchingTask> {
protected static class PrefetchingTask extends FutureTask<EmbeddedDatabase> implements Comparable<PrefetchingTask> {

private final AtomicBoolean executed = new AtomicBoolean(false);

public final PipelineKey key;
public final Callable<EmbeddedDatabase> action;
public final TaskType type;
public final int priority;

public static PrefetchingTask forPreparer(DatabaseProvider provider, DatabasePreparer preparer, int priority) {
return new PrefetchingTask(priority, NEW_DATABASE, () -> provider.createDatabase(preparer));
public static PrefetchingTask forPreparer(PipelineKey key, DatabaseProvider provider, DatabasePreparer preparer, int priority) {
return new PrefetchingTask(key, priority, NEW_DATABASE, () -> provider.createDatabase(preparer));
}

public static PrefetchingTask withDatabase(EmbeddedDatabase database, DatabasePreparer preparer, int priority) {
return new PrefetchingTask(priority, EXISTING_DATABASE, () -> {
public static PrefetchingTask withDatabase(PipelineKey key, EmbeddedDatabase database, DatabasePreparer preparer, int priority) {
return new PrefetchingTask(key, priority, EXISTING_DATABASE, () -> {
preparer.prepare(database);
return database;
});
}

public static PrefetchingTask withDatabase(EmbeddedDatabase database, int priority) {
return new PrefetchingTask(priority, EXISTING_DATABASE, () -> database);
public static PrefetchingTask withDatabase(PipelineKey key, EmbeddedDatabase database, int priority) {
return new PrefetchingTask(key, priority, EXISTING_DATABASE, () -> database);
}

public static PrefetchingTask fromTask(PrefetchingTask task, int priority) {
return new PrefetchingTask(priority, task.type, task.action);
public static PrefetchingTask fromTask(PipelineKey key, PrefetchingTask task, int priority) {
return new PrefetchingTask(key, priority, task.type, task.action);
}

private PrefetchingTask(int priority, TaskType type, Callable<EmbeddedDatabase> action) {
private PrefetchingTask(PipelineKey key, int priority, TaskType type, Callable<EmbeddedDatabase> action) {
super(action);

this.key = key;
this.action = action;
this.type = type;
this.priority = priority;
Expand All @@ -421,6 +403,37 @@ public void run() {
}
}

@Override
protected void done() {
DatabasePipeline pipeline = pipelines.get(key);
Throwable cause;

try {
EmbeddedDatabase result = get();

if (type == NEW_DATABASE) {
pipeline.state.set(INITIALIZED);
}
pipeline.tasks.remove(this);
pipeline.results.offer(PreparedResult.success(result));
return;
}
catch (ExecutionException ex) {
cause = ex.getCause();
if (cause == null) {
cause = ex;
}
}
catch (Throwable ex) {
cause = ex;
}

pipeline.tasks.remove(this);
if (!(cause instanceof CancellationException)) {
pipeline.results.offer(PreparedResult.failure(cause));
}
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (mayInterruptIfRunning || executed.compareAndSet(false, true)) {
Expand Down