Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

import java.lang.invoke.MethodHandles;
import java.util.Locale;
import java.util.Objects;

import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.reactive.logging.impl.LoggerFactory;

import io.vertx.core.Context;
import io.vertx.core.internal.ContextInternal;

/**
* Commonly used assertions to verify that the operations
Expand Down Expand Up @@ -52,4 +54,13 @@ public static void assertCurrentThreadMatches(Thread expectedThread) {
}
}

public static void assertCurrentContextMatches(Object object, ContextInternal expectedContext) {
if ( ENFORCE ) {
final ContextInternal currentContext = ContextInternal.current();
Objects.requireNonNull( currentContext, "Current context cannot be null" );
if ( !currentContext.equals( expectedContext ) ) {
throw LOG.unexpectedContextDetected( object, expectedContext, currentContext );
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.pool.CombinerExecutor;
import io.vertx.core.internal.pool.Executor;
import io.vertx.core.internal.pool.Task;

import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture;
import static org.hibernate.reactive.util.impl.CompletionStages.supplyStage;

/**
* A {@link ReactiveIdentifierGenerator} which uses the database to allocate
Expand Down Expand Up @@ -93,39 +95,34 @@ public CompletionStage<Long> generate(ReactiveConnectionSupplier connectionSuppl
}

final CompletableFuture<Long> resultForThisEventLoop = new CompletableFuture<>();
final CompletableFuture<Long> result = new CompletableFuture<>();
final Context context = Vertx.currentContext();
executor.submit( new GenerateIdAction( connectionSupplier, result ) );
result.whenComplete( (id, t) -> {
final Context newContext = Vertx.currentContext();
//Need to be careful in resuming processing on the same context as the original
//request, potentially having to switch back if we're no longer executing on the same:
if ( newContext != context ) {
if ( t != null ) {
context.runOnContext( v -> resultForThisEventLoop.completeExceptionally( t ) );
// We use supplyStage so that, no matter if there's an exception, we always return something that will complete
return supplyStage( () -> {
final CompletableFuture<Long> result = new CompletableFuture<>();
final Context context = Vertx.currentContext();
executor.submit( new GenerateIdAction( connectionSupplier, result ) );
result.whenComplete( (id, t) -> {
final Context newContext = Vertx.currentContext();
//Need to be careful in resuming processing on the same context as the original
//request, potentially having to switch back if we're no longer executing on the same:
if ( newContext != context ) {
context.runOnContext( v -> complete( resultForThisEventLoop, id, t ) );
}
else {
context.runOnContext( v -> resultForThisEventLoop.complete( id ) );
complete( resultForThisEventLoop, id, t );
}
}
else {
if ( t != null ) {
resultForThisEventLoop.completeExceptionally( t );
}
else {
resultForThisEventLoop.complete( id );
}
}
} );
return resultForThisEventLoop;
} );
return resultForThisEventLoop;
}

private final class GenerateIdAction implements Executor.Action<GeneratorState> {

private final ReactiveConnectionSupplier connectionSupplier;
private final CompletableFuture<Long> result;
private final ContextInternal creationContext;

public GenerateIdAction(ReactiveConnectionSupplier connectionSupplier, CompletableFuture<Long> result) {
this.creationContext = ContextInternal.current();
this.connectionSupplier = Objects.requireNonNull( connectionSupplier );
this.result = Objects.requireNonNull( result );
}
Expand All @@ -137,35 +134,38 @@ public Task execute(GeneratorState state) {
// We don't need to update or initialize the hi
// value in the table, so just increment the lo
// value and return the next id in the block
completedFuture( local ).whenComplete( this::acceptAsReturnValue );
result.complete( local );
}
else {
nextHiValue( connectionSupplier )
.whenComplete( (newlyGeneratedHi, throwable) -> {
if ( throwable != null ) {
result.completeExceptionally( throwable );
}
else {
//We ignore the state argument as we actually use the field directly
//for convenience, but they are the same object.
executor.submit( stateIgnored -> {
result.complete( next( newlyGeneratedHi ) );
return null;
} );
}
} );
creationContext.runOnContext( this::generateNewHiValue );
}
return null;
}

private void acceptAsReturnValue(final Long aLong, final Throwable throwable) {
if ( throwable != null ) {
result.completeExceptionally( throwable );
}
else {
result.complete( aLong );
}
private void generateNewHiValue(Void v) {
nextHiValue( connectionSupplier )
.whenComplete( (newlyGeneratedHi, throwable) -> {
if ( throwable != null ) {
result.completeExceptionally( throwable );
}
else {
//We ignore the state argument as we actually use the field directly
//for convenience, but they are the same object.
executor.submit( stateIgnored -> {
result.complete( next( newlyGeneratedHi ) );
return null;
} );
}
} );
Comment on lines +146 to +159
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure nextHiValue will never throw an exception? I mean the method, not the future it returns.

Because if it does throw an exception, the whenComplete call here will be skipped and the operation will hang forever from the user's point of view.

}
}

private static <T> void complete(CompletableFuture<T> future, final T result, final Throwable throwable) {
if ( throwable != null ) {
future.completeExceptionally( throwable );
}
else {
future.complete( result );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@
package org.hibernate.reactive.logging.impl;



import java.sql.SQLException;
import java.sql.SQLWarning;

import jakarta.persistence.PersistenceException;

import org.hibernate.HibernateException;
import org.hibernate.JDBCException;
import org.hibernate.LazyInitializationException;
Expand All @@ -26,6 +23,9 @@
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageLogger;

import io.vertx.core.internal.ContextInternal;
import jakarta.persistence.PersistenceException;

import static org.jboss.logging.Logger.Level.DEBUG;
import static org.jboss.logging.Logger.Level.ERROR;
import static org.jboss.logging.Logger.Level.INFO;
Expand Down Expand Up @@ -274,6 +274,9 @@ public interface Log extends BasicLogger {
@Message(id = 86, value = "Error closing reactive connection")
void errorClosingConnection(@Cause Throwable throwable);

@Message(id = 88, value = "Expected to use the object %1$s on context %2$s but was %3$s")
HibernateException unexpectedContextDetected(Object obj, ContextInternal expectedContext, ContextInternal currentContext);

// Same method that exists in CoreMessageLogger
@LogMessage(level = WARN)
@Message(id = 104, value = "firstResult/maxResults specified with collection fetch; applying in memory!" )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import org.hibernate.engine.jdbc.spi.SqlStatementLogger;
import org.hibernate.reactive.adaptor.impl.JdbcNull;
import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;
import org.hibernate.reactive.common.InternalStateAssertions;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.reactive.logging.impl.LoggerFactory;
import org.hibernate.reactive.pool.BatchingConnection;
import org.hibernate.reactive.pool.ReactiveConnection;
import org.hibernate.reactive.util.impl.CompletionStages;

import io.vertx.core.internal.ContextInternal;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.sqlclient.DatabaseException;
Expand Down Expand Up @@ -59,14 +61,22 @@ public class SqlClientConnection implements ReactiveConnection {

private final Pool pool;
private final SqlConnection connection;
// The context associated to the connection. We expect the connection to be executed in this context.
private final ContextInternal connectionContext;
private Transaction transaction;

SqlClientConnection(SqlConnection connection, Pool pool, SqlStatementLogger sqlStatementLogger, SqlExceptionHelper sqlExceptionHelper) {
SqlClientConnection(
SqlConnection connection,
Pool pool,
SqlStatementLogger sqlStatementLogger,
SqlExceptionHelper sqlExceptionHelper,
ContextInternal connectionContext) {
this.connectionContext = connectionContext;
this.pool = pool;
this.sqlStatementLogger = sqlStatementLogger;
this.connection = connection;
this.sqlExceptionHelper = sqlExceptionHelper;
LOG.tracef( "Connection created: %s", connection );
LOG.tracef( "Connection created for %1$s associated to context %2$s: ", connection, connectionContext );
}

@Override
Expand Down Expand Up @@ -338,6 +348,7 @@ public CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql)
}

private void feedback(String sql) {
InternalStateAssertions.assertCurrentContextMatches( this, connectionContext );
Objects.requireNonNull( sql, "SQL query cannot be null" );
// DDL already gets formatted by the client, so don't reformat it
FormatStyle formatStyle = sqlStatementLogger.isFormat() && !sql.contains( System.lineSeparator() )
Expand Down
Loading