Skip to content

Commit 847e8a2

Browse files
committed
Restore auto-commit mode if not done by driver
Closes gh-31268
1 parent 9aab4a6 commit 847e8a2

File tree

2 files changed

+53
-34
lines changed

2 files changed

+53
-34
lines changed

spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationMa
209209
connectionMono = Mono.just(txObject.getConnectionHolder().getConnection());
210210
}
211211

212-
return connectionMono.flatMap(con -> doBegin(definition, con)
212+
return connectionMono.flatMap(con -> doBegin(con, txObject, definition)
213213
.then(prepareTransactionalConnection(con, definition))
214214
.doOnSuccess(v -> {
215215
txObject.getConnectionHolder().setTransactionActive(true);
@@ -233,7 +233,10 @@ protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationMa
233233
}).then();
234234
}
235235

236-
private Mono<Void> doBegin(TransactionDefinition definition, Connection con) {
236+
private Mono<Void> doBegin(
237+
Connection con, ConnectionFactoryTransactionObject transaction, TransactionDefinition definition) {
238+
239+
transaction.setMustRestoreAutoCommit(con.isAutoCommit());
237240
io.r2dbc.spi.TransactionDefinition transactionDefinition = createTransactionDefinition(definition);
238241
if (logger.isDebugEnabled()) {
239242
logger.debug("Starting R2DBC transaction on Connection [" + con + "] using [" + transactionDefinition + "]");
@@ -354,12 +357,22 @@ protected Mono<Void> doCleanupAfterCompletion(TransactionSynchronizationManager
354357
if (logger.isDebugEnabled()) {
355358
logger.debug("Releasing R2DBC Connection [" + con + "] after transaction");
356359
}
360+
Mono<Void> restoreMono = Mono.empty();
361+
if (txObject.isMustRestoreAutoCommit() && !con.isAutoCommit()) {
362+
restoreMono = Mono.from(con.setAutoCommit(true));
363+
if (logger.isDebugEnabled()) {
364+
restoreMono = restoreMono.doOnError(ex ->
365+
logger.debug(String.format("Error ignored during auto-commit restore: %s", ex)));
366+
}
367+
restoreMono = restoreMono.onErrorComplete();
368+
}
357369
Mono<Void> releaseMono = ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory());
358370
if (logger.isDebugEnabled()) {
359-
releaseMono = releaseMono.doOnError(
360-
ex -> logger.debug(String.format("Error ignored during cleanup: %s", ex)));
371+
releaseMono = releaseMono.doOnError(ex ->
372+
logger.debug(String.format("Error ignored during connection release: %s", ex)));
361373
}
362-
return releaseMono.onErrorComplete();
374+
releaseMono = releaseMono.onErrorComplete();
375+
return restoreMono.then(releaseMono);
363376
}
364377
}
365378
finally {
@@ -482,6 +495,8 @@ private static class ConnectionFactoryTransactionObject {
482495

483496
private boolean newConnectionHolder;
484497

498+
private boolean mustRestoreAutoCommit;
499+
485500
@Nullable
486501
private String savepointName;
487502

@@ -507,6 +522,14 @@ public boolean hasConnectionHolder() {
507522
return (this.connectionHolder != null);
508523
}
509524

525+
public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) {
526+
this.mustRestoreAutoCommit = mustRestoreAutoCommit;
527+
}
528+
529+
public boolean isMustRestoreAutoCommit() {
530+
return this.mustRestoreAutoCommit;
531+
}
532+
510533
public boolean isTransactionActive() {
511534
return (this.connectionHolder != null && this.connectionHolder.isTransactionActive());
512535
}

spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,12 @@ void before() {
8181

8282
@Test
8383
void testSimpleTransaction() {
84-
TestTransactionSynchronization sync = new TestTransactionSynchronization(
85-
TransactionSynchronization.STATUS_COMMITTED);
84+
when(connectionMock.isAutoCommit()).thenReturn(false);
8685
AtomicInteger commits = new AtomicInteger();
8786
when(connectionMock.commitTransaction()).thenReturn(
8887
Mono.fromRunnable(commits::incrementAndGet));
88+
TestTransactionSynchronization sync = new TestTransactionSynchronization(
89+
TransactionSynchronization.STATUS_COMMITTED);
8990

9091
TransactionalOperator operator = TransactionalOperator.create(tm);
9192

@@ -98,6 +99,7 @@ void testSimpleTransaction() {
9899
.verifyComplete();
99100

100101
assertThat(commits).hasValue(1);
102+
verify(connectionMock).isAutoCommit();
101103
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
102104
verify(connectionMock).commitTransaction();
103105
verify(connectionMock).close();
@@ -131,8 +133,10 @@ void testBeginFails() {
131133
}
132134

133135
@Test
134-
void appliesTransactionDefinition() {
136+
void appliesTransactionDefinitionAndAutoCommit() {
137+
when(connectionMock.isAutoCommit()).thenReturn(true, false);
135138
when(connectionMock.commitTransaction()).thenReturn(Mono.empty());
139+
when(connectionMock.setAutoCommit(true)).thenReturn(Mono.empty());
136140

137141
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
138142
definition.setName("my-transaction");
@@ -152,6 +156,7 @@ void appliesTransactionDefinition() {
152156
verify(connectionMock).beginTransaction(txCaptor.capture());
153157
verify(connectionMock, never()).setTransactionIsolationLevel(any());
154158
verify(connectionMock).commitTransaction();
159+
verify(connectionMock).setAutoCommit(true);
155160
verify(connectionMock).close();
156161

157162
io.r2dbc.spi.TransactionDefinition def = txCaptor.getValue();
@@ -162,29 +167,8 @@ void appliesTransactionDefinition() {
162167
}
163168

164169
@Test
165-
void doesNotSetIsolationLevelIfMatch() {
166-
when(connectionMock.getTransactionIsolationLevel()).thenReturn(
167-
IsolationLevel.READ_COMMITTED);
168-
when(connectionMock.commitTransaction()).thenReturn(Mono.empty());
169-
170-
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
171-
definition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
172-
173-
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
174-
175-
ConnectionFactoryUtils.getConnection(connectionFactoryMock)
176-
.as(operator::transactional)
177-
.as(StepVerifier::create)
178-
.expectNextCount(1)
179-
.verifyComplete();
180-
181-
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
182-
verify(connectionMock, never()).setTransactionIsolationLevel(any());
183-
verify(connectionMock).commitTransaction();
184-
}
185-
186-
@Test
187-
void doesNotSetAutoCommitDisabled() {
170+
void doesNotSetAutoCommitIfRestoredByDriver() {
171+
when(connectionMock.isAutoCommit()).thenReturn(true, true);
188172
when(connectionMock.commitTransaction()).thenReturn(Mono.empty());
189173

190174
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
@@ -204,6 +188,7 @@ void doesNotSetAutoCommitDisabled() {
204188

205189
@Test
206190
void appliesReadOnly() {
191+
when(connectionMock.isAutoCommit()).thenReturn(false);
207192
when(connectionMock.commitTransaction()).thenReturn(Mono.empty());
208193
when(connectionMock.setTransactionIsolationLevel(any())).thenReturn(Mono.empty());
209194
Statement statement = mock();
@@ -222,6 +207,7 @@ void appliesReadOnly() {
222207
.expectNextCount(1)
223208
.verifyComplete();
224209

210+
verify(connectionMock).isAutoCommit();
225211
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
226212
verify(connectionMock).createStatement("SET TRANSACTION READ ONLY");
227213
verify(connectionMock).commitTransaction();
@@ -231,7 +217,9 @@ void appliesReadOnly() {
231217

232218
@Test
233219
void testCommitFails() {
234-
when(connectionMock.commitTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Commit should fail"))));
220+
when(connectionMock.isAutoCommit()).thenReturn(false);
221+
when(connectionMock.commitTransaction()).thenReturn(Mono.defer(() ->
222+
Mono.error(new R2dbcBadGrammarException("Commit should fail"))));
235223
when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty());
236224

237225
TransactionalOperator operator = TransactionalOperator.create(tm);
@@ -242,6 +230,7 @@ void testCommitFails() {
242230
.as(StepVerifier::create)
243231
.verifyError(BadSqlGrammarException.class);
244232

233+
verify(connectionMock).isAutoCommit();
245234
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
246235
verify(connectionMock).createStatement("foo");
247236
verify(connectionMock).commitTransaction();
@@ -252,6 +241,7 @@ void testCommitFails() {
252241

253242
@Test
254243
void testRollback() {
244+
when(connectionMock.isAutoCommit()).thenReturn(false);
255245
AtomicInteger commits = new AtomicInteger();
256246
when(connectionMock.commitTransaction()).thenReturn(
257247
Mono.fromRunnable(commits::incrementAndGet));
@@ -269,6 +259,7 @@ void testRollback() {
269259

270260
assertThat(commits).hasValue(0);
271261
assertThat(rollbacks).hasValue(1);
262+
verify(connectionMock).isAutoCommit();
272263
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
273264
verify(connectionMock).rollbackTransaction();
274265
verify(connectionMock).close();
@@ -278,7 +269,8 @@ void testRollback() {
278269
@Test
279270
@SuppressWarnings("unchecked")
280271
void testRollbackFails() {
281-
when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Commit should fail"))), Mono.empty());
272+
when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() ->
273+
Mono.error(new R2dbcBadGrammarException("Commit should fail"))), Mono.empty());
282274

283275
TransactionalOperator operator = TransactionalOperator.create(tm);
284276
operator.execute(reactiveTransaction -> {
@@ -287,6 +279,7 @@ void testRollbackFails() {
287279
.doOnNext(connection -> connection.createStatement("foo")).then();
288280
}).as(StepVerifier::create).verifyError(BadSqlGrammarException.class);
289281

282+
verify(connectionMock).isAutoCommit();
290283
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
291284
verify(connectionMock).createStatement("foo");
292285
verify(connectionMock, never()).commitTransaction();
@@ -298,7 +291,8 @@ void testRollbackFails() {
298291
@Test
299292
@SuppressWarnings("unchecked")
300293
void testConnectionReleasedWhenRollbackFails() {
301-
when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Rollback should fail"))), Mono.empty());
294+
when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() ->
295+
Mono.error(new R2dbcBadGrammarException("Rollback should fail"))), Mono.empty());
302296
when(connectionMock.setTransactionIsolationLevel(any())).thenReturn(Mono.empty());
303297

304298
TransactionalOperator operator = TransactionalOperator.create(tm);
@@ -319,6 +313,7 @@ void testConnectionReleasedWhenRollbackFails() {
319313

320314
@Test
321315
void testTransactionSetRollbackOnly() {
316+
when(connectionMock.isAutoCommit()).thenReturn(false);
322317
when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty());
323318
TestTransactionSynchronization sync = new TestTransactionSynchronization(
324319
TransactionSynchronization.STATUS_ROLLED_BACK);
@@ -334,6 +329,7 @@ void testTransactionSetRollbackOnly() {
334329
}).then();
335330
}).as(StepVerifier::create).verifyComplete();
336331

332+
verify(connectionMock).isAutoCommit();
337333
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
338334
verify(connectionMock).rollbackTransaction();
339335
verify(connectionMock).close();

0 commit comments

Comments
 (0)