Skip to content

Commit ae70e3c

Browse files
committed
Apply read-only enforcement after R2DBC transaction begin
Includes prepareTransactionalConnection variant aligned with JDBC DataSourceTransactionManager. Closes gh-28610
1 parent c942c8d commit ae70e3c

File tree

1 file changed

+53
-43
lines changed

1 file changed

+53
-43
lines changed

spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java

Lines changed: 53 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -47,7 +47,7 @@
4747
* <p><b>Note: The {@code ConnectionFactory} that this transaction manager
4848
* operates on needs to return independent {@code Connection}s.</b>
4949
* The {@code Connection}s may come from a pool (the typical case), but the
50-
* {@code ConnectionFactory} must not return scoped scoped {@code Connection}s
50+
* {@code ConnectionFactory} must not return scoped {@code Connection}s
5151
* or the like. This transaction manager will associate {@code Connection}
5252
* with context-bound transactions itself, according to the specified propagation
5353
* behavior. It assumes that a separate, independent {@code Connection} can
@@ -142,8 +142,8 @@ protected ConnectionFactory obtainConnectionFactory() {
142142
* transactional connection: "SET TRANSACTION READ ONLY" as understood by Oracle,
143143
* MySQL and Postgres.
144144
* <p>The exact treatment, including any SQL statement executed on the connection,
145-
* can be customized through through {@link #prepareTransactionalConnection}.
146-
* @see #prepareTransactionalConnection
145+
* can be customized through {@link #prepareTransactionalConnection(Connection, TransactionDefinition)}.
146+
* @see #prepareTransactionalConnection(Connection, TransactionDefinition)
147147
*/
148148
public void setEnforceReadOnly(boolean enforceReadOnly) {
149149
this.enforceReadOnly = enforceReadOnly;
@@ -179,6 +179,7 @@ protected boolean isExistingTransaction(Object transaction) {
179179
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
180180
}
181181

182+
@SuppressWarnings("deprecation")
182183
@Override
183184
protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationManager, Object transaction,
184185
TransactionDefinition definition) throws TransactionException {
@@ -202,27 +203,27 @@ protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationMa
202203
connectionMono = Mono.just(txObject.getConnectionHolder().getConnection());
203204
}
204205

205-
return connectionMono.flatMap(con -> {
206-
return prepareTransactionalConnection(con, definition, transaction).then(Mono.from(con.beginTransaction()))
207-
.doOnSuccess(v -> {
208-
txObject.getConnectionHolder().setTransactionActive(true);
209-
Duration timeout = determineTimeout(definition);
210-
if (!timeout.isNegative() && !timeout.isZero()) {
211-
txObject.getConnectionHolder().setTimeoutInMillis(timeout.toMillis());
212-
}
213-
// Bind the connection holder to the thread.
214-
if (txObject.isNewConnectionHolder()) {
215-
synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder());
216-
}
217-
}).thenReturn(con).onErrorResume(e -> {
218-
if (txObject.isNewConnectionHolder()) {
219-
return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory())
220-
.doOnTerminate(() -> txObject.setConnectionHolder(null, false))
221-
.then(Mono.error(e));
222-
}
223-
return Mono.error(e);
224-
});
225-
}).onErrorResume(e -> {
206+
return connectionMono.flatMap(con -> prepareTransactionalConnection(con, definition, transaction)
207+
.then(Mono.from(con.beginTransaction()))
208+
.then(prepareTransactionalConnection(con, definition))
209+
.doOnSuccess(v -> {
210+
txObject.getConnectionHolder().setTransactionActive(true);
211+
Duration timeout = determineTimeout(definition);
212+
if (!timeout.isNegative() && !timeout.isZero()) {
213+
txObject.getConnectionHolder().setTimeoutInMillis(timeout.toMillis());
214+
}
215+
// Bind the connection holder to the thread.
216+
if (txObject.isNewConnectionHolder()) {
217+
synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder());
218+
}
219+
}).thenReturn(con).onErrorResume(e -> {
220+
if (txObject.isNewConnectionHolder()) {
221+
return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory())
222+
.doOnTerminate(() -> txObject.setConnectionHolder(null, false))
223+
.then(Mono.error(e));
224+
}
225+
return Mono.error(e);
226+
})).onErrorResume(e -> {
226227
CannotCreateTransactionException ex = new CannotCreateTransactionException(
227228
"Could not open R2DBC Connection for transaction", e);
228229
return Mono.error(ex);
@@ -350,31 +351,17 @@ protected Mono<Void> doCleanupAfterCompletion(TransactionSynchronizationManager
350351
}
351352

352353
/**
353-
* Prepare the transactional {@link Connection} right after transaction begin.
354-
* <p>The default implementation executes a "SET TRANSACTION READ ONLY" statement if the
355-
* {@link #setEnforceReadOnly "enforceReadOnly"} flag is set to {@code true} and the
356-
* transaction definition indicates a read-only transaction.
357-
* <p>The "SET TRANSACTION READ ONLY" is understood by Oracle, MySQL and Postgres
358-
* and may work with other databases as well. If you'd like to adapt this treatment,
359-
* override this method accordingly.
360-
* @param con the transactional R2DBC Connection
361-
* @param definition the current transaction definition
362-
* @param transaction the transaction object
363-
* @see #setEnforceReadOnly
354+
* Prepare the transactional {@link Connection} right before transaction begin.
355+
* @deprecated in favor of {@link #prepareTransactionalConnection(Connection, TransactionDefinition)}
356+
* since this variant gets called too early (before transaction begin) for read-only customization
364357
*/
358+
@Deprecated
365359
protected Mono<Void> prepareTransactionalConnection(
366360
Connection con, TransactionDefinition definition, Object transaction) {
367361

368362
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
369-
370363
Mono<Void> prepare = Mono.empty();
371364

372-
if (isEnforceReadOnly() && definition.isReadOnly()) {
373-
prepare = Mono.from(con.createStatement("SET TRANSACTION READ ONLY").execute())
374-
.flatMapMany(Result::getRowsUpdated)
375-
.then();
376-
}
377-
378365
// Apply specific isolation level, if any.
379366
IsolationLevel isolationLevelToUse = resolveIsolationLevel(definition.getIsolationLevel());
380367
if (isolationLevelToUse != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
@@ -404,6 +391,29 @@ protected Mono<Void> prepareTransactionalConnection(
404391
return prepare;
405392
}
406393

394+
/**
395+
* Prepare the transactional {@link Connection} right after transaction begin.
396+
* <p>The default implementation executes a "SET TRANSACTION READ ONLY" statement if the
397+
* {@link #setEnforceReadOnly "enforceReadOnly"} flag is set to {@code true} and the
398+
* transaction definition indicates a read-only transaction.
399+
* <p>The "SET TRANSACTION READ ONLY" is understood by Oracle, MySQL and Postgres
400+
* and may work with other databases as well. If you'd like to adapt this treatment,
401+
* override this method accordingly.
402+
* @param con the transactional R2DBC Connection
403+
* @param definition the current transaction definition
404+
* @since 5.3.22
405+
* @see #setEnforceReadOnly
406+
*/
407+
protected Mono<Void> prepareTransactionalConnection(Connection con, TransactionDefinition definition) {
408+
Mono<Void> prepare = Mono.empty();
409+
if (isEnforceReadOnly() && definition.isReadOnly()) {
410+
prepare = Mono.from(con.createStatement("SET TRANSACTION READ ONLY").execute())
411+
.flatMapMany(Result::getRowsUpdated)
412+
.then();
413+
}
414+
return prepare;
415+
}
416+
407417
/**
408418
* Resolve the {@linkplain TransactionDefinition#getIsolationLevel() isolation level constant} to a R2DBC
409419
* {@link IsolationLevel}. If you'd like to extend isolation level translation for vendor-specific

0 commit comments

Comments
 (0)