Skip to content

Commit 6316ff7

Browse files
geoandDavideD
authored andcommitted
Introduce batch single table batch support
This change leverages the fact that when batching is enabled, a BatchingConnection is provided under the hood that takes care of all the necessary plumbing. Before the change, 122 tests where failing, after this change the number drops to 90
1 parent 6d5896f commit 6316ff7

File tree

5 files changed

+102
-36
lines changed

5 files changed

+102
-36
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.engine.jdbc;
7+
8+
import java.sql.SQLException;
9+
import org.hibernate.engine.jdbc.mutation.OperationResultChecker;
10+
import org.hibernate.engine.jdbc.mutation.group.PreparedStatementDetails;
11+
import org.hibernate.engine.jdbc.mutation.internal.ModelMutationHelper;
12+
import org.hibernate.engine.spi.SharedSessionContractImplementor;
13+
14+
public final class ResultsCheckerUtil {
15+
16+
private ResultsCheckerUtil() {
17+
}
18+
19+
20+
public static void checkResults(
21+
SharedSessionContractImplementor session,
22+
PreparedStatementDetails statementDetails,
23+
OperationResultChecker resultChecker,
24+
Integer affectedRowCount, int batchPosition) {
25+
try {
26+
ModelMutationHelper.checkResults( resultChecker, statementDetails, affectedRowCount, batchPosition);
27+
}
28+
catch (SQLException e) {
29+
throw session.getJdbcServices().getSqlExceptionHelper()
30+
.convert(
31+
e,
32+
String.format(
33+
"Unable to execute mutation PreparedStatement against table `%s`",
34+
statementDetails.getMutatingTableDetails().getTableName()
35+
),
36+
statementDetails.getSqlString()
37+
);
38+
}
39+
}
40+
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/jdbc/env/internal/ReactiveMutationExecutor.java

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,13 @@
66
package org.hibernate.reactive.engine.jdbc.env.internal;
77

88
import java.lang.invoke.MethodHandles;
9-
import java.sql.SQLException;
109
import java.util.concurrent.CompletionStage;
1110

1211
import org.hibernate.engine.jdbc.mutation.JdbcValueBindings;
1312
import org.hibernate.engine.jdbc.mutation.MutationExecutor;
1413
import org.hibernate.engine.jdbc.mutation.OperationResultChecker;
1514
import org.hibernate.engine.jdbc.mutation.TableInclusionChecker;
1615
import org.hibernate.engine.jdbc.mutation.group.PreparedStatementDetails;
17-
import org.hibernate.engine.jdbc.mutation.internal.ModelMutationHelper;
1816
import org.hibernate.engine.spi.SharedSessionContractImplementor;
1917
import org.hibernate.reactive.adaptor.impl.PrepareStatementDetailsAdaptor;
2018
import org.hibernate.reactive.adaptor.impl.PreparedStatementAdaptor;
@@ -26,6 +24,7 @@
2624
import org.hibernate.sql.model.TableMapping;
2725
import org.hibernate.sql.model.ValuesAnalysis;
2826

27+
import static org.hibernate.reactive.engine.jdbc.ResultsCheckerUtil.checkResults;
2928
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
3029
import static org.hibernate.sql.model.ModelMutationLogging.MODEL_MUTATION_LOGGER;
3130
import static org.hibernate.sql.model.ModelMutationLogging.MODEL_MUTATION_LOGGER_TRACE_ENABLED;
@@ -55,7 +54,7 @@ default CompletionStage<Object> executeReactive(
5554
SharedSessionContractImplementor session) {
5655
return performReactiveNonBatchedOperations( valuesAnalysis, inclusionChecker, resultChecker, session )
5756
.thenCompose( ignore -> performReactiveSelfExecutingOperations( valuesAnalysis, inclusionChecker, session ) )
58-
.thenCompose( ignore -> performReactiveBatchedOperations( valuesAnalysis, inclusionChecker ) )
57+
.thenCompose( ignore -> performReactiveBatchedOperations( valuesAnalysis, inclusionChecker, resultChecker, session ) )
5958
.thenApply( CompletionStages::nullFuture );
6059
}
6160

@@ -76,7 +75,8 @@ default CompletionStage<Void> performReactiveSelfExecutingOperations(
7675

7776
default CompletionStage<Void> performReactiveBatchedOperations(
7877
ValuesAnalysis valuesAnalysis,
79-
TableInclusionChecker inclusionChecker) {
78+
TableInclusionChecker inclusionChecker, OperationResultChecker resultChecker,
79+
SharedSessionContractImplementor session) {
8080
return voidFuture();
8181
}
8282

@@ -116,7 +116,7 @@ default CompletionStage<Void> performReactiveNonBatchedMutation(
116116
// the optional table did not have a row
117117
return voidFuture();
118118
}
119-
checkResults( session, statementDetails, resultChecker, affectedRowCount );
119+
checkResults( session, statementDetails, resultChecker, affectedRowCount, -1);
120120
return voidFuture();
121121
} )
122122
.whenComplete( (o, throwable) -> {
@@ -127,24 +127,4 @@ default CompletionStage<Void> performReactiveNonBatchedMutation(
127127
} );
128128
}
129129

130-
private static void checkResults(
131-
SharedSessionContractImplementor session,
132-
PreparedStatementDetails statementDetails,
133-
OperationResultChecker resultChecker,
134-
Integer affectedRowCount) {
135-
try {
136-
ModelMutationHelper.checkResults( resultChecker, statementDetails, affectedRowCount, -1 );
137-
}
138-
catch (SQLException e) {
139-
throw session.getJdbcServices().getSqlExceptionHelper()
140-
.convert(
141-
e,
142-
String.format(
143-
"Unable to execute mutation PreparedStatement against table `%s`",
144-
statementDetails.getMutatingTableDetails().getTableName()
145-
),
146-
statementDetails.getSqlString()
147-
);
148-
}
149-
}
150130
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/jdbc/mutation/internal/ReactiveMutationExecutorSingleBatched.java

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,27 @@
55
*/
66
package org.hibernate.reactive.engine.jdbc.mutation.internal;
77

8+
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
9+
import static org.hibernate.sql.model.ModelMutationLogging.MODEL_MUTATION_LOGGER;
10+
import static org.hibernate.sql.model.ModelMutationLogging.MODEL_MUTATION_LOGGER_TRACE_ENABLED;
811
import java.util.concurrent.CompletionStage;
9-
1012
import org.hibernate.engine.jdbc.batch.spi.BatchKey;
13+
import org.hibernate.engine.jdbc.mutation.JdbcValueBindings;
14+
import org.hibernate.engine.jdbc.mutation.OperationResultChecker;
1115
import org.hibernate.engine.jdbc.mutation.TableInclusionChecker;
16+
import org.hibernate.engine.jdbc.mutation.group.PreparedStatementDetails;
1217
import org.hibernate.engine.jdbc.mutation.internal.MutationExecutorSingleBatched;
1318
import org.hibernate.engine.spi.SharedSessionContractImplementor;
19+
import org.hibernate.reactive.adaptor.impl.PrepareStatementDetailsAdaptor;
20+
import org.hibernate.reactive.adaptor.impl.PreparedStatementAdaptor;
21+
import org.hibernate.reactive.engine.jdbc.ResultsCheckerUtil;
1422
import org.hibernate.reactive.engine.jdbc.env.internal.ReactiveMutationExecutor;
23+
import org.hibernate.reactive.pool.ReactiveConnection;
24+
import org.hibernate.reactive.session.ReactiveConnectionSupplier;
1525
import org.hibernate.sql.model.PreparableMutationOperation;
26+
import org.hibernate.sql.model.TableMapping;
1627
import org.hibernate.sql.model.ValuesAnalysis;
1728

18-
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
19-
2029
public class ReactiveMutationExecutorSingleBatched extends MutationExecutorSingleBatched implements
2130
ReactiveMutationExecutor {
2231

@@ -31,8 +40,41 @@ public ReactiveMutationExecutorSingleBatched(
3140
@Override
3241
public CompletionStage<Void> performReactiveBatchedOperations(
3342
ValuesAnalysis valuesAnalysis,
34-
TableInclusionChecker inclusionChecker) {
35-
super.performBatchedOperations( valuesAnalysis, inclusionChecker );
36-
return voidFuture();
43+
TableInclusionChecker inclusionChecker,
44+
OperationResultChecker resultChecker,
45+
SharedSessionContractImplementor session) {
46+
47+
PreparedStatementDetails statementDetails = getStatementGroup().getSingleStatementDetails();
48+
JdbcValueBindings valueBindings = getJdbcValueBindings();
49+
50+
if ( statementDetails == null ) {
51+
return voidFuture();
52+
}
53+
54+
final TableMapping tableDetails = statementDetails.getMutatingTableDetails();
55+
if ( inclusionChecker != null && !inclusionChecker.include( tableDetails ) ) {
56+
if ( MODEL_MUTATION_LOGGER_TRACE_ENABLED ) {
57+
MODEL_MUTATION_LOGGER.tracef( "Skipping execution of secondary insert : %s", tableDetails.getTableName() );
58+
}
59+
return voidFuture();
60+
}
61+
62+
// If we get here the statement is needed - make sure it is resolved
63+
Object[] paramValues = PreparedStatementAdaptor.bind(statement -> {
64+
PreparedStatementDetails details = new PrepareStatementDetailsAdaptor( statementDetails, statement, session.getJdbcServices() );
65+
valueBindings.beforeStatement( details );
66+
} );
67+
68+
ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection();
69+
String sql = statementDetails.getSqlString();
70+
return reactiveConnection
71+
.update(sql, paramValues, true,
72+
(rowCount, batchPosition, query) -> ResultsCheckerUtil.checkResults(session, statementDetails, resultChecker, rowCount, batchPosition))
73+
.whenComplete( (o, throwable) -> { //TODO: is this part really needed?
74+
if ( statementDetails.getStatement() != null ) {
75+
statementDetails.releaseStatement( session );
76+
}
77+
valueBindings.afterStatement( tableDetails );
78+
} );
3779
}
3880
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/jdbc/mutation/internal/ReactiveMutationExecutorStandard.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@ private ReactiveConnection connection(SharedSessionContractImplementor session)
4949
@Override
5050
public CompletionStage<Void> performReactiveBatchedOperations(
5151
ValuesAnalysis valuesAnalysis,
52-
TableInclusionChecker inclusionChecker) {
53-
return ReactiveMutationExecutor.super.performReactiveBatchedOperations( valuesAnalysis, inclusionChecker );
52+
TableInclusionChecker inclusionChecker, OperationResultChecker resultChecker,
53+
SharedSessionContractImplementor session) {
54+
return ReactiveMutationExecutor.super.performReactiveBatchedOperations( valuesAnalysis, inclusionChecker,
55+
resultChecker, session);
5456
}
5557

5658
@Override

hibernate-reactive-core/src/test/java/org/hibernate/reactive/BatchingConnectionTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void testBatchingWithPersistAll(TestContext context) {
7171
assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 );
7272
// Parameters are different for different dbs, so we cannot do an exact match
7373
assertThat( sqlTracker.getLoggedQueries().get( 0 ) )
74-
.startsWith( "insert into pig (name, version, id) values " );
74+
.startsWith( "insert into pig (name,version,id) values " );
7575
sqlTracker.clear();
7676
} )
7777
)
@@ -96,7 +96,7 @@ public void testBatching(TestContext context) {
9696
assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 );
9797
// Parameters are different for different dbs, so we cannot do an exact match
9898
assertThat( sqlTracker.getLoggedQueries().get( 0 ) )
99-
.startsWith( "insert into pig (name, version, id) values " );
99+
.matches("insert into pig \\(name,version,id\\) values (.*)" );
100100
sqlTracker.clear();
101101
} )
102102
)
@@ -111,7 +111,9 @@ public void testBatching(TestContext context) {
111111
context.assertEquals( 3L, count);
112112
assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 );
113113
assertThat( sqlTracker.getLoggedQueries().get( 0 ) )
114-
.matches( "update pig set name=.+, version=.+ where id=.+ and version=.+" );
114+
.matches(
115+
"update pig set name=.+,\\s*version=.+ where id=.+ and "
116+
+ "version=.+" );
115117
sqlTracker.clear();
116118
} )
117119
) )

0 commit comments

Comments
 (0)