Skip to content

Commit 57d9b92

Browse files
committed
Upgrade Reactor to Dysprosium RC1
Closes: gh-23579
1 parent f26866e commit 57d9b92

File tree

3 files changed

+42
-22
lines changed

3 files changed

+42
-22
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ configure(allprojects) { project ->
3232
imports {
3333
mavenBom "com.fasterxml.jackson:jackson-bom:2.9.9"
3434
mavenBom "io.netty:netty-bom:4.1.39.Final"
35-
mavenBom "io.projectreactor:reactor-bom:Dysprosium-M3"
35+
mavenBom "io.projectreactor:reactor-bom:Dysprosium-RC1"
3636
mavenBom "org.eclipse.jetty:jetty-bom:9.4.20.v20190813"
3737
mavenBom "org.jetbrains.kotlin:kotlin-bom:1.3.50"
3838
mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.3.0"

spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -854,15 +854,22 @@ public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetCl
854854
createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMap(it -> {
855855
try {
856856
// Need re-wrapping until we get hold of the exception through usingWhen.
857-
return Mono.<Object, ReactiveTransactionInfo>usingWhen(Mono.just(it), txInfo -> {
858-
try {
859-
return (Mono<?>) invocation.proceedWithInvocation();
860-
}
861-
catch (Throwable ex) {
862-
return Mono.error(ex);
863-
}
864-
}, this::commitTransactionAfterReturning, txInfo -> Mono.empty())
865-
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
857+
return Mono
858+
.<Object, ReactiveTransactionInfo>usingWhen(
859+
Mono.just(it),
860+
txInfo -> {
861+
try {
862+
return (Mono<?>) invocation.proceedWithInvocation();
863+
}
864+
catch (Throwable ex) {
865+
return Mono.error(ex);
866+
}
867+
},
868+
this::commitTransactionAfterReturning,
869+
(txInfo, err) -> Mono.empty(),
870+
this::commitTransactionAfterReturning)
871+
.onErrorResume(ex ->
872+
completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
866873
}
867874
catch (Throwable ex) {
868875
// target invocation exception
@@ -877,15 +884,22 @@ public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetCl
877884
createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMapMany(it -> {
878885
try {
879886
// Need re-wrapping until we get hold of the exception through usingWhen.
880-
return Flux.usingWhen(Mono.just(it), txInfo -> {
881-
try {
882-
return this.adapter.toPublisher(invocation.proceedWithInvocation());
883-
}
884-
catch (Throwable ex) {
885-
return Mono.error(ex);
886-
}
887-
}, this::commitTransactionAfterReturning, txInfo -> Mono.empty())
888-
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
887+
return Flux
888+
.usingWhen(
889+
Mono.just(it),
890+
txInfo -> {
891+
try {
892+
return this.adapter.toPublisher(invocation.proceedWithInvocation());
893+
}
894+
catch (Throwable ex) {
895+
return Mono.error(ex);
896+
}
897+
},
898+
this::commitTransactionAfterReturning,
899+
(txInfo, ex) -> Mono.empty(),
900+
this::commitTransactionAfterReturning)
901+
.onErrorResume(ex ->
902+
completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
889903
}
890904
catch (Throwable ex) {
891905
// target invocation exception

spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,15 @@ public <T> Flux<T> execute(TransactionCallback<T> action) throws TransactionExce
8080
// This will normally result in a target object being invoked.
8181
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
8282
// through usingWhen.
83-
return status.flatMapMany(it -> Flux.usingWhen(Mono.just(it), action::doInTransaction,
84-
this.transactionManager::commit, s -> Mono.empty())
85-
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
83+
return status.flatMapMany(it -> Flux
84+
.usingWhen(
85+
Mono.just(it),
86+
action::doInTransaction,
87+
this.transactionManager::commit,
88+
(tx, ex) -> Mono.empty(),
89+
this.transactionManager::commit)
90+
.onErrorResume(ex ->
91+
rollbackOnException(it, ex).then(Mono.error(ex))));
8692
})
8793
.subscriberContext(TransactionContextManager.getOrCreateContext())
8894
.subscriberContext(TransactionContextManager.getOrCreateContextHolder());

0 commit comments

Comments
 (0)