2727import com .google .cloud .spanner .Options .TransactionOption ;
2828import com .google .cloud .spanner .Options .UpdateOption ;
2929import com .google .cloud .spanner .SessionClient .SessionConsumer ;
30- import com .google .cloud .spanner .SessionPool .PooledSessionReplacementHandler ;
3130import com .google .cloud .spanner .SessionPool .SessionPoolTransactionRunner ;
3231import com .google .cloud .spanner .SpannerException .ResourceNotFoundException ;
3332import com .google .common .annotations .VisibleForTesting ;
5655
5756class MultiplexedSessionTransactionRunner implements TransactionRunner {
5857 private final SessionPool sessionPool ;
59- private final TransactionRunnerImpl transactionRunner ;
58+ private final TransactionRunnerImpl transactionRunnerForMultiplexedSession ;
59+ private SessionPoolTransactionRunner transactionRunnerForRegularSession ;
6060 private final TransactionOption [] options ;
61+ private boolean isUsingMultiplexedSession = true ;
6162
62- public MultiplexedSessionTransactionRunner (SessionImpl session , SessionPool sessionPool , TransactionOption ... options ) {
63+ public MultiplexedSessionTransactionRunner (
64+ SessionImpl multiplexedSession , SessionPool sessionPool , TransactionOption ... options ) {
6365 this .sessionPool = sessionPool ;
64- this .transactionRunner = new TransactionRunnerImpl (session , options ); // Uses multiplexed session initially
65- session .setActive (this .transactionRunner );
66+ this .transactionRunnerForMultiplexedSession =
67+ new TransactionRunnerImpl (
68+ multiplexedSession , options ); // Uses multiplexed session initially
69+ multiplexedSession .setActive (this .transactionRunnerForMultiplexedSession );
6670 this .options = options ;
6771 }
6872
73+ private TransactionRunner getRunner () {
74+ if (this .isUsingMultiplexedSession ) {
75+ return this .transactionRunnerForMultiplexedSession ;
76+ } else {
77+ if (this .transactionRunnerForRegularSession == null ) {
78+ this .transactionRunnerForRegularSession =
79+ new SessionPoolTransactionRunner <>(
80+ sessionPool .getSession (),
81+ sessionPool .getPooledSessionReplacementHandler (),
82+ options );
83+ }
84+ return this .transactionRunnerForRegularSession ;
85+ }
86+ }
87+
6988 @ Override
7089 public <T > T run (TransactionCallable <T > callable ) {
71- boolean useRegularSession = false ;
7290 while (true ) {
7391 try {
74- if (useRegularSession ) {
75- TransactionRunner runner = new SessionPoolTransactionRunner (sessionPool .getSession (), null , options );
76- return runner .run (callable );
77- } else {
78- return transactionRunner .run (callable ); // Run using multiplexed session
79- }
92+ return getRunner ().run (callable );
8093 } catch (SpannerException e ) {
8194 if (e .getErrorCode () == ErrorCode .UNIMPLEMENTED ) {
82- useRegularSession = true ; // Switch to regular session
95+ this . isUsingMultiplexedSession = false ;
8396 } else {
8497 throw e ; // Other errors propagate
8598 }
@@ -89,21 +102,21 @@ public <T> T run(TransactionCallable<T> callable) {
89102
90103 @ Override
91104 public Timestamp getCommitTimestamp () {
92- return this . transactionRunner .getCommitTimestamp ();
105+ return getRunner () .getCommitTimestamp ();
93106 }
94107
95108 @ Override
96109 public CommitResponse getCommitResponse () {
97- return this . transactionRunner .getCommitResponse ();
110+ return getRunner () .getCommitResponse ();
98111 }
99112
100113 @ Override
101114 public TransactionRunner allowNestedTransaction () {
102- return this .transactionRunner .allowNestedTransaction ();
115+ getRunner ().allowNestedTransaction ();
116+ return this ;
103117 }
104118}
105119
106-
107120/**
108121 * {@link DatabaseClient} implementation that uses a single multiplexed session to execute
109122 * transactions.
@@ -127,7 +140,7 @@ static class MultiplexedSessionTransaction extends SessionImpl {
127140 private final int singleUseChannelHint ;
128141
129142 private boolean done ;
130- private SessionPool pool ;
143+ private final SessionPool pool ;
131144
132145 MultiplexedSessionTransaction (
133146 MultiplexedSessionDatabaseClient client ,
@@ -203,7 +216,6 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) {
203216 return new MultiplexedSessionTransactionRunner (this , pool , options );
204217 }
205218
206-
207219 @ Override
208220 void onTransactionDone () {
209221 boolean markedDone = false ;
@@ -576,7 +588,7 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
576588
577589 private DelayedMultiplexedSessionTransaction createDelayedMultiplexSessionTransaction () {
578590 return new DelayedMultiplexedSessionTransaction (
579- this , tracer .getCurrentSpan (), multiplexedSessionReference .get ());
591+ this , tracer .getCurrentSpan (), multiplexedSessionReference .get (), this . pool );
580592 }
581593
582594 private int getSingleUseChannelHint () {
0 commit comments