5353import java .util .concurrent .atomic .AtomicLong ;
5454import java .util .concurrent .atomic .AtomicReference ;
5555
56+ /**
57+ * {@link TransactionRunner} that automatically handles "UNIMPLEMENTED" errors with the message
58+ * "Transaction type read_write not supported with multiplexed sessions" by switching from a
59+ * multiplexed session to a regular session and then restarts the transaction.
60+ */
5661class MultiplexedSessionTransactionRunner implements TransactionRunner {
5762 private final SessionPool sessionPool ;
5863 private final TransactionRunnerImpl transactionRunnerForMultiplexedSession ;
@@ -91,8 +96,9 @@ public <T> T run(TransactionCallable<T> callable) {
9196 try {
9297 return getRunner ().run (callable );
9398 } catch (SpannerException e ) {
94- if (e .getErrorCode () == ErrorCode .UNIMPLEMENTED ) {
95- this .isUsingMultiplexedSession = false ;
99+ if (e .getErrorCode () == ErrorCode .UNIMPLEMENTED
100+ && verifyUnimplementedErrorMessageForRWMux (e )) {
101+ this .isUsingMultiplexedSession = false ; // Fallback to regular session
96102 } else {
97103 throw e ; // Other errors propagate
98104 }
@@ -115,6 +121,19 @@ public TransactionRunner allowNestedTransaction() {
115121 getRunner ().allowNestedTransaction ();
116122 return this ;
117123 }
124+
125+ private boolean verifyUnimplementedErrorMessageForRWMux (SpannerException spannerException ) {
126+ if (spannerException .getCause () == null ) {
127+ return false ;
128+ }
129+ if (spannerException .getCause ().getMessage () == null ) {
130+ return false ;
131+ }
132+ return spannerException
133+ .getCause ()
134+ .getMessage ()
135+ .contains ("Transaction type read_write not supported with multiplexed sessions" );
136+ }
118137}
119138
120139/**
0 commit comments