2727import com .google .cloud .spanner .Options .TransactionOption ;
2828import com .google .cloud .spanner .Options .UpdateOption ;
2929import com .google .cloud .spanner .SessionClient .SessionConsumer ;
30+ import com .google .cloud .spanner .SessionPool .SessionPoolTransactionRunner ;
3031import com .google .cloud .spanner .SpannerException .ResourceNotFoundException ;
3132import com .google .common .annotations .VisibleForTesting ;
3233import com .google .common .base .Preconditions ;
5253import java .util .concurrent .atomic .AtomicLong ;
5354import java .util .concurrent .atomic .AtomicReference ;
5455
56+ /**
57+ * {@link TransactionRunner} that automatically handles "UNIMPLEMENTED" errors with the message
58+ * "Transaction type read_write not supported with multiplexed sessions" by switching from a
59+ * multiplexed session to a regular session and then restarts the transaction.
60+ */
61+ class MultiplexedSessionTransactionRunner implements TransactionRunner {
62+ private final SessionPool sessionPool ;
63+ private final TransactionRunnerImpl transactionRunnerForMultiplexedSession ;
64+ private SessionPoolTransactionRunner transactionRunnerForRegularSession ;
65+ private final TransactionOption [] options ;
66+ private boolean isUsingMultiplexedSession = true ;
67+
68+ public MultiplexedSessionTransactionRunner (
69+ SessionImpl multiplexedSession , SessionPool sessionPool , TransactionOption ... options ) {
70+ this .sessionPool = sessionPool ;
71+ this .transactionRunnerForMultiplexedSession =
72+ new TransactionRunnerImpl (
73+ multiplexedSession , options ); // Uses multiplexed session initially
74+ multiplexedSession .setActive (this .transactionRunnerForMultiplexedSession );
75+ this .options = options ;
76+ }
77+
78+ private TransactionRunner getRunner () {
79+ if (this .isUsingMultiplexedSession ) {
80+ return this .transactionRunnerForMultiplexedSession ;
81+ } else {
82+ if (this .transactionRunnerForRegularSession == null ) {
83+ this .transactionRunnerForRegularSession =
84+ new SessionPoolTransactionRunner <>(
85+ sessionPool .getSession (),
86+ sessionPool .getPooledSessionReplacementHandler (),
87+ options );
88+ }
89+ return this .transactionRunnerForRegularSession ;
90+ }
91+ }
92+
93+ @ Override
94+ public <T > T run (TransactionCallable <T > callable ) {
95+ while (true ) {
96+ try {
97+ return getRunner ().run (callable );
98+ } catch (SpannerException e ) {
99+ if (e .getErrorCode () == ErrorCode .UNIMPLEMENTED
100+ && verifyUnimplementedErrorMessageForRWMux (e )) {
101+ this .isUsingMultiplexedSession = false ; // Fallback to regular session
102+ } else {
103+ throw e ; // Other errors propagate
104+ }
105+ }
106+ }
107+ }
108+
109+ @ Override
110+ public Timestamp getCommitTimestamp () {
111+ return getRunner ().getCommitTimestamp ();
112+ }
113+
114+ @ Override
115+ public CommitResponse getCommitResponse () {
116+ return getRunner ().getCommitResponse ();
117+ }
118+
119+ @ Override
120+ public TransactionRunner allowNestedTransaction () {
121+ getRunner ().allowNestedTransaction ();
122+ return this ;
123+ }
124+
125+ private boolean verifyUnimplementedErrorMessageForRWMux (SpannerException spannerException ) {
126+ if (spannerException .getCause () == null ) {
127+ return false ;
128+ }
129+ if (spannerException .getCause ().getMessage () == null ) {
130+ return false ;
131+ }
132+ return spannerException
133+ .getCause ()
134+ .getMessage ()
135+ .contains ("Transaction type read_write not supported with multiplexed sessions" );
136+ }
137+ }
138+
55139/**
56140 * {@link DatabaseClient} implementation that uses a single multiplexed session to execute
57141 * transactions.
@@ -75,18 +159,30 @@ static class MultiplexedSessionTransaction extends SessionImpl {
75159 private final int singleUseChannelHint ;
76160
77161 private boolean done ;
162+ private final SessionPool pool ;
78163
79164 MultiplexedSessionTransaction (
80165 MultiplexedSessionDatabaseClient client ,
81166 ISpan span ,
82167 SessionReference sessionReference ,
83168 int singleUseChannelHint ,
84169 boolean singleUse ) {
170+ this (client , span , sessionReference , singleUseChannelHint , singleUse , null );
171+ }
172+
173+ MultiplexedSessionTransaction (
174+ MultiplexedSessionDatabaseClient client ,
175+ ISpan span ,
176+ SessionReference sessionReference ,
177+ int singleUseChannelHint ,
178+ boolean singleUse ,
179+ SessionPool pool ) {
85180 super (client .sessionClient .getSpanner (), sessionReference , singleUseChannelHint );
86181 this .client = client ;
87182 this .singleUse = singleUse ;
88183 this .singleUseChannelHint = singleUseChannelHint ;
89184 this .client .numSessionsAcquired .incrementAndGet ();
185+ this .pool = pool ;
90186 setCurrentSpan (span );
91187 }
92188
@@ -134,6 +230,11 @@ public CommitResponse writeAtLeastOnceWithOptions(
134230 return response ;
135231 }
136232
233+ @ Override
234+ public TransactionRunner readWriteTransaction (TransactionOption ... options ) {
235+ return new MultiplexedSessionTransactionRunner (this , pool , options );
236+ }
237+
137238 @ Override
138239 void onTransactionDone () {
139240 boolean markedDone = false ;
@@ -225,6 +326,8 @@ public void close() {
225326 */
226327 @ VisibleForTesting final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean (false );
227328
329+ private SessionPool pool ;
330+
228331 MultiplexedSessionDatabaseClient (SessionClient sessionClient ) {
229332 this (sessionClient , Clock .systemUTC ());
230333 }
@@ -299,6 +402,10 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
299402 initialSessionReferenceFuture );
300403 }
301404
405+ void setPool (SessionPool pool ) {
406+ this .pool = pool ;
407+ }
408+
302409 private static void maybeWaitForSessionCreation (
303410 SessionPoolOptions sessionPoolOptions , ApiFuture <SessionReference > future ) {
304411 Duration waitDuration = sessionPoolOptions .getWaitForMinSessions ();
@@ -489,7 +596,8 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
489596 // any special handling of such errors.
490597 multiplexedSessionReference .get ().get (),
491598 singleUse ? getSingleUseChannelHint () : NO_CHANNEL_HINT ,
492- singleUse );
599+ singleUse ,
600+ this .pool );
493601 } catch (ExecutionException executionException ) {
494602 throw SpannerExceptionFactory .asSpannerException (executionException .getCause ());
495603 } catch (InterruptedException interruptedException ) {
@@ -499,7 +607,7 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
499607
500608 private DelayedMultiplexedSessionTransaction createDelayedMultiplexSessionTransaction () {
501609 return new DelayedMultiplexedSessionTransaction (
502- this , tracer .getCurrentSpan (), multiplexedSessionReference .get ());
610+ this , tracer .getCurrentSpan (), multiplexedSessionReference .get (), this . pool );
503611 }
504612
505613 private int getSingleUseChannelHint () {
0 commit comments