1
1
/*
2
- * Copyright 2002-2019 the original author or authors.
2
+ * Copyright 2002-2020 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
@@ -196,7 +196,8 @@ private Mono<ReactiveTransaction> handleExistingTransaction(TransactionSynchroni
196
196
return doBegin (synchronizationManager , transaction , definition ).doOnSuccess (ignore ->
197
197
prepareSynchronization (synchronizationManager , status , definition )).thenReturn (status )
198
198
.onErrorResume (ErrorPredicates .RUNTIME_OR_ERROR , beginEx ->
199
- resumeAfterBeginException (synchronizationManager , transaction , suspendedResourcesHolder , beginEx ).then (Mono .error (beginEx )));
199
+ resumeAfterBeginException (synchronizationManager , transaction , suspendedResourcesHolder , beginEx )
200
+ .then (Mono .error (beginEx )));
200
201
});
201
202
}
202
203
@@ -281,7 +282,9 @@ private Mono<SuspendedResourcesHolder> suspend(TransactionSynchronizationManager
281
282
if (synchronizationManager .isSynchronizationActive ()) {
282
283
Mono <List <TransactionSynchronization >> suspendedSynchronizations = doSuspendSynchronization (synchronizationManager );
283
284
return suspendedSynchronizations .flatMap (synchronizations -> {
284
- Mono <Optional <Object >> suspendedResources = (transaction != null ? doSuspend (synchronizationManager , transaction ).map (Optional ::of ).defaultIfEmpty (Optional .empty ()) : Mono .just (Optional .empty ()));
285
+ Mono <Optional <Object >> suspendedResources = (transaction != null ?
286
+ doSuspend (synchronizationManager , transaction ).map (Optional ::of ).defaultIfEmpty (Optional .empty ()) :
287
+ Mono .just (Optional .empty ()));
285
288
return suspendedResources .map (it -> {
286
289
String name = synchronizationManager .getCurrentTransactionName ();
287
290
synchronizationManager .setCurrentTransactionName (null );
@@ -293,12 +296,15 @@ private Mono<SuspendedResourcesHolder> suspend(TransactionSynchronizationManager
293
296
synchronizationManager .setActualTransactionActive (false );
294
297
return new SuspendedResourcesHolder (
295
298
it .orElse (null ), synchronizations , name , readOnly , isolationLevel , wasActive );
296
- }).onErrorResume (ErrorPredicates .RUNTIME_OR_ERROR , ex -> doResumeSynchronization (synchronizationManager , synchronizations ).cast (SuspendedResourcesHolder .class ));
299
+ }).onErrorResume (ErrorPredicates .RUNTIME_OR_ERROR ,
300
+ ex -> doResumeSynchronization (synchronizationManager , synchronizations )
301
+ .cast (SuspendedResourcesHolder .class ));
297
302
});
298
303
}
299
304
else if (transaction != null ) {
300
305
// Transaction active but no synchronization active.
301
- Mono <Optional <Object >> suspendedResources = doSuspend (synchronizationManager , transaction ).map (Optional ::of ).defaultIfEmpty (Optional .empty ());
306
+ Mono <Optional <Object >> suspendedResources =
307
+ doSuspend (synchronizationManager , transaction ).map (Optional ::of ).defaultIfEmpty (Optional .empty ());
302
308
return suspendedResources .map (it -> new SuspendedResourcesHolder (it .orElse (null )));
303
309
}
304
310
else {
@@ -445,10 +451,12 @@ private Mono<Void> processCommit(TransactionSynchronizationManager synchronizati
445
451
// Eclipse compiler with regard to inferred generics.
446
452
Mono <Object > result = propagateException ;
447
453
if (ErrorPredicates .UNEXPECTED_ROLLBACK .test (ex )) {
448
- result = triggerAfterCompletion (synchronizationManager , status , TransactionSynchronization .STATUS_ROLLED_BACK ).then (propagateException );
454
+ result = triggerAfterCompletion (synchronizationManager , status , TransactionSynchronization .STATUS_ROLLED_BACK )
455
+ .then (propagateException );
449
456
}
450
457
else if (ErrorPredicates .TRANSACTION_EXCEPTION .test (ex )) {
451
- result = triggerAfterCompletion (synchronizationManager , status , TransactionSynchronization .STATUS_UNKNOWN ).then (propagateException );
458
+ result = triggerAfterCompletion (synchronizationManager , status , TransactionSynchronization .STATUS_UNKNOWN )
459
+ .then (propagateException );
452
460
}
453
461
else if (ErrorPredicates .RUNTIME_OR_ERROR .test (ex )) {
454
462
Mono <Void > mono ;
@@ -458,7 +466,8 @@ else if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) {
458
466
else {
459
467
mono = Mono .empty ();
460
468
}
461
- result = mono .then (doRollbackOnCommitException (synchronizationManager , status , ex )).then (propagateException );
469
+ result = mono .then (doRollbackOnCommitException (synchronizationManager , status , ex ))
470
+ .then (propagateException );
462
471
}
463
472
464
473
return result ;
@@ -573,9 +582,9 @@ private Mono<Void> triggerBeforeCommit(TransactionSynchronizationManager synchro
573
582
if (status .isDebug ()) {
574
583
logger .trace ("Triggering beforeCommit synchronization" );
575
584
}
576
- return TransactionSynchronizationUtils .triggerBeforeCommit (synchronizationManager .getSynchronizations (), status .isReadOnly ());
585
+ return TransactionSynchronizationUtils .triggerBeforeCommit (
586
+ synchronizationManager .getSynchronizations (), status .isReadOnly ());
577
587
}
578
-
579
588
return Mono .empty ();
580
589
}
581
590
@@ -593,7 +602,6 @@ private Mono<Void> triggerBeforeCompletion(TransactionSynchronizationManager syn
593
602
}
594
603
return TransactionSynchronizationUtils .triggerBeforeCompletion (synchronizationManager .getSynchronizations ());
595
604
}
596
-
597
605
return Mono .empty ();
598
606
}
599
607
@@ -611,7 +619,6 @@ private Mono<Void> triggerAfterCommit(TransactionSynchronizationManager synchron
611
619
}
612
620
return TransactionSynchronizationUtils .invokeAfterCommit (synchronizationManager .getSynchronizations ());
613
621
}
614
-
615
622
return Mono .empty ();
616
623
}
617
624
@@ -639,10 +646,10 @@ else if (!synchronizations.isEmpty()) {
639
646
// Existing transaction that we participate in, controlled outside
640
647
// of the scope of this Spring transaction manager -> try to register
641
648
// an afterCompletion callback with the existing (JTA) transaction.
642
- return registerAfterCompletionWithExistingTransaction (synchronizationManager , status .getTransaction (), synchronizations );
649
+ return registerAfterCompletionWithExistingTransaction (
650
+ synchronizationManager , status .getTransaction (), synchronizations );
643
651
}
644
652
}
645
-
646
653
return Mono .empty ();
647
654
}
648
655
@@ -690,7 +697,8 @@ private Mono<Void> cleanupAfterCompletion(TransactionSynchronizationManager sync
690
697
logger .debug ("Resuming suspended transaction after completion of inner transaction" );
691
698
}
692
699
Object transaction = (status .hasTransaction () ? status .getTransaction () : null );
693
- return cleanup .then (resume (synchronizationManager , transaction , (SuspendedResourcesHolder ) status .getSuspendedResources ()));
700
+ return cleanup .then (resume (synchronizationManager , transaction ,
701
+ (SuspendedResourcesHolder ) status .getSuspendedResources ()));
694
702
}
695
703
return cleanup ;
696
704
});
@@ -716,14 +724,16 @@ private Mono<Void> cleanupAfterCompletion(TransactionSynchronizationManager sync
716
724
* returned transaction object.
717
725
* @param synchronizationManager the synchronization manager bound to the current transaction
718
726
* @return the current transaction object
719
- * @throws org.springframework.transaction.CannotCreateTransactionException if transaction support is not available
727
+ * @throws org.springframework.transaction.CannotCreateTransactionException
728
+ * if transaction support is not available
720
729
* @throws TransactionException in case of lookup or system errors
721
730
* @see #doBegin
722
731
* @see #doCommit
723
732
* @see #doRollback
724
733
* @see GenericReactiveTransaction#getTransaction
725
734
*/
726
- protected abstract Object doGetTransaction (TransactionSynchronizationManager synchronizationManager ) throws TransactionException ;
735
+ protected abstract Object doGetTransaction (TransactionSynchronizationManager synchronizationManager )
736
+ throws TransactionException ;
727
737
728
738
/**
729
739
* Check if the given transaction object indicates an existing transaction
@@ -775,7 +785,8 @@ protected abstract Mono<Void> doBegin(TransactionSynchronizationManager synchron
775
785
* @param transaction the transaction object returned by {@code doGetTransaction}
776
786
* @return an object that holds suspended resources
777
787
* (will be kept unexamined for passing it into doResume)
778
- * @throws org.springframework.transaction.TransactionSuspensionNotSupportedException if suspending is not supported by the transaction manager implementation
788
+ * @throws org.springframework.transaction.TransactionSuspensionNotSupportedException
789
+ * if suspending is not supported by the transaction manager implementation
779
790
* @throws TransactionException in case of system errors
780
791
* @see #doResume
781
792
*/
@@ -795,7 +806,8 @@ protected Mono<Object> doSuspend(TransactionSynchronizationManager synchronizati
795
806
* @param transaction the transaction object returned by {@code doGetTransaction}
796
807
* @param suspendedResources the object that holds suspended resources,
797
808
* as returned by doSuspend
798
- * @throws org.springframework.transaction.TransactionSuspensionNotSupportedException if resuming is not supported by the transaction manager implementation
809
+ * @throws org.springframework.transaction.TransactionSuspensionNotSupportedException
810
+ * if suspending is not supported by the transaction manager implementation
799
811
* @throws TransactionException in case of system errors
800
812
* @see #doSuspend
801
813
*/
0 commit comments