1717package com .google .cloud .spanner ;
1818
1919import static com .google .cloud .spanner .SessionImpl .NO_CHANNEL_HINT ;
20+ import static com .google .cloud .spanner .SpannerExceptionFactory .newSpannerException ;
2021
2122import com .google .api .core .ApiFuture ;
2223import com .google .api .core .ApiFutures ;
2728import com .google .cloud .spanner .SpannerException .ResourceNotFoundException ;
2829import com .google .common .annotations .VisibleForTesting ;
2930import com .google .common .base .Preconditions ;
31+ import com .google .common .util .concurrent .MoreExecutors ;
32+ import com .google .spanner .v1 .BeginTransactionRequest ;
33+ import com .google .spanner .v1 .RequestOptions ;
34+ import com .google .spanner .v1 .Transaction ;
3035import java .time .Clock ;
3136import java .time .Duration ;
3237import java .time .Instant ;
@@ -92,6 +97,10 @@ void onError(SpannerException spannerException) {
9297 // synchronizing, as it does not really matter exactly which error is set.
9398 this .client .resourceNotFoundException .set ((ResourceNotFoundException ) spannerException );
9499 }
100+ // Mark multiplexed sessions for RW as unimplemented and fall back to regular sessions if
101+ // UNIMPLEMENTED with error message "Transaction type read_write not supported with
102+ // multiplexed sessions" is returned.
103+ this .client .maybeMarkUnimplementedForRW (spannerException );
95104 }
96105
97106 @ Override
@@ -164,6 +173,12 @@ public void close() {
164173 /** The current multiplexed session that is used by this client. */
165174 private final AtomicReference <ApiFuture <SessionReference >> multiplexedSessionReference ;
166175
176+ /**
177+ * The Transaction response returned by the BeginTransaction request with read-write when a
178+ * multiplexed session is created during client initialization.
179+ */
180+ private final SettableApiFuture <Transaction > readWriteBeginTransactionReferenceFuture ;
181+
167182 /** The expiration date/time of the current multiplexed session. */
168183 private final AtomicReference <Instant > expirationDate ;
169184
@@ -190,6 +205,12 @@ public void close() {
190205 */
191206 private final AtomicBoolean unimplemented = new AtomicBoolean (false );
192207
208+ /**
209+ * This flag is set to true if the server return UNIMPLEMENTED when a read-write transaction is
210+ * executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
211+ */
212+ @ VisibleForTesting final AtomicBoolean unimplementedForRW = new AtomicBoolean (false );
213+
193214 MultiplexedSessionDatabaseClient (SessionClient sessionClient ) {
194215 this (sessionClient , Clock .systemUTC ());
195216 }
@@ -217,6 +238,7 @@ public void close() {
217238 this .tracer = sessionClient .getSpanner ().getTracer ();
218239 final SettableApiFuture <SessionReference > initialSessionReferenceFuture =
219240 SettableApiFuture .create ();
241+ this .readWriteBeginTransactionReferenceFuture = SettableApiFuture .create ();
220242 this .multiplexedSessionReference = new AtomicReference <>(initialSessionReferenceFuture );
221243 this .sessionClient .asyncCreateMultiplexedSession (
222244 new SessionConsumer () {
@@ -226,6 +248,16 @@ public void onSessionReady(SessionImpl session) {
226248 // only start the maintainer if we actually managed to create a session in the first
227249 // place.
228250 maintainer .start ();
251+
252+ // initiate a begin transaction request to verify if read-write transactions are
253+ // supported using multiplexed sessions.
254+ if (sessionClient
255+ .getSpanner ()
256+ .getOptions ()
257+ .getSessionPoolOptions ()
258+ .getUseMultiplexedSessionForRW ()) {
259+ verifyBeginTransactionWithRWOnMultiplexedSessionAsync (session .getName ());
260+ }
229261 }
230262
231263 @ Override
@@ -267,6 +299,70 @@ private void maybeMarkUnimplemented(Throwable t) {
267299 }
268300 }
269301
302+ private void maybeMarkUnimplementedForRW (SpannerException spannerException ) {
303+ if (spannerException .getErrorCode () == ErrorCode .UNIMPLEMENTED
304+ && verifyErrorMessage (
305+ spannerException ,
306+ "Transaction type read_write not supported with multiplexed sessions" )) {
307+ unimplementedForRW .set (true );
308+ }
309+ }
310+
311+ private boolean verifyErrorMessage (SpannerException spannerException , String message ) {
312+ if (spannerException .getCause () == null ) {
313+ return false ;
314+ }
315+ if (spannerException .getCause ().getMessage () == null ) {
316+ return false ;
317+ }
318+ return spannerException .getCause ().getMessage ().contains (message );
319+ }
320+
321+ private void verifyBeginTransactionWithRWOnMultiplexedSessionAsync (String sessionName ) {
322+ // TODO: Remove once this is guaranteed to be available.
323+ // annotate the explict BeginTransactionRequest with a transaction tag
324+ // "multiplexed-rw-background-begin-txn" to avoid storing this request on mock spanner.
325+ // this is to safeguard other mock spanner tests whose BeginTransaction request count will
326+ // otherwise increase by 1. Modifying the unit tests do not seem valid since this code is
327+ // temporary and will be removed once the read-write on multiplexed session looks stable at
328+ // backend.
329+ BeginTransactionRequest .Builder requestBuilder =
330+ BeginTransactionRequest .newBuilder ()
331+ .setSession (sessionName )
332+ .setOptions (
333+ SessionImpl .createReadWriteTransactionOptions (
334+ Options .fromTransactionOptions (), /* previousTransactionId = */ null ))
335+ .setRequestOptions (
336+ RequestOptions .newBuilder ()
337+ .setTransactionTag ("multiplexed-rw-background-begin-txn" )
338+ .build ());
339+ final BeginTransactionRequest request = requestBuilder .build ();
340+ final ApiFuture <Transaction > requestFuture ;
341+ requestFuture =
342+ sessionClient
343+ .getSpanner ()
344+ .getRpc ()
345+ .beginTransactionAsync (request , /* options = */ null , /* routeToLeader = */ true );
346+ requestFuture .addListener (
347+ () -> {
348+ try {
349+ Transaction txn = requestFuture .get ();
350+ if (txn .getId ().isEmpty ()) {
351+ throw newSpannerException (
352+ ErrorCode .INTERNAL , "Missing id in transaction\n " + sessionName );
353+ }
354+ readWriteBeginTransactionReferenceFuture .set (txn );
355+ } catch (Exception e ) {
356+ SpannerException spannerException = SpannerExceptionFactory .newSpannerException (e );
357+ // Mark multiplexed sessions for RW as unimplemented and fall back to regular sessions
358+ // if UNIMPLEMENTED is returned.
359+ maybeMarkUnimplementedForRW (spannerException );
360+ readWriteBeginTransactionReferenceFuture .setException (e );
361+ }
362+ },
363+ MoreExecutors .directExecutor ());
364+ }
365+
270366 boolean isValid () {
271367 return resourceNotFoundException .get () == null ;
272368 }
@@ -283,6 +379,10 @@ boolean isMultiplexedSessionsSupported() {
283379 return !this .unimplemented .get ();
284380 }
285381
382+ boolean isMultiplexedSessionsForRWSupported () {
383+ return !this .unimplementedForRW .get ();
384+ }
385+
286386 void close () {
287387 synchronized (this ) {
288388 if (!this .isClosed ) {
@@ -308,6 +408,17 @@ SessionReference getCurrentSessionReference() {
308408 }
309409 }
310410
411+ @ VisibleForTesting
412+ Transaction getReadWriteBeginTransactionReference () {
413+ try {
414+ return this .readWriteBeginTransactionReferenceFuture .get ();
415+ } catch (ExecutionException executionException ) {
416+ throw SpannerExceptionFactory .asSpannerException (executionException .getCause ());
417+ } catch (InterruptedException interruptedException ) {
418+ throw SpannerExceptionFactory .propagateInterrupt (interruptedException );
419+ }
420+ }
421+
311422 /**
312423 * Returns true if the multiplexed session has been created. This client can be used before the
313424 * session has been created, and will in that case use a delayed transaction that contains a
0 commit comments