2727import com .google .cloud .spanner .Options .TransactionOption ;
2828import com .google .cloud .spanner .Options .UpdateOption ;
2929import com .google .cloud .spanner .SessionClient .SessionConsumer ;
30+ import com .google .cloud .spanner .SessionPool .PooledSessionReplacementHandler ;
31+ import com .google .cloud .spanner .SessionPool .SessionPoolTransactionRunner ;
3032import com .google .cloud .spanner .SpannerException .ResourceNotFoundException ;
3133import com .google .common .annotations .VisibleForTesting ;
3234import com .google .common .base .Preconditions ;
5254import java .util .concurrent .atomic .AtomicLong ;
5355import java .util .concurrent .atomic .AtomicReference ;
5456
57+ class MultiplexedSessionTransactionRunner implements TransactionRunner {
58+ private final SessionPool sessionPool ;
59+ private final TransactionRunnerImpl transactionRunner ;
60+ private final TransactionOption [] options ;
61+
62+ public MultiplexedSessionTransactionRunner (SessionImpl session , SessionPool sessionPool , TransactionOption ... options ) {
63+ this .sessionPool = sessionPool ;
64+ this .transactionRunner = new TransactionRunnerImpl (session , options ); // Uses multiplexed session initially
65+ session .setActive (this .transactionRunner );
66+ this .options = options ;
67+ }
68+
69+ @ Override
70+ public <T > T run (TransactionCallable <T > callable ) {
71+ boolean useRegularSession = false ;
72+ while (true ) {
73+ try {
74+ if (useRegularSession ) {
75+ TransactionRunner runner = new SessionPoolTransactionRunner (sessionPool .getSession (), null , options );
76+ return runner .run (callable );
77+ } else {
78+ return transactionRunner .run (callable ); // Run using multiplexed session
79+ }
80+ } catch (SpannerException e ) {
81+ if (e .getErrorCode () == ErrorCode .UNIMPLEMENTED ) {
82+ useRegularSession = true ; // Switch to regular session
83+ } else {
84+ throw e ; // Other errors propagate
85+ }
86+ }
87+ }
88+ }
89+
90+ @ Override
91+ public Timestamp getCommitTimestamp () {
92+ return null ;
93+ }
94+
95+ @ Override
96+ public CommitResponse getCommitResponse () {
97+ return null ;
98+ }
99+
100+ @ Override
101+ public TransactionRunner allowNestedTransaction () {
102+ return null ;
103+ }
104+ }
105+
106+
55107/**
56108 * {@link DatabaseClient} implementation that uses a single multiplexed session to execute
57109 * transactions.
@@ -75,18 +127,30 @@ static class MultiplexedSessionTransaction extends SessionImpl {
75127 private final int singleUseChannelHint ;
76128
77129 private boolean done ;
130+ private SessionPool pool ;
78131
79132 MultiplexedSessionTransaction (
80133 MultiplexedSessionDatabaseClient client ,
81134 ISpan span ,
82135 SessionReference sessionReference ,
83136 int singleUseChannelHint ,
84137 boolean singleUse ) {
138+ this (client , span , sessionReference , singleUseChannelHint , singleUse , null );
139+ }
140+
141+ MultiplexedSessionTransaction (
142+ MultiplexedSessionDatabaseClient client ,
143+ ISpan span ,
144+ SessionReference sessionReference ,
145+ int singleUseChannelHint ,
146+ boolean singleUse ,
147+ SessionPool pool ) {
85148 super (client .sessionClient .getSpanner (), sessionReference , singleUseChannelHint );
86149 this .client = client ;
87150 this .singleUse = singleUse ;
88151 this .singleUseChannelHint = singleUseChannelHint ;
89152 this .client .numSessionsAcquired .incrementAndGet ();
153+ this .pool = pool ;
90154 setCurrentSpan (span );
91155 }
92156
@@ -134,6 +198,12 @@ public CommitResponse writeAtLeastOnceWithOptions(
134198 return response ;
135199 }
136200
201+ @ Override
202+ public TransactionRunner readWriteTransaction (TransactionOption ... options ) {
203+ return new MultiplexedSessionTransactionRunner (this , pool , options );
204+ }
205+
206+
137207 @ Override
138208 void onTransactionDone () {
139209 boolean markedDone = false ;
@@ -225,6 +295,8 @@ public void close() {
225295 */
226296 @ VisibleForTesting final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean (false );
227297
298+ private SessionPool pool ;
299+
228300 MultiplexedSessionDatabaseClient (SessionClient sessionClient ) {
229301 this (sessionClient , Clock .systemUTC ());
230302 }
@@ -299,6 +371,10 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
299371 initialSessionReferenceFuture );
300372 }
301373
374+ void setPool (SessionPool pool ) {
375+ this .pool = pool ;
376+ }
377+
302378 private static void maybeWaitForSessionCreation (
303379 SessionPoolOptions sessionPoolOptions , ApiFuture <SessionReference > future ) {
304380 Duration waitDuration = sessionPoolOptions .getWaitForMinSessions ();
@@ -489,7 +565,8 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
489565 // any special handling of such errors.
490566 multiplexedSessionReference .get ().get (),
491567 singleUse ? getSingleUseChannelHint () : NO_CHANNEL_HINT ,
492- singleUse );
568+ singleUse ,
569+ this .pool );
493570 } catch (ExecutionException executionException ) {
494571 throw SpannerExceptionFactory .asSpannerException (executionException .getCause ());
495572 } catch (InterruptedException interruptedException ) {
0 commit comments