Skip to content

Commit 9dce31a

Browse files
committed
[#2768] Avoid context switching on id generation
This commit: * Stop connection leaks in BlockingIdentifierGenerator * Assert that the connection is used in the expected context
1 parent a7dc0a1 commit 9dce31a

File tree

3 files changed

+114
-77
lines changed

3 files changed

+114
-77
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java

Lines changed: 43 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414

1515
import io.vertx.core.Context;
1616
import io.vertx.core.Vertx;
17+
import io.vertx.core.internal.ContextInternal;
1718
import io.vertx.core.internal.pool.CombinerExecutor;
1819
import io.vertx.core.internal.pool.Executor;
1920
import io.vertx.core.internal.pool.Task;
2021

2122
import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture;
23+
import static org.hibernate.reactive.util.impl.CompletionStages.supplyStage;
2224

2325
/**
2426
* A {@link ReactiveIdentifierGenerator} which uses the database to allocate
@@ -93,39 +95,34 @@ public CompletionStage<Long> generate(ReactiveConnectionSupplier connectionSuppl
9395
}
9496

9597
final CompletableFuture<Long> resultForThisEventLoop = new CompletableFuture<>();
96-
final CompletableFuture<Long> result = new CompletableFuture<>();
97-
final Context context = Vertx.currentContext();
98-
executor.submit( new GenerateIdAction( connectionSupplier, result ) );
99-
result.whenComplete( (id, t) -> {
100-
final Context newContext = Vertx.currentContext();
101-
//Need to be careful in resuming processing on the same context as the original
102-
//request, potentially having to switch back if we're no longer executing on the same:
103-
if ( newContext != context ) {
104-
if ( t != null ) {
105-
context.runOnContext( v -> resultForThisEventLoop.completeExceptionally( t ) );
98+
// We use supplyStage so that, no matter if there's an exception, we always return something that will complete
99+
return supplyStage( () -> {
100+
final CompletableFuture<Long> result = new CompletableFuture<>();
101+
final Context context = Vertx.currentContext();
102+
executor.submit( new GenerateIdAction( connectionSupplier, result ) );
103+
result.whenComplete( (id, t) -> {
104+
final Context newContext = Vertx.currentContext();
105+
//Need to be careful in resuming processing on the same context as the original
106+
//request, potentially having to switch back if we're no longer executing on the same:
107+
if ( newContext != context ) {
108+
context.runOnContext( v -> complete( resultForThisEventLoop, id, t ) );
106109
}
107110
else {
108-
context.runOnContext( v -> resultForThisEventLoop.complete( id ) );
111+
complete( resultForThisEventLoop, id, t );
109112
}
110-
}
111-
else {
112-
if ( t != null ) {
113-
resultForThisEventLoop.completeExceptionally( t );
114-
}
115-
else {
116-
resultForThisEventLoop.complete( id );
117-
}
118-
}
113+
} );
114+
return resultForThisEventLoop;
119115
} );
120-
return resultForThisEventLoop;
121116
}
122117

123118
private final class GenerateIdAction implements Executor.Action<GeneratorState> {
124119

125120
private final ReactiveConnectionSupplier connectionSupplier;
126121
private final CompletableFuture<Long> result;
122+
private final ContextInternal creationContext;
127123

128124
public GenerateIdAction(ReactiveConnectionSupplier connectionSupplier, CompletableFuture<Long> result) {
125+
this.creationContext = ContextInternal.current();
129126
this.connectionSupplier = Objects.requireNonNull( connectionSupplier );
130127
this.result = Objects.requireNonNull( result );
131128
}
@@ -137,35 +134,38 @@ public Task execute(GeneratorState state) {
137134
// We don't need to update or initialize the hi
138135
// value in the table, so just increment the lo
139136
// value and return the next id in the block
140-
completedFuture( local ).whenComplete( this::acceptAsReturnValue );
137+
result.complete( local );
141138
}
142139
else {
143-
nextHiValue( connectionSupplier )
144-
.whenComplete( (newlyGeneratedHi, throwable) -> {
145-
if ( throwable != null ) {
146-
result.completeExceptionally( throwable );
147-
}
148-
else {
149-
//We ignore the state argument as we actually use the field directly
150-
//for convenience, but they are the same object.
151-
executor.submit( stateIgnored -> {
152-
result.complete( next( newlyGeneratedHi ) );
153-
return null;
154-
} );
155-
}
156-
} );
140+
creationContext.runOnContext( this::generateNewHiValue );
157141
}
158142
return null;
159143
}
160144

161-
private void acceptAsReturnValue(final Long aLong, final Throwable throwable) {
162-
if ( throwable != null ) {
163-
result.completeExceptionally( throwable );
164-
}
165-
else {
166-
result.complete( aLong );
167-
}
145+
private void generateNewHiValue(Void v) {
146+
nextHiValue( connectionSupplier )
147+
.whenComplete( (newlyGeneratedHi, throwable) -> {
148+
if ( throwable != null ) {
149+
result.completeExceptionally( throwable );
150+
}
151+
else {
152+
//We ignore the state argument as we actually use the field directly
153+
//for convenience, but they are the same object.
154+
executor.submit( stateIgnored -> {
155+
result.complete( next( newlyGeneratedHi ) );
156+
return null;
157+
} );
158+
}
159+
} );
168160
}
169161
}
170162

163+
private static <T> void complete(CompletableFuture<T> future, final T result, final Throwable throwable) {
164+
if ( throwable != null ) {
165+
future.completeExceptionally( throwable );
166+
}
167+
else {
168+
future.complete( result );
169+
}
170+
}
171171
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
import org.hibernate.engine.jdbc.spi.SqlStatementLogger;
1919
import org.hibernate.reactive.adaptor.impl.JdbcNull;
2020
import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;
21+
import org.hibernate.reactive.common.InternalStateAssertions;
2122
import org.hibernate.reactive.logging.impl.Log;
2223
import org.hibernate.reactive.logging.impl.LoggerFactory;
2324
import org.hibernate.reactive.pool.BatchingConnection;
2425
import org.hibernate.reactive.pool.ReactiveConnection;
2526
import org.hibernate.reactive.util.impl.CompletionStages;
2627

28+
import io.vertx.core.internal.ContextInternal;
2729
import io.vertx.core.json.JsonArray;
2830
import io.vertx.core.json.JsonObject;
2931
import io.vertx.sqlclient.DatabaseException;
@@ -59,14 +61,22 @@ public class SqlClientConnection implements ReactiveConnection {
5961

6062
private final Pool pool;
6163
private final SqlConnection connection;
64+
// The context associated to the connection. We expect the connection to be executed in this context.
65+
private final ContextInternal connectionContext;
6266
private Transaction transaction;
6367

64-
SqlClientConnection(SqlConnection connection, Pool pool, SqlStatementLogger sqlStatementLogger, SqlExceptionHelper sqlExceptionHelper) {
68+
SqlClientConnection(
69+
SqlConnection connection,
70+
Pool pool,
71+
SqlStatementLogger sqlStatementLogger,
72+
SqlExceptionHelper sqlExceptionHelper,
73+
ContextInternal connectionContext) {
74+
this.connectionContext = connectionContext;
6575
this.pool = pool;
6676
this.sqlStatementLogger = sqlStatementLogger;
6777
this.connection = connection;
6878
this.sqlExceptionHelper = sqlExceptionHelper;
69-
LOG.tracef( "Connection created: %s", connection );
79+
LOG.tracef( "Connection created for %1$s associated to context %2$s: ", connection, connectionContext );
7080
}
7181

7282
@Override
@@ -338,6 +348,7 @@ public CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql)
338348
}
339349

340350
private void feedback(String sql) {
351+
InternalStateAssertions.assertCurrentContextMatches( this, connectionContext );
341352
Objects.requireNonNull( sql, "SQL query cannot be null" );
342353
// DDL already gets formatted by the client, so don't reformat it
343354
FormatStyle formatStyle = sqlStatementLogger.isFormat() && !sql.contains( System.lineSeparator() )

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java

Lines changed: 58 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
*/
66
package org.hibernate.reactive.pool.impl;
77

8+
import java.lang.invoke.MethodHandles;
9+
import java.lang.invoke.VarHandle;
810
import java.sql.ResultSet;
911
import java.sql.SQLException;
1012
import java.util.List;
@@ -22,6 +24,7 @@
2224
import org.hibernate.reactive.pool.ReactiveConnectionPool;
2325

2426
import io.vertx.core.Future;
27+
import io.vertx.core.internal.ContextInternal;
2528
import io.vertx.sqlclient.DatabaseException;
2629
import io.vertx.sqlclient.Pool;
2730
import io.vertx.sqlclient.Row;
@@ -30,7 +33,6 @@
3033
import io.vertx.sqlclient.Tuple;
3134
import io.vertx.sqlclient.spi.DatabaseMetadata;
3235

33-
import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture;
3436
import static org.hibernate.reactive.util.impl.CompletionStages.rethrow;
3537
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
3638

@@ -123,12 +125,16 @@ public CompletionStage<ReactiveConnection> getConnection(String tenantId, SqlExc
123125
}
124126

125127
private CompletionStage<ReactiveConnection> getConnectionFromPool(Pool pool) {
126-
return completionStage( pool.getConnection().map( this::newConnection ), ReactiveConnection::close );
128+
return completeFuture(
129+
pool.getConnection().map( this::newConnection ),
130+
ReactiveConnection::close
131+
);
127132
}
128133

129134
private CompletionStage<ReactiveConnection> getConnectionFromPool(Pool pool, SqlExceptionHelper sqlExceptionHelper) {
130-
return completionStage(
131-
pool.getConnection().map( sqlConnection -> newConnection( sqlConnection, sqlExceptionHelper ) ),
135+
return completeFuture(
136+
pool.getConnection()
137+
.map( sqlConnection -> newConnection( sqlConnection, sqlExceptionHelper ) ),
132138
ReactiveConnection::close
133139
);
134140
}
@@ -189,8 +195,8 @@ private void feedback(String sql) {
189195
/**
190196
* @param onCancellation invoke when converted {@link java.util.concurrent.CompletionStage} cancellation.
191197
*/
192-
private <T> CompletionStage<T> completionStage(Future<T> future, Consumer<T> onCancellation) {
193-
CompletableFuture<T> completableFuture = new CompletableFuture<>();
198+
private <T> CompletionStage<T> completeFuture(Future<T> future, Consumer<T> onCancellation) {
199+
final CompletableFuture<T> completableFuture = new CompletableFuture<>();
194200
future.onComplete( ar -> {
195201
if ( ar.succeeded() ) {
196202
if ( completableFuture.isCancelled() ) {
@@ -210,13 +216,32 @@ private SqlClientConnection newConnection(SqlConnection connection) {
210216
}
211217

212218
private SqlClientConnection newConnection(SqlConnection connection, SqlExceptionHelper sqlExceptionHelper) {
213-
return new SqlClientConnection( connection, getPool(), getSqlStatementLogger(), sqlExceptionHelper );
219+
return new SqlClientConnection(
220+
connection,
221+
getPool(),
222+
getSqlStatementLogger(),
223+
sqlExceptionHelper,
224+
ContextInternal.current()
225+
);
214226
}
215227

216228
private static class ProxyConnection implements ReactiveConnection {
229+
230+
private static final VarHandle OPENED_HANDLE;
231+
232+
static {
233+
try {
234+
MethodHandles.Lookup lookup = MethodHandles.lookup();
235+
OPENED_HANDLE = lookup.findVarHandle( ProxyConnection.class, "opened", boolean.class );
236+
}
237+
catch (ReflectiveOperationException e) {
238+
throw new ExceptionInInitializerError( e );
239+
}
240+
}
241+
217242
private final Supplier<CompletionStage<ReactiveConnection>> connectionSupplier;
218-
private Integer batchSize;
219-
private ReactiveConnection connection;
243+
private final CompletableFuture<ReactiveConnection> connectionFuture = new CompletableFuture<>();
244+
private volatile boolean opened = false;
220245

221246
public ProxyConnection(Supplier<CompletionStage<ReactiveConnection>> connectionSupplier) {
222247
this.connectionSupplier = connectionSupplier;
@@ -225,29 +250,35 @@ public ProxyConnection(Supplier<CompletionStage<ReactiveConnection>> connectionS
225250
/**
226251
* @return the existing {@link ReactiveConnection}, or open a new one
227252
*/
228-
CompletionStage<ReactiveConnection> connection() {
229-
if ( connection == null ) {
230-
return connectionSupplier.get()
231-
.thenApply( conn -> {
232-
if ( batchSize != null ) {
233-
conn.withBatchSize( batchSize );
234-
}
235-
connection = conn;
236-
return connection;
237-
} );
253+
private CompletionStage<ReactiveConnection> connection() {
254+
if ( opened ) {
255+
return connectionFuture;
256+
}
257+
if ( OPENED_HANDLE.compareAndSet( this, false, true ) ) {
258+
connectionSupplier.get().whenComplete( (connection, throwable) -> {
259+
if ( throwable != null ) {
260+
connectionFuture.completeExceptionally( throwable );
261+
}
262+
else {
263+
connectionFuture.complete( connection );
264+
}
265+
} );
238266
}
239-
return completedFuture( connection );
267+
return connectionFuture;
240268
}
241269

242270
@Override
243271
public boolean isTransactionInProgress() {
244-
return connection != null && connection.isTransactionInProgress();
272+
ReactiveConnection reactiveConnection = connectionFuture.getNow( null );
273+
return reactiveConnection != null && reactiveConnection.isTransactionInProgress();
245274
}
246275

247276
@Override
248277
public DatabaseMetadata getDatabaseMetadata() {
249-
Objects.requireNonNull( connection, "Database metadata not available until the connection is opened" );
250-
return connection.getDatabaseMetadata();
278+
return Objects.requireNonNull(
279+
connectionFuture.getNow( null ),
280+
"Database metadata not available until the connection is opened"
281+
).getDatabaseMetadata();
251282
}
252283

253284
@Override
@@ -356,13 +387,8 @@ public CompletionStage<Void> rollbackTransaction() {
356387
}
357388

358389
@Override
359-
public ReactiveConnection withBatchSize(int batchSize) {
360-
if ( connection == null ) {
361-
this.batchSize = batchSize;
362-
}
363-
else {
364-
connection = connection.withBatchSize( batchSize );
365-
}
390+
public ProxyConnection withBatchSize(int batchSize) {
391+
connectionFuture.thenApply( reactiveConnection -> reactiveConnection.withBatchSize( batchSize ) );
366392
return this;
367393
}
368394

@@ -373,8 +399,8 @@ public CompletionStage<Void> executeBatch() {
373399

374400
@Override
375401
public CompletionStage<Void> close() {
376-
return connection != null
377-
? connection.close().thenAccept( v -> connection = null )
402+
return opened
403+
? connectionFuture.getNow( null ).close()
378404
: voidFuture();
379405
}
380406
}

0 commit comments

Comments
 (0)