1717package  com .google .cloud .spanner ;
1818
1919import  static  com .google .cloud .spanner .SessionImpl .NO_CHANNEL_HINT ;
20+ import  static  com .google .cloud .spanner .SpannerExceptionFactory .newSpannerException ;
2021
2122import  com .google .api .core .ApiFuture ;
2223import  com .google .api .core .ApiFutures ;
2728import  com .google .cloud .spanner .SpannerException .ResourceNotFoundException ;
2829import  com .google .common .annotations .VisibleForTesting ;
2930import  com .google .common .base .Preconditions ;
31+ import  com .google .common .util .concurrent .MoreExecutors ;
32+ import  com .google .spanner .v1 .BeginTransactionRequest ;
33+ import  com .google .spanner .v1 .RequestOptions ;
34+ import  com .google .spanner .v1 .Transaction ;
3035import  java .time .Clock ;
3136import  java .time .Duration ;
3237import  java .time .Instant ;
@@ -92,6 +97,10 @@ void onError(SpannerException spannerException) {
9297        // synchronizing, as it does not really matter exactly which error is set. 
9398        this .client .resourceNotFoundException .set ((ResourceNotFoundException ) spannerException );
9499      }
100+       // Mark multiplexed sessions for RW as unimplemented and fall back to regular sessions if 
101+       // UNIMPLEMENTED with error message "Transaction type read_write not supported with 
102+       // multiplexed sessions" is returned. 
103+       this .client .maybeMarkUnimplementedForRW (spannerException );
95104    }
96105
97106    @ Override 
@@ -164,6 +173,12 @@ public void close() {
164173  /** The current multiplexed session that is used by this client. */ 
165174  private  final  AtomicReference <ApiFuture <SessionReference >> multiplexedSessionReference ;
166175
176+   /** 
177+    * The Transaction response returned by the BeginTransaction request with read-write when a 
178+    * multiplexed session is created during client initialization. 
179+    */ 
180+   private  final  SettableApiFuture <Transaction > readWriteBeginTransactionReferenceFuture ;
181+ 
167182  /** The expiration date/time of the current multiplexed session. */ 
168183  private  final  AtomicReference <Instant > expirationDate ;
169184
@@ -190,6 +205,12 @@ public void close() {
190205   */ 
191206  private  final  AtomicBoolean  unimplemented  = new  AtomicBoolean (false );
192207
208+   /** 
209+    * This flag is set to true if the server return UNIMPLEMENTED when a read-write transaction is 
210+    * executed on a multiplexed session. TODO: Remove once this is guaranteed to be available. 
211+    */ 
212+   @ VisibleForTesting  final  AtomicBoolean  unimplementedForRW  = new  AtomicBoolean (false );
213+ 
193214  MultiplexedSessionDatabaseClient (SessionClient  sessionClient ) {
194215    this (sessionClient , Clock .systemUTC ());
195216  }
@@ -217,6 +238,7 @@ public void close() {
217238    this .tracer  = sessionClient .getSpanner ().getTracer ();
218239    final  SettableApiFuture <SessionReference > initialSessionReferenceFuture  =
219240        SettableApiFuture .create ();
241+     this .readWriteBeginTransactionReferenceFuture  = SettableApiFuture .create ();
220242    this .multiplexedSessionReference  = new  AtomicReference <>(initialSessionReferenceFuture );
221243    this .sessionClient .asyncCreateMultiplexedSession (
222244        new  SessionConsumer () {
@@ -226,6 +248,16 @@ public void onSessionReady(SessionImpl session) {
226248            // only start the maintainer if we actually managed to create a session in the first 
227249            // place. 
228250            maintainer .start ();
251+ 
252+             // initiate a begin transaction request to verify if read-write transactions are 
253+             // supported using multiplexed sessions. 
254+             if  (sessionClient 
255+                 .getSpanner ()
256+                 .getOptions ()
257+                 .getSessionPoolOptions ()
258+                 .getUseMultiplexedSessionForRW ()) {
259+               verifyBeginTransactionWithRWOnMultiplexedSessionAsync (session .getName ());
260+             }
229261          }
230262
231263          @ Override 
@@ -267,6 +299,70 @@ private void maybeMarkUnimplemented(Throwable t) {
267299    }
268300  }
269301
302+   private  void  maybeMarkUnimplementedForRW (SpannerException  spannerException ) {
303+     if  (spannerException .getErrorCode () == ErrorCode .UNIMPLEMENTED 
304+         && verifyErrorMessage (
305+             spannerException ,
306+             "Transaction type read_write not supported with multiplexed sessions" )) {
307+       unimplementedForRW .set (true );
308+     }
309+   }
310+ 
311+   private  boolean  verifyErrorMessage (SpannerException  spannerException , String  message ) {
312+     if  (spannerException .getCause () == null ) {
313+       return  false ;
314+     }
315+     if  (spannerException .getCause ().getMessage () == null ) {
316+       return  false ;
317+     }
318+     return  spannerException .getCause ().getMessage ().contains (message );
319+   }
320+ 
321+   private  void  verifyBeginTransactionWithRWOnMultiplexedSessionAsync (String  sessionName ) {
322+     // TODO: Remove once this is guaranteed to be available. 
323+     // annotate the explict BeginTransactionRequest with a transaction tag 
324+     // "multiplexed-rw-background-begin-txn" to avoid storing this request on mock spanner. 
325+     // this is to safeguard other mock spanner tests whose BeginTransaction request count will 
326+     // otherwise increase by 1. Modifying the unit tests do not seem valid since this code is 
327+     // temporary and will be removed once the read-write on multiplexed session looks stable at 
328+     // backend. 
329+     BeginTransactionRequest .Builder  requestBuilder  =
330+         BeginTransactionRequest .newBuilder ()
331+             .setSession (sessionName )
332+             .setOptions (
333+                 SessionImpl .createReadWriteTransactionOptions (
334+                     Options .fromTransactionOptions (), /* previousTransactionId = */  null ))
335+             .setRequestOptions (
336+                 RequestOptions .newBuilder ()
337+                     .setTransactionTag ("multiplexed-rw-background-begin-txn" )
338+                     .build ());
339+     final  BeginTransactionRequest  request  = requestBuilder .build ();
340+     final  ApiFuture <Transaction > requestFuture ;
341+     requestFuture  =
342+         sessionClient 
343+             .getSpanner ()
344+             .getRpc ()
345+             .beginTransactionAsync (request , /* options = */  null , /* routeToLeader = */  true );
346+     requestFuture .addListener (
347+         () -> {
348+           try  {
349+             Transaction  txn  = requestFuture .get ();
350+             if  (txn .getId ().isEmpty ()) {
351+               throw  newSpannerException (
352+                   ErrorCode .INTERNAL , "Missing id in transaction\n "  + sessionName );
353+             }
354+             readWriteBeginTransactionReferenceFuture .set (txn );
355+           } catch  (Exception  e ) {
356+             SpannerException  spannerException  = SpannerExceptionFactory .newSpannerException (e );
357+             // Mark multiplexed sessions for RW as unimplemented and fall back to regular sessions 
358+             // if UNIMPLEMENTED is returned. 
359+             maybeMarkUnimplementedForRW (spannerException );
360+             readWriteBeginTransactionReferenceFuture .setException (e );
361+           }
362+         },
363+         MoreExecutors .directExecutor ());
364+   }
365+ 
270366  boolean  isValid () {
271367    return  resourceNotFoundException .get () == null ;
272368  }
@@ -283,6 +379,10 @@ boolean isMultiplexedSessionsSupported() {
283379    return  !this .unimplemented .get ();
284380  }
285381
382+   boolean  isMultiplexedSessionsForRWSupported () {
383+     return  !this .unimplementedForRW .get ();
384+   }
385+ 
286386  void  close () {
287387    synchronized  (this ) {
288388      if  (!this .isClosed ) {
@@ -308,6 +408,17 @@ SessionReference getCurrentSessionReference() {
308408    }
309409  }
310410
411+   @ VisibleForTesting 
412+   Transaction  getReadWriteBeginTransactionReference () {
413+     try  {
414+       return  this .readWriteBeginTransactionReferenceFuture .get ();
415+     } catch  (ExecutionException  executionException ) {
416+       throw  SpannerExceptionFactory .asSpannerException (executionException .getCause ());
417+     } catch  (InterruptedException  interruptedException ) {
418+       throw  SpannerExceptionFactory .propagateInterrupt (interruptedException );
419+     }
420+   }
421+ 
311422  /** 
312423   * Returns true if the multiplexed session has been created. This client can be used before the 
313424   * session has been created, and will in that case use a delayed transaction that contains a 
0 commit comments