diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/common/InternalStateAssertions.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/common/InternalStateAssertions.java index d356334c1..a24c3b8fd 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/common/InternalStateAssertions.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/common/InternalStateAssertions.java @@ -7,11 +7,13 @@ import java.lang.invoke.MethodHandles; import java.util.Locale; +import java.util.Objects; import org.hibernate.reactive.logging.impl.Log; import org.hibernate.reactive.logging.impl.LoggerFactory; import io.vertx.core.Context; +import io.vertx.core.internal.ContextInternal; /** * Commonly used assertions to verify that the operations @@ -52,4 +54,13 @@ public static void assertCurrentThreadMatches(Thread expectedThread) { } } + public static void assertCurrentContextMatches(Object object, ContextInternal expectedContext) { + if ( ENFORCE ) { + final ContextInternal currentContext = ContextInternal.current(); + Objects.requireNonNull( currentContext, "Current context cannot be null" ); + if ( !currentContext.equals( expectedContext ) ) { + throw LOG.unexpectedContextDetected( object, expectedContext, currentContext ); + } + } + } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java index 329792f28..fc08e6e92 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java @@ -14,11 +14,13 @@ import io.vertx.core.Context; import io.vertx.core.Vertx; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.pool.CombinerExecutor; import io.vertx.core.internal.pool.Executor; import io.vertx.core.internal.pool.Task; import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; +import static org.hibernate.reactive.util.impl.CompletionStages.supplyStage; /** * A {@link ReactiveIdentifierGenerator} which uses the database to allocate @@ -93,39 +95,34 @@ public CompletionStage generate(ReactiveConnectionSupplier connectionSuppl } final CompletableFuture resultForThisEventLoop = new CompletableFuture<>(); - final CompletableFuture result = new CompletableFuture<>(); - final Context context = Vertx.currentContext(); - executor.submit( new GenerateIdAction( connectionSupplier, result ) ); - result.whenComplete( (id, t) -> { - final Context newContext = Vertx.currentContext(); - //Need to be careful in resuming processing on the same context as the original - //request, potentially having to switch back if we're no longer executing on the same: - if ( newContext != context ) { - if ( t != null ) { - context.runOnContext( v -> resultForThisEventLoop.completeExceptionally( t ) ); + // We use supplyStage so that, no matter if there's an exception, we always return something that will complete + return supplyStage( () -> { + final CompletableFuture result = new CompletableFuture<>(); + final Context context = Vertx.currentContext(); + executor.submit( new GenerateIdAction( connectionSupplier, result ) ); + result.whenComplete( (id, t) -> { + final Context newContext = Vertx.currentContext(); + //Need to be careful in resuming processing on the same context as the original + //request, potentially having to switch back if we're no longer executing on the same: + if ( newContext != context ) { + context.runOnContext( v -> complete( resultForThisEventLoop, id, t ) ); } else { - context.runOnContext( v -> resultForThisEventLoop.complete( id ) ); + complete( resultForThisEventLoop, id, t ); } - } - else { - if ( t != null ) { - resultForThisEventLoop.completeExceptionally( t ); - } - else { - resultForThisEventLoop.complete( id ); - } - } + } ); + return resultForThisEventLoop; } ); - return resultForThisEventLoop; } private final class GenerateIdAction implements Executor.Action { private final ReactiveConnectionSupplier connectionSupplier; private final CompletableFuture result; + private final ContextInternal creationContext; public GenerateIdAction(ReactiveConnectionSupplier connectionSupplier, CompletableFuture result) { + this.creationContext = ContextInternal.current(); this.connectionSupplier = Objects.requireNonNull( connectionSupplier ); this.result = Objects.requireNonNull( result ); } @@ -137,35 +134,38 @@ public Task execute(GeneratorState state) { // We don't need to update or initialize the hi // value in the table, so just increment the lo // value and return the next id in the block - completedFuture( local ).whenComplete( this::acceptAsReturnValue ); + result.complete( local ); } else { - nextHiValue( connectionSupplier ) - .whenComplete( (newlyGeneratedHi, throwable) -> { - if ( throwable != null ) { - result.completeExceptionally( throwable ); - } - else { - //We ignore the state argument as we actually use the field directly - //for convenience, but they are the same object. - executor.submit( stateIgnored -> { - result.complete( next( newlyGeneratedHi ) ); - return null; - } ); - } - } ); + creationContext.runOnContext( this::generateNewHiValue ); } return null; } - private void acceptAsReturnValue(final Long aLong, final Throwable throwable) { - if ( throwable != null ) { - result.completeExceptionally( throwable ); - } - else { - result.complete( aLong ); - } + private void generateNewHiValue(Void v) { + nextHiValue( connectionSupplier ) + .whenComplete( (newlyGeneratedHi, throwable) -> { + if ( throwable != null ) { + result.completeExceptionally( throwable ); + } + else { + //We ignore the state argument as we actually use the field directly + //for convenience, but they are the same object. + executor.submit( stateIgnored -> { + result.complete( next( newlyGeneratedHi ) ); + return null; + } ); + } + } ); } } + private static void complete(CompletableFuture future, final T result, final Throwable throwable) { + if ( throwable != null ) { + future.completeExceptionally( throwable ); + } + else { + future.complete( result ); + } + } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java index 26743d7af..11aa731c5 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java @@ -6,12 +6,9 @@ package org.hibernate.reactive.logging.impl; - import java.sql.SQLException; import java.sql.SQLWarning; -import jakarta.persistence.PersistenceException; - import org.hibernate.HibernateException; import org.hibernate.JDBCException; import org.hibernate.LazyInitializationException; @@ -26,6 +23,9 @@ import org.jboss.logging.annotations.Message; import org.jboss.logging.annotations.MessageLogger; +import io.vertx.core.internal.ContextInternal; +import jakarta.persistence.PersistenceException; + import static org.jboss.logging.Logger.Level.DEBUG; import static org.jboss.logging.Logger.Level.ERROR; import static org.jboss.logging.Logger.Level.INFO; @@ -274,6 +274,9 @@ public interface Log extends BasicLogger { @Message(id = 86, value = "Error closing reactive connection") void errorClosingConnection(@Cause Throwable throwable); + @Message(id = 88, value = "Expected to use the object %1$s on context %2$s but was %3$s") + HibernateException unexpectedContextDetected(Object obj, ContextInternal expectedContext, ContextInternal currentContext); + // Same method that exists in CoreMessageLogger @LogMessage(level = WARN) @Message(id = 104, value = "firstResult/maxResults specified with collection fetch; applying in memory!" ) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java index 88b536223..c2467da64 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java @@ -18,12 +18,14 @@ import org.hibernate.engine.jdbc.spi.SqlStatementLogger; import org.hibernate.reactive.adaptor.impl.JdbcNull; import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor; +import org.hibernate.reactive.common.InternalStateAssertions; import org.hibernate.reactive.logging.impl.Log; import org.hibernate.reactive.logging.impl.LoggerFactory; import org.hibernate.reactive.pool.BatchingConnection; import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.util.impl.CompletionStages; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.sqlclient.DatabaseException; @@ -59,14 +61,22 @@ public class SqlClientConnection implements ReactiveConnection { private final Pool pool; private final SqlConnection connection; + // The context associated to the connection. We expect the connection to be executed in this context. + private final ContextInternal connectionContext; private Transaction transaction; - SqlClientConnection(SqlConnection connection, Pool pool, SqlStatementLogger sqlStatementLogger, SqlExceptionHelper sqlExceptionHelper) { + SqlClientConnection( + SqlConnection connection, + Pool pool, + SqlStatementLogger sqlStatementLogger, + SqlExceptionHelper sqlExceptionHelper, + ContextInternal connectionContext) { + this.connectionContext = connectionContext; this.pool = pool; this.sqlStatementLogger = sqlStatementLogger; this.connection = connection; this.sqlExceptionHelper = sqlExceptionHelper; - LOG.tracef( "Connection created: %s", connection ); + LOG.tracef( "Connection created for %1$s associated to context %2$s: ", connection, connectionContext ); } @Override @@ -338,6 +348,7 @@ public CompletionStage> preparedQueryOutsideTransaction(String sql) } private void feedback(String sql) { + InternalStateAssertions.assertCurrentContextMatches( this, connectionContext ); Objects.requireNonNull( sql, "SQL query cannot be null" ); // DDL already gets formatted by the client, so don't reformat it FormatStyle formatStyle = sqlStatementLogger.isFormat() && !sql.contains( System.lineSeparator() ) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java index bfbf8bc76..884a0fc93 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java @@ -5,6 +5,8 @@ */ package org.hibernate.reactive.pool.impl; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; @@ -22,6 +24,7 @@ import org.hibernate.reactive.pool.ReactiveConnectionPool; import io.vertx.core.Future; +import io.vertx.core.internal.ContextInternal; import io.vertx.sqlclient.DatabaseException; import io.vertx.sqlclient.Pool; import io.vertx.sqlclient.Row; @@ -30,7 +33,6 @@ import io.vertx.sqlclient.Tuple; import io.vertx.sqlclient.spi.DatabaseMetadata; -import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; import static org.hibernate.reactive.util.impl.CompletionStages.rethrow; import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; @@ -123,12 +125,16 @@ public CompletionStage getConnection(String tenantId, SqlExc } private CompletionStage getConnectionFromPool(Pool pool) { - return completionStage( pool.getConnection().map( this::newConnection ), ReactiveConnection::close ); + return completeFuture( + pool.getConnection().map( this::newConnection ), + ReactiveConnection::close + ); } private CompletionStage getConnectionFromPool(Pool pool, SqlExceptionHelper sqlExceptionHelper) { - return completionStage( - pool.getConnection().map( sqlConnection -> newConnection( sqlConnection, sqlExceptionHelper ) ), + return completeFuture( + pool.getConnection() + .map( sqlConnection -> newConnection( sqlConnection, sqlExceptionHelper ) ), ReactiveConnection::close ); } @@ -189,8 +195,8 @@ private void feedback(String sql) { /** * @param onCancellation invoke when converted {@link java.util.concurrent.CompletionStage} cancellation. */ - private CompletionStage completionStage(Future future, Consumer onCancellation) { - CompletableFuture completableFuture = new CompletableFuture<>(); + private CompletionStage completeFuture(Future future, Consumer onCancellation) { + final CompletableFuture completableFuture = new CompletableFuture<>(); future.onComplete( ar -> { if ( ar.succeeded() ) { if ( completableFuture.isCancelled() ) { @@ -210,13 +216,32 @@ private SqlClientConnection newConnection(SqlConnection connection) { } private SqlClientConnection newConnection(SqlConnection connection, SqlExceptionHelper sqlExceptionHelper) { - return new SqlClientConnection( connection, getPool(), getSqlStatementLogger(), sqlExceptionHelper ); + return new SqlClientConnection( + connection, + getPool(), + getSqlStatementLogger(), + sqlExceptionHelper, + ContextInternal.current() + ); } private static class ProxyConnection implements ReactiveConnection { + + private static final VarHandle OPENED_HANDLE; + + static { + try { + MethodHandles.Lookup lookup = MethodHandles.lookup(); + OPENED_HANDLE = lookup.findVarHandle( ProxyConnection.class, "opened", boolean.class ); + } + catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError( e ); + } + } + private final Supplier> connectionSupplier; - private Integer batchSize; - private ReactiveConnection connection; + private final CompletableFuture connectionFuture = new CompletableFuture<>(); + private volatile boolean opened = false; public ProxyConnection(Supplier> connectionSupplier) { this.connectionSupplier = connectionSupplier; @@ -225,29 +250,35 @@ public ProxyConnection(Supplier> connectionS /** * @return the existing {@link ReactiveConnection}, or open a new one */ - CompletionStage connection() { - if ( connection == null ) { - return connectionSupplier.get() - .thenApply( conn -> { - if ( batchSize != null ) { - conn.withBatchSize( batchSize ); - } - connection = conn; - return connection; - } ); + private CompletionStage connection() { + if ( opened ) { + return connectionFuture; + } + if ( OPENED_HANDLE.compareAndSet( this, false, true ) ) { + connectionSupplier.get().whenComplete( (connection, throwable) -> { + if ( throwable != null ) { + connectionFuture.completeExceptionally( throwable ); + } + else { + connectionFuture.complete( connection ); + } + } ); } - return completedFuture( connection ); + return connectionFuture; } @Override public boolean isTransactionInProgress() { - return connection != null && connection.isTransactionInProgress(); + ReactiveConnection reactiveConnection = connectionFuture.getNow( null ); + return reactiveConnection != null && reactiveConnection.isTransactionInProgress(); } @Override public DatabaseMetadata getDatabaseMetadata() { - Objects.requireNonNull( connection, "Database metadata not available until the connection is opened" ); - return connection.getDatabaseMetadata(); + return Objects.requireNonNull( + connectionFuture.getNow( null ), + "Database metadata not available until the connection is opened" + ).getDatabaseMetadata(); } @Override @@ -356,13 +387,8 @@ public CompletionStage rollbackTransaction() { } @Override - public ReactiveConnection withBatchSize(int batchSize) { - if ( connection == null ) { - this.batchSize = batchSize; - } - else { - connection = connection.withBatchSize( batchSize ); - } + public ProxyConnection withBatchSize(int batchSize) { + connectionFuture.thenApply( reactiveConnection -> reactiveConnection.withBatchSize( batchSize ) ); return this; } @@ -373,8 +399,8 @@ public CompletionStage executeBatch() { @Override public CompletionStage close() { - return connection != null - ? connection.close().thenAccept( v -> connection = null ) + return opened + ? connectionFuture.getNow( null ).close() : voidFuture(); } } diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionWithLazyConnectionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionWithLazyConnectionTest.java index 2948cce27..20f3ae70c 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionWithLazyConnectionTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionWithLazyConnectionTest.java @@ -197,8 +197,13 @@ private CompletionStage storeEntity(Stage.Session s) { final EntityWithGeneratedId entity = new EntityWithGeneratedId(); entity.name = beforeOperationThread + "__" + localVerticleOperationSequence; + // We are not using transactions on purpose here, because this approach will cause a context switch + // and an assertion error if things aren't handled correctly. See Hibernate Reactive issue #2768: + // https://github.com/hibernate/hibernate-reactive/issues/2768 return s - .withTransaction( t -> s.persist( entity ) ) + .persist( entity ) + .thenCompose( v -> s.flush() ) + .thenAccept( v -> s.clear() ) .thenCompose( v -> beforeOperationThread != Thread.currentThread() ? failedFuture( new IllegalStateException( "Detected an unexpected switch of carrier threads!" ) ) : voidFuture() );