Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -212,31 +214,32 @@ protected Configuration getConfiguration() {
@SuppressWarnings("unchecked")
@Override
public Flux<T> fetch() {
return getConnection()
.flatMapMany(
conn -> {
var expr = (Expression<T>) queryMixin.getMetadata().getProjection();
var serializer = serialize(false);
var mapper = createMapper(expr);

var constants = serializer.getConstants();
var originalSql = serializer.toString();
var sql =
R2dbcUtils.replaceBindingArguments(
configuration.getBindMarkerFactory().create(), constants, originalSql);

var statement = conn.createStatement(sql);
BindTarget bindTarget = new StatementWrapper(statement);

setParameters(
bindTarget,
configuration.getBindMarkerFactory().create(),
constants,
serializer.getConstantPaths(),
getMetadata().getParams());

return Flux.from(statement.execute()).flatMap(result -> result.map(mapper::map));
});
Function<Connection, Publisher<T>> work =
connection -> {
var expr = (Expression<T>) queryMixin.getMetadata().getProjection();
var serializer = serialize(false);
var mapper = createMapper(expr);

var constants = serializer.getConstants();
var originalSql = serializer.toString();
var sql =
R2dbcUtils.replaceBindingArguments(
configuration.getBindMarkerFactory().create(), constants, originalSql);

var statement = connection.createStatement(sql);
BindTarget bindTarget = new StatementWrapper(statement);

setParameters(
bindTarget,
configuration.getBindMarkerFactory().create(),
constants,
serializer.getConstantPaths(),
getMetadata().getParams());

return Flux.from(statement.execute()).flatMap(result -> result.map(mapper::map));
};

return usingConnectionMany(work);
}

private Mapper<T> createMapper(Expression<T> expr) {
Expand Down Expand Up @@ -322,32 +325,53 @@ private Mono<Long> unsafeCount() {

logQuery(sql, constants);

return getConnection()
.flatMap(
connection -> {
var statement = getStatement(connection, sql);
BindTarget bindTarget = new StatementWrapper(statement);

setParameters(
bindTarget,
configuration.getBindMarkerFactory().create(),
constants,
serializer.getConstantPaths(),
getMetadata().getParams());

return Mono.from(statement.execute())
.flatMap(result -> Mono.from(result.map((row, rowMetadata) -> row.get(0))))
.map(
o -> {
if (Integer.class.isAssignableFrom(o.getClass())) {
return ((Integer) o).longValue();
}

return (Long) o;
})
.defaultIfEmpty(0L)
.doOnError(e -> Mono.error(configuration.translate(sql, constants, e)));
});
Function<Connection, Mono<Long>> work =
connection -> {
var statement = getStatement(connection, sql);
BindTarget bindTarget = new StatementWrapper(statement);

setParameters(
bindTarget,
configuration.getBindMarkerFactory().create(),
constants,
serializer.getConstantPaths(),
getMetadata().getParams());

return Mono.from(statement.execute())
.flatMap(result -> Mono.from(result.map((row, rowMetadata) -> row.get(0))))
.map(
o -> {
if (Integer.class.isAssignableFrom(o.getClass())) {
return ((Integer) o).longValue();
}

return (Long) o;
})
.defaultIfEmpty(0L)
.doOnError(e -> Mono.error(configuration.translate(sql, constants, e)));
};

return usingConnection(work);
}

private <R> Flux<R> usingConnectionMany(Function<Connection, Publisher<R>> callback) {
if (connProvider != null) {
return connProvider.withConnectionMany(callback);
}
if (conn != null) {
return Flux.defer(() -> Flux.from(callback.apply(conn)));
}
return Flux.error(new IllegalStateException("No connection provided"));
}

private <R> Mono<R> usingConnection(Function<Connection, Mono<R>> callback) {
if (connProvider != null) {
return connProvider.withConnection(callback);
}
if (conn != null) {
return Mono.defer(() -> callback.apply(conn));
}
return Mono.error(new IllegalStateException("No connection provided"));
}

protected void logQuery(String queryString, Collection<Object> parameters) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package com.querydsl.r2dbc;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/** R2DBC connection provider */
Expand All @@ -13,4 +18,73 @@ public interface R2DBCConnectionProvider {
* @return the connection of the current transaction
*/
Mono<Connection> getConnection();

/**
* Release the connection returned from {@link #getConnection()} once the consumer has finished
* using it. Default implementation is a no-op which is suitable when the provider exposes
* externally managed connections (for example, a transaction scoped connection).
*
* @param connection connection to release
* @return completion signal for the release
*/
default Mono<Void> release(Connection connection) {
return Mono.empty();
}

/**
* Execute the given callback with a managed connection and release it afterwards.
*
* @param callback work to perform with the managed connection
* @param <T> result type
* @return mono emitting the callback result
*/
default <T> Mono<T> withConnection(Function<Connection, Mono<T>> callback) {
Objects.requireNonNull(callback, "callback");
return Mono.usingWhen(
getConnection(),
connection -> Mono.defer(() -> callback.apply(connection)),
this::release,
(connection, error) -> release(connection),
connection -> release(connection));
}

/**
* Execute the given callback that returns a {@link Publisher} sequence with a managed connection
* and release the connection afterwards.
*
* @param callback work to perform with the managed connection
* @param <T> element type emitted by the publisher
* @return flux emitting the callback results
*/
default <T> Flux<T> withConnectionMany(Function<Connection, Publisher<T>> callback) {
Objects.requireNonNull(callback, "callback");
return Flux.usingWhen(
getConnection(),
connection -> Flux.from(callback.apply(connection)),
this::release,
(connection, error) -> release(connection),
connection -> release(connection));
}

/**
* Create a {@link R2DBCConnectionProvider} backed by a {@link ConnectionFactory}. Each invocation
* creates a new connection from the factory and ensures it is closed after use.
*
* @param connectionFactory source of connections
* @return provider that creates and closes connections per use
*/
static R2DBCConnectionProvider from(ConnectionFactory connectionFactory) {
Objects.requireNonNull(connectionFactory, "connectionFactory");
return new R2DBCConnectionProvider() {
@Override
public Mono<Connection> getConnection() {
return Mono.from(connectionFactory.create());
}

@Override
public Mono<Void> release(Connection connection) {
return Mono.from(connection.close());
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public final class Connections {
private static boolean sqlServerInited, h2Inited, mysqlInited, postgresqlInited;

public static R2DBCConnectionProvider getR2DBCConnectionProvider(String url) {
return () -> Mono.from(getConnectionProvider(url).create());
return R2DBCConnectionProvider.from(getConnectionProvider(url));
}

public static ConnectionFactory getConnectionProvider(String url) {
Expand Down