|
5 | 5 | */ |
6 | 6 | package org.hibernate.reactive.engine.jdbc.mutation.internal; |
7 | 7 |
|
8 | | -import java.lang.invoke.MethodHandles; |
9 | | -import java.sql.SQLException; |
10 | | -import java.util.concurrent.CompletionStage; |
11 | | - |
12 | 8 | import org.hibernate.engine.jdbc.batch.spi.Batch; |
13 | 9 | import org.hibernate.engine.jdbc.mutation.JdbcValueBindings; |
14 | 10 | import org.hibernate.engine.jdbc.mutation.OperationResultChecker; |
|
25 | 21 | import org.hibernate.persister.entity.mutation.EntityTableMapping; |
26 | 22 | import org.hibernate.reactive.adaptor.impl.PrepareStatementDetailsAdaptor; |
27 | 23 | import org.hibernate.reactive.adaptor.impl.PreparedStatementAdaptor; |
| 24 | +import org.hibernate.reactive.engine.jdbc.ResultsCheckerUtil; |
28 | 25 | import org.hibernate.reactive.engine.jdbc.env.internal.ReactiveMutationExecutor; |
29 | 26 | import org.hibernate.reactive.generator.values.ReactiveGeneratedValuesMutationDelegate; |
30 | 27 | import org.hibernate.reactive.logging.impl.Log; |
|
37 | 34 | import org.hibernate.sql.model.TableMapping; |
38 | 35 | import org.hibernate.sql.model.ValuesAnalysis; |
39 | 36 |
|
| 37 | +import java.lang.invoke.MethodHandles; |
| 38 | +import java.sql.SQLException; |
| 39 | +import java.util.ArrayList; |
| 40 | +import java.util.List; |
| 41 | +import java.util.concurrent.CompletionStage; |
| 42 | + |
40 | 43 | import static org.hibernate.engine.jdbc.mutation.internal.ModelMutationHelper.checkResults; |
41 | 44 | import static org.hibernate.reactive.logging.impl.LoggerFactory.make; |
42 | 45 | import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture; |
| 46 | +import static org.hibernate.reactive.util.impl.CompletionStages.loop; |
43 | 47 | import static org.hibernate.reactive.util.impl.CompletionStages.nullFuture; |
44 | 48 | import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; |
45 | 49 | import static org.hibernate.sql.model.ModelMutationLogging.MODEL_MUTATION_LOGGER; |
@@ -73,10 +77,64 @@ private ReactiveConnection connection(SharedSessionContractImplementor session) |
73 | 77 | @Override |
74 | 78 | public CompletionStage<Void> performReactiveBatchedOperations( |
75 | 79 | ValuesAnalysis valuesAnalysis, |
76 | | - TableInclusionChecker inclusionChecker, OperationResultChecker resultChecker, |
| 80 | + TableInclusionChecker inclusionChecker, |
| 81 | + OperationResultChecker resultChecker, |
77 | 82 | SharedSessionContractImplementor session) { |
78 | | - return ReactiveMutationExecutor.super |
79 | | - .performReactiveBatchedOperations( valuesAnalysis, inclusionChecker, resultChecker, session); |
| 83 | + final PreparedStatementGroup batchedMutationOperationGroup = getBatchedPreparedStatementGroup(); |
| 84 | + if ( batchedMutationOperationGroup != null ) { |
| 85 | + final List<PreparedStatementDetails> preparedStatementDetailsList = new ArrayList<>( |
| 86 | + batchedMutationOperationGroup.getNumberOfStatements() ); |
| 87 | + batchedMutationOperationGroup.forEachStatement( (tableName, statementDetails) -> preparedStatementDetailsList |
| 88 | + .add( statementDetails ) ); |
| 89 | + loop( preparedStatementDetailsList, statementDetails -> { |
| 90 | + if ( statementDetails == null ) { |
| 91 | + return voidFuture(); |
| 92 | + } |
| 93 | + final JdbcValueBindings valueBindings = getJdbcValueBindings(); |
| 94 | + final TableMapping tableDetails = statementDetails.getMutatingTableDetails(); |
| 95 | + if ( inclusionChecker != null && !inclusionChecker.include( tableDetails ) ) { |
| 96 | + if ( MODEL_MUTATION_LOGGER.isTraceEnabled() ) { |
| 97 | + MODEL_MUTATION_LOGGER.tracef( |
| 98 | + "Skipping execution of secondary insert : %s", |
| 99 | + tableDetails.getTableName() |
| 100 | + ); |
| 101 | + } |
| 102 | + return voidFuture(); |
| 103 | + } |
| 104 | + |
| 105 | + // If we get here the statement is needed - make sure it is resolved |
| 106 | + final Object[] paramValues = PreparedStatementAdaptor.bind( statement -> { |
| 107 | + PreparedStatementDetails details = new PrepareStatementDetailsAdaptor( |
| 108 | + statementDetails, |
| 109 | + statement, |
| 110 | + session.getJdbcServices() |
| 111 | + ); |
| 112 | + valueBindings.beforeStatement( details ); |
| 113 | + } ); |
| 114 | + |
| 115 | + final ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection(); |
| 116 | + final String sql = statementDetails.getSqlString(); |
| 117 | + return reactiveConnection.update( |
| 118 | + sql, |
| 119 | + paramValues, |
| 120 | + true, |
| 121 | + (rowCount, batchPosition, query) -> ResultsCheckerUtil.checkResults( |
| 122 | + session, |
| 123 | + statementDetails, |
| 124 | + resultChecker, |
| 125 | + rowCount, |
| 126 | + batchPosition |
| 127 | + ) |
| 128 | + ).whenComplete( (o, throwable) -> { //TODO: is this part really needed? |
| 129 | + if ( statementDetails.getStatement() != null ) { |
| 130 | + statementDetails.releaseStatement( session ); |
| 131 | + } |
| 132 | + valueBindings.afterStatement( tableDetails ); |
| 133 | + } ); |
| 134 | + } |
| 135 | + ); |
| 136 | + } |
| 137 | + return voidFuture(); |
80 | 138 | } |
81 | 139 |
|
82 | 140 | @Override |
@@ -159,6 +217,22 @@ public CompletionStage<GeneratedValues> performReactiveNonBatchedOperations( |
159 | 217 | } |
160 | 218 | } |
161 | 219 |
|
| 220 | + public CompletionStage<Void> performReactiveSelfExecutingOperations( |
| 221 | + ValuesAnalysis valuesAnalysis, |
| 222 | + TableInclusionChecker inclusionChecker, |
| 223 | + SharedSessionContractImplementor session) { |
| 224 | + if ( getSelfExecutingMutations() == null || getSelfExecutingMutations().isEmpty() ) { |
| 225 | + return voidFuture(); |
| 226 | + } |
| 227 | + |
| 228 | + return loop( getSelfExecutingMutations(), operation -> { |
| 229 | + if ( inclusionChecker.include( operation.getTableDetails() ) ) { |
| 230 | + operation.performMutation( getJdbcValueBindings(), valuesAnalysis, session ); |
| 231 | + } |
| 232 | + return voidFuture(); |
| 233 | + }); |
| 234 | + } |
| 235 | + |
162 | 236 | private class OperationsForEach { |
163 | 237 |
|
164 | 238 | private final Object id; |
@@ -210,6 +284,7 @@ public CompletionStage<Void> buildLoop() { |
210 | 284 | return loop; |
211 | 285 | } |
212 | 286 | } |
| 287 | + |
213 | 288 | @Override |
214 | 289 | public CompletionStage<Void> performReactiveNonBatchedMutation( |
215 | 290 | PreparedStatementDetails statementDetails, |
|
0 commit comments