14
14
import org .hibernate .engine .jdbc .mutation .OperationResultChecker ;
15
15
import org .hibernate .engine .jdbc .mutation .TableInclusionChecker ;
16
16
import org .hibernate .engine .jdbc .mutation .group .PreparedStatementDetails ;
17
+ import org .hibernate .engine .jdbc .mutation .group .PreparedStatementGroup ;
17
18
import org .hibernate .engine .jdbc .mutation .internal .MutationExecutorStandard ;
18
19
import org .hibernate .engine .spi .SharedSessionContractImplementor ;
20
+ import org .hibernate .reactive .adaptor .impl .PrepareStatementDetailsAdaptor ;
21
+ import org .hibernate .reactive .adaptor .impl .PreparedStatementAdaptor ;
19
22
import org .hibernate .reactive .engine .jdbc .env .internal .ReactiveMutationExecutor ;
20
23
import org .hibernate .reactive .pool .ReactiveConnection ;
21
24
import org .hibernate .reactive .session .ReactiveConnectionSupplier ;
@@ -51,8 +54,8 @@ public CompletionStage<Void> performReactiveBatchedOperations(
51
54
ValuesAnalysis valuesAnalysis ,
52
55
TableInclusionChecker inclusionChecker , OperationResultChecker resultChecker ,
53
56
SharedSessionContractImplementor session ) {
54
- return ReactiveMutationExecutor .super . performReactiveBatchedOperations ( valuesAnalysis , inclusionChecker ,
55
- resultChecker , session );
57
+ return ReactiveMutationExecutor .super
58
+ . performReactiveBatchedOperations ( valuesAnalysis , inclusionChecker , resultChecker , session );
56
59
}
57
60
58
61
@ Override
@@ -79,6 +82,57 @@ protected void performBatchedOperations(
79
82
throw LOG .nonReactiveMethodCall ( "performReactiveBatchedOperations" );
80
83
}
81
84
85
+ @ Override
86
+ public CompletionStage <Void > performReactiveNonBatchedOperations (
87
+ ValuesAnalysis valuesAnalysis ,
88
+ TableInclusionChecker inclusionChecker ,
89
+ OperationResultChecker resultChecker ,
90
+ SharedSessionContractImplementor session ) {
91
+
92
+ if ( getNonBatchedStatementGroup () == null || getNonBatchedStatementGroup ().getNumberOfStatements () <= 0 ) {
93
+ return voidFuture ();
94
+ }
95
+
96
+ PreparedStatementGroup nonBatchedStatementGroup = getNonBatchedStatementGroup ();
97
+ final OperationsForEach forEach = new OperationsForEach ( inclusionChecker , resultChecker , session , getJdbcValueBindings () );
98
+ nonBatchedStatementGroup .forEachStatement ( forEach ::add );
99
+ return forEach .buildLoop ();
100
+ }
101
+
102
+ private class OperationsForEach {
103
+
104
+ private final TableInclusionChecker inclusionChecker ;
105
+ private final OperationResultChecker resultChecker ;
106
+ private final SharedSessionContractImplementor session ;
107
+ private final JdbcValueBindings jdbcValueBindings ;
108
+
109
+ private CompletionStage <Void > loop = voidFuture ();
110
+
111
+ public OperationsForEach (
112
+ TableInclusionChecker inclusionChecker ,
113
+ OperationResultChecker resultChecker ,
114
+ SharedSessionContractImplementor session ,
115
+ JdbcValueBindings jdbcValueBindings ) {
116
+ this .inclusionChecker = inclusionChecker ;
117
+ this .resultChecker = resultChecker ;
118
+ this .session = session ;
119
+ this .jdbcValueBindings = jdbcValueBindings ;
120
+ }
121
+
122
+ public void add (String tableName , PreparedStatementDetails statementDetails ) {
123
+ loop = loop .thenCompose ( v -> performReactiveNonBatchedMutation (
124
+ statementDetails ,
125
+ getJdbcValueBindings (),
126
+ inclusionChecker ,
127
+ resultChecker ,
128
+ session
129
+ ) );
130
+ }
131
+
132
+ public CompletionStage <Void > buildLoop () {
133
+ return loop ;
134
+ }
135
+ }
82
136
@ Override
83
137
public CompletionStage <Void > performReactiveNonBatchedMutation (
84
138
PreparedStatementDetails statementDetails ,
@@ -99,12 +153,13 @@ public CompletionStage<Void> performReactiveNonBatchedMutation(
99
153
return voidFuture ();
100
154
}
101
155
102
- // If we get here the statement is needed - make sure it is resolved
103
- session .getJdbcServices ().getSqlStatementLogger ().logStatement ( statementDetails .getSqlString () );
104
- valueBindings .beforeStatement ( statementDetails );
156
+ Object [] params = PreparedStatementAdaptor .bind ( statement -> {
157
+ PreparedStatementDetails details = new PrepareStatementDetailsAdaptor ( statementDetails , statement , session .getJdbcServices () );
158
+ valueBindings .beforeStatement ( details );
159
+ } );
105
160
106
161
return connection ( session )
107
- .update ( statementDetails .getSqlString () )
162
+ .update ( statementDetails .getSqlString (), params )
108
163
.thenCompose ( affectedRowCount -> checkResult ( session , statementDetails , resultChecker , tableDetails , affectedRowCount ) )
109
164
.whenComplete ( (unused , throwable ) -> {
110
165
if ( statementDetails .getStatement () != null ) {
0 commit comments