Skip to content

Commit 5b26d6d

Browse files
authored
feat: add managed connection helpers to provider (#1387)
- add overridable release hook for connection cleanup - expose withConnection/withConnectionMany wrappers using usingWhen - provide factory method that opens and closes ConnectionFactory connections
1 parent 1330136 commit 5b26d6d

File tree

3 files changed

+150
-52
lines changed

3 files changed

+150
-52
lines changed

querydsl-libraries/querydsl-r2dbc/src/main/java/com/querydsl/r2dbc/AbstractR2DBCQuery.java

Lines changed: 75 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@
4040
import java.util.List;
4141
import java.util.Map;
4242
import java.util.Objects;
43+
import java.util.function.Function;
4344
import java.util.logging.Level;
4445
import java.util.logging.Logger;
4546
import org.jetbrains.annotations.NotNull;
4647
import org.jetbrains.annotations.Nullable;
48+
import org.reactivestreams.Publisher;
4749
import reactor.core.publisher.Flux;
4850
import reactor.core.publisher.Mono;
4951

@@ -212,31 +214,32 @@ protected Configuration getConfiguration() {
212214
@SuppressWarnings("unchecked")
213215
@Override
214216
public Flux<T> fetch() {
215-
return getConnection()
216-
.flatMapMany(
217-
conn -> {
218-
var expr = (Expression<T>) queryMixin.getMetadata().getProjection();
219-
var serializer = serialize(false);
220-
var mapper = createMapper(expr);
221-
222-
var constants = serializer.getConstants();
223-
var originalSql = serializer.toString();
224-
var sql =
225-
R2dbcUtils.replaceBindingArguments(
226-
configuration.getBindMarkerFactory().create(), constants, originalSql);
227-
228-
var statement = conn.createStatement(sql);
229-
BindTarget bindTarget = new StatementWrapper(statement);
230-
231-
setParameters(
232-
bindTarget,
233-
configuration.getBindMarkerFactory().create(),
234-
constants,
235-
serializer.getConstantPaths(),
236-
getMetadata().getParams());
237-
238-
return Flux.from(statement.execute()).flatMap(result -> result.map(mapper::map));
239-
});
217+
Function<Connection, Publisher<T>> work =
218+
connection -> {
219+
var expr = (Expression<T>) queryMixin.getMetadata().getProjection();
220+
var serializer = serialize(false);
221+
var mapper = createMapper(expr);
222+
223+
var constants = serializer.getConstants();
224+
var originalSql = serializer.toString();
225+
var sql =
226+
R2dbcUtils.replaceBindingArguments(
227+
configuration.getBindMarkerFactory().create(), constants, originalSql);
228+
229+
var statement = connection.createStatement(sql);
230+
BindTarget bindTarget = new StatementWrapper(statement);
231+
232+
setParameters(
233+
bindTarget,
234+
configuration.getBindMarkerFactory().create(),
235+
constants,
236+
serializer.getConstantPaths(),
237+
getMetadata().getParams());
238+
239+
return Flux.from(statement.execute()).flatMap(result -> result.map(mapper::map));
240+
};
241+
242+
return usingConnectionMany(work);
240243
}
241244

242245
private Mapper<T> createMapper(Expression<T> expr) {
@@ -322,32 +325,53 @@ private Mono<Long> unsafeCount() {
322325

323326
logQuery(sql, constants);
324327

325-
return getConnection()
326-
.flatMap(
327-
connection -> {
328-
var statement = getStatement(connection, sql);
329-
BindTarget bindTarget = new StatementWrapper(statement);
330-
331-
setParameters(
332-
bindTarget,
333-
configuration.getBindMarkerFactory().create(),
334-
constants,
335-
serializer.getConstantPaths(),
336-
getMetadata().getParams());
337-
338-
return Mono.from(statement.execute())
339-
.flatMap(result -> Mono.from(result.map((row, rowMetadata) -> row.get(0))))
340-
.map(
341-
o -> {
342-
if (Integer.class.isAssignableFrom(o.getClass())) {
343-
return ((Integer) o).longValue();
344-
}
345-
346-
return (Long) o;
347-
})
348-
.defaultIfEmpty(0L)
349-
.doOnError(e -> Mono.error(configuration.translate(sql, constants, e)));
350-
});
328+
Function<Connection, Mono<Long>> work =
329+
connection -> {
330+
var statement = getStatement(connection, sql);
331+
BindTarget bindTarget = new StatementWrapper(statement);
332+
333+
setParameters(
334+
bindTarget,
335+
configuration.getBindMarkerFactory().create(),
336+
constants,
337+
serializer.getConstantPaths(),
338+
getMetadata().getParams());
339+
340+
return Mono.from(statement.execute())
341+
.flatMap(result -> Mono.from(result.map((row, rowMetadata) -> row.get(0))))
342+
.map(
343+
o -> {
344+
if (Integer.class.isAssignableFrom(o.getClass())) {
345+
return ((Integer) o).longValue();
346+
}
347+
348+
return (Long) o;
349+
})
350+
.defaultIfEmpty(0L)
351+
.doOnError(e -> Mono.error(configuration.translate(sql, constants, e)));
352+
};
353+
354+
return usingConnection(work);
355+
}
356+
357+
private <R> Flux<R> usingConnectionMany(Function<Connection, Publisher<R>> callback) {
358+
if (connProvider != null) {
359+
return connProvider.withConnectionMany(callback);
360+
}
361+
if (conn != null) {
362+
return Flux.defer(() -> Flux.from(callback.apply(conn)));
363+
}
364+
return Flux.error(new IllegalStateException("No connection provided"));
365+
}
366+
367+
private <R> Mono<R> usingConnection(Function<Connection, Mono<R>> callback) {
368+
if (connProvider != null) {
369+
return connProvider.withConnection(callback);
370+
}
371+
if (conn != null) {
372+
return Mono.defer(() -> callback.apply(conn));
373+
}
374+
return Mono.error(new IllegalStateException("No connection provided"));
351375
}
352376

353377
protected void logQuery(String queryString, Collection<Object> parameters) {

querydsl-libraries/querydsl-r2dbc/src/main/java/com/querydsl/r2dbc/R2DBCConnectionProvider.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package com.querydsl.r2dbc;
22

33
import io.r2dbc.spi.Connection;
4+
import io.r2dbc.spi.ConnectionFactory;
5+
import java.util.Objects;
6+
import java.util.function.Function;
7+
import org.reactivestreams.Publisher;
8+
import reactor.core.publisher.Flux;
49
import reactor.core.publisher.Mono;
510

611
/** R2DBC connection provider */
@@ -13,4 +18,73 @@ public interface R2DBCConnectionProvider {
1318
* @return the connection of the current transaction
1419
*/
1520
Mono<Connection> getConnection();
21+
22+
/**
23+
* Release the connection returned from {@link #getConnection()} once the consumer has finished
24+
* using it. Default implementation is a no-op which is suitable when the provider exposes
25+
* externally managed connections (for example, a transaction scoped connection).
26+
*
27+
* @param connection connection to release
28+
* @return completion signal for the release
29+
*/
30+
default Mono<Void> release(Connection connection) {
31+
return Mono.empty();
32+
}
33+
34+
/**
35+
* Execute the given callback with a managed connection and release it afterwards.
36+
*
37+
* @param callback work to perform with the managed connection
38+
* @param <T> result type
39+
* @return mono emitting the callback result
40+
*/
41+
default <T> Mono<T> withConnection(Function<Connection, Mono<T>> callback) {
42+
Objects.requireNonNull(callback, "callback");
43+
return Mono.usingWhen(
44+
getConnection(),
45+
connection -> Mono.defer(() -> callback.apply(connection)),
46+
this::release,
47+
(connection, error) -> release(connection),
48+
connection -> release(connection));
49+
}
50+
51+
/**
52+
* Execute the given callback that returns a {@link Publisher} sequence with a managed connection
53+
* and release the connection afterwards.
54+
*
55+
* @param callback work to perform with the managed connection
56+
* @param <T> element type emitted by the publisher
57+
* @return flux emitting the callback results
58+
*/
59+
default <T> Flux<T> withConnectionMany(Function<Connection, Publisher<T>> callback) {
60+
Objects.requireNonNull(callback, "callback");
61+
return Flux.usingWhen(
62+
getConnection(),
63+
connection -> Flux.from(callback.apply(connection)),
64+
this::release,
65+
(connection, error) -> release(connection),
66+
connection -> release(connection));
67+
}
68+
69+
/**
70+
* Create a {@link R2DBCConnectionProvider} backed by a {@link ConnectionFactory}. Each invocation
71+
* creates a new connection from the factory and ensures it is closed after use.
72+
*
73+
* @param connectionFactory source of connections
74+
* @return provider that creates and closes connections per use
75+
*/
76+
static R2DBCConnectionProvider from(ConnectionFactory connectionFactory) {
77+
Objects.requireNonNull(connectionFactory, "connectionFactory");
78+
return new R2DBCConnectionProvider() {
79+
@Override
80+
public Mono<Connection> getConnection() {
81+
return Mono.from(connectionFactory.create());
82+
}
83+
84+
@Override
85+
public Mono<Void> release(Connection connection) {
86+
return Mono.from(connection.close());
87+
}
88+
};
89+
}
1690
}

querydsl-libraries/querydsl-r2dbc/src/test/java/com/querydsl/r2dbc/Connections.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public final class Connections {
7474
private static boolean sqlServerInited, h2Inited, mysqlInited, postgresqlInited;
7575

7676
public static R2DBCConnectionProvider getR2DBCConnectionProvider(String url) {
77-
return () -> Mono.from(getConnectionProvider(url).create());
77+
return R2DBCConnectionProvider.from(getConnectionProvider(url));
7878
}
7979

8080
public static ConnectionFactory getConnectionProvider(String url) {

0 commit comments

Comments
 (0)