@@ -179,9 +179,11 @@ public void asyncTransactionManager_shouldRollbackOnCloseAsync() throws Exceptio
179179 AsyncTransactionManager manager = client ().transactionManagerAsync ();
180180 TransactionContext txn = manager .beginAsync ().get ();
181181 txn .executeUpdateAsync (UPDATE_STATEMENT ).get ();
182- final TransactionSelector selector =
183- ((TransactionContextImpl ) ((SessionPoolTransactionContext ) txn ).delegate )
184- .getTransactionSelector ();
182+ if (txn instanceof SessionPoolTransactionContext ) {
183+ txn = ((SessionPoolTransactionContext ) txn ).delegate ;
184+ }
185+ TransactionContextImpl impl = (TransactionContextImpl ) txn ;
186+ final TransactionSelector selector = impl .getTransactionSelector ();
185187
186188 SpannerApiFutures .get (manager .closeAsync ());
187189 // The mock server should already have the Rollback request, as we are waiting for the returned
@@ -359,7 +361,22 @@ public void asyncTransactionManagerFireAndForgetInvalidUpdate() throws Exception
359361 ExecuteSqlRequest .class ,
360362 ExecuteSqlRequest .class ,
361363 CommitRequest .class );
362- if (isMultiplexedSessionsEnabled ()) {
364+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSessionForRW =
365+ ImmutableList .of (
366+ CreateSessionRequest .class ,
367+ // The first update that fails. This will cause a transaction retry.
368+ ExecuteSqlRequest .class ,
369+ // The retry will use an explicit BeginTransaction call.
370+ BeginTransactionRequest .class ,
371+ // The first update will again fail, but now there is a transaction id, so the
372+ // transaction can continue.
373+ ExecuteSqlRequest .class ,
374+ ExecuteSqlRequest .class ,
375+ CommitRequest .class );
376+ if (isMultiplexedSessionsEnabledForRW ()) {
377+ assertThat (mockSpanner .getRequestTypes ())
378+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSessionForRW );
379+ } else if (isMultiplexedSessionsEnabled ()) {
363380 assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
364381 } else {
365382 assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
@@ -502,14 +519,25 @@ public void asyncTransactionManagerUpdateAbortedWithoutGettingResult() throws Ex
502519 // The server may receive 1 or 2 commit requests depending on whether the call to
503520 // commitAsync() already knows that the transaction has aborted. If it does, it will not
504521 // attempt to call the Commit RPC and instead directly propagate the Aborted error.
505- assertThat (mockSpanner .getRequestTypes ())
506- .containsAtLeast (
507- BatchCreateSessionsRequest .class ,
508- ExecuteSqlRequest .class ,
509- // The retry will use a BeginTransaction RPC.
510- BeginTransactionRequest .class ,
511- ExecuteSqlRequest .class ,
512- CommitRequest .class );
522+ if (isMultiplexedSessionsEnabledForRW ()) {
523+ assertThat (mockSpanner .getRequestTypes ())
524+ .containsAtLeast (
525+ CreateSessionRequest .class ,
526+ ExecuteSqlRequest .class ,
527+ // The retry will use a BeginTransaction RPC.
528+ BeginTransactionRequest .class ,
529+ ExecuteSqlRequest .class ,
530+ CommitRequest .class );
531+ } else {
532+ assertThat (mockSpanner .getRequestTypes ())
533+ .containsAtLeast (
534+ BatchCreateSessionsRequest .class ,
535+ ExecuteSqlRequest .class ,
536+ // The retry will use a BeginTransaction RPC.
537+ BeginTransactionRequest .class ,
538+ ExecuteSqlRequest .class ,
539+ CommitRequest .class );
540+ }
513541 break ;
514542 } catch (AbortedException e ) {
515543 transactionContextFuture = manager .resetForRetryAsync ();
@@ -557,7 +585,11 @@ public void asyncTransactionManagerWaitsUntilAsyncUpdateHasFinished() throws Exc
557585 executor )
558586 .commitAsync ()
559587 .get ();
560- if (isMultiplexedSessionsEnabled ()) {
588+ if (isMultiplexedSessionsEnabledForRW ()) {
589+ assertThat (mockSpanner .getRequestTypes ())
590+ .containsExactly (
591+ CreateSessionRequest .class , ExecuteSqlRequest .class , CommitRequest .class );
592+ } else if (isMultiplexedSessionsEnabled ()) {
561593 assertThat (mockSpanner .getRequestTypes ())
562594 .containsExactly (
563595 CreateSessionRequest .class ,
@@ -678,7 +710,16 @@ public void asyncTransactionManagerFireAndForgetInvalidBatchUpdate() throws Exce
678710 ExecuteBatchDmlRequest .class ,
679711 ExecuteBatchDmlRequest .class ,
680712 CommitRequest .class );
681- if (isMultiplexedSessionsEnabled ()) {
713+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSessionsRW =
714+ ImmutableList .of (
715+ CreateSessionRequest .class ,
716+ ExecuteBatchDmlRequest .class ,
717+ ExecuteBatchDmlRequest .class ,
718+ CommitRequest .class );
719+ if (isMultiplexedSessionsEnabledForRW ()) {
720+ assertThat (mockSpanner .getRequestTypes ())
721+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSessionsRW );
722+ } else if (isMultiplexedSessionsEnabled ()) {
682723 assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
683724 } else {
684725 assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
@@ -722,7 +763,17 @@ public void asyncTransactionManagerBatchUpdateAborted() throws Exception {
722763 BeginTransactionRequest .class ,
723764 ExecuteBatchDmlRequest .class ,
724765 CommitRequest .class );
725- if (isMultiplexedSessionsEnabled ()) {
766+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSessionsRW =
767+ ImmutableList .of (
768+ CreateSessionRequest .class ,
769+ ExecuteBatchDmlRequest .class ,
770+ BeginTransactionRequest .class ,
771+ ExecuteBatchDmlRequest .class ,
772+ CommitRequest .class );
773+ if (isMultiplexedSessionsEnabledForRW ()) {
774+ assertThat (mockSpanner .getRequestTypes ())
775+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSessionsRW );
776+ } else if (isMultiplexedSessionsEnabled ()) {
726777 assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
727778 } else {
728779 assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
@@ -764,7 +815,20 @@ public void asyncTransactionManagerBatchUpdateAbortedBeforeFirstStatement() thro
764815 BeginTransactionRequest .class ,
765816 ExecuteBatchDmlRequest .class ,
766817 CommitRequest .class );
767- if (isMultiplexedSessionsEnabled ()) {
818+ // When requests run using multiplexed session with read-write enabled, the
819+ // BatchCreateSessionsRequest will not be
820+ // triggered because we are creating an empty pool during initialization.
821+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSessionsRW =
822+ ImmutableList .of (
823+ CreateSessionRequest .class ,
824+ ExecuteBatchDmlRequest .class ,
825+ BeginTransactionRequest .class ,
826+ ExecuteBatchDmlRequest .class ,
827+ CommitRequest .class );
828+ if (isMultiplexedSessionsEnabledForRW ()) {
829+ assertThat (mockSpanner .getRequestTypes ())
830+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSessionsRW );
831+ } else if (isMultiplexedSessionsEnabled ()) {
768832 assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
769833 } else {
770834 assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
@@ -825,7 +889,18 @@ public void asyncTransactionManagerWithBatchUpdateCommitAborted() throws Excepti
825889 BeginTransactionRequest .class ,
826890 ExecuteBatchDmlRequest .class ,
827891 CommitRequest .class );
828- if (isMultiplexedSessionsEnabled ()) {
892+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSessionsRW =
893+ ImmutableList .of (
894+ CreateSessionRequest .class ,
895+ ExecuteBatchDmlRequest .class ,
896+ CommitRequest .class ,
897+ BeginTransactionRequest .class ,
898+ ExecuteBatchDmlRequest .class ,
899+ CommitRequest .class );
900+ if (isMultiplexedSessionsEnabledForRW ()) {
901+ assertThat (mockSpanner .getRequestTypes ())
902+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSessionsRW );
903+ } else if (isMultiplexedSessionsEnabled ()) {
829904 assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
830905 } else {
831906 assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
@@ -867,27 +942,50 @@ public void asyncTransactionManagerBatchUpdateAbortedWithoutGettingResult() thro
867942 assertThat (attempt .get ()).isEqualTo (2 );
868943 List <Class <? extends AbstractMessage >> requests = mockSpanner .getRequestTypes ();
869944 // Remove the CreateSession requests for multiplexed sessions, as those are not relevant for
870- // this test.
871- requests .removeIf (request -> request == CreateSessionRequest .class );
945+ // this test if multiplexed session for read-write is not enabled.
946+ if (!isMultiplexedSessionsEnabledForRW ()) {
947+ requests .removeIf (request -> request == CreateSessionRequest .class );
948+ }
872949 int size = Iterables .size (requests );
873950 assertThat (size ).isIn (Range .closed (5 , 6 ));
874951 if (size == 5 ) {
875- assertThat (requests )
876- .containsExactly (
877- BatchCreateSessionsRequest .class ,
878- ExecuteBatchDmlRequest .class ,
879- BeginTransactionRequest .class ,
880- ExecuteBatchDmlRequest .class ,
881- CommitRequest .class );
952+ if (isMultiplexedSessionsEnabledForRW ()) {
953+ assertThat (requests )
954+ .containsExactly (
955+ CreateSessionRequest .class ,
956+ ExecuteBatchDmlRequest .class ,
957+ BeginTransactionRequest .class ,
958+ ExecuteBatchDmlRequest .class ,
959+ CommitRequest .class );
960+ } else {
961+ assertThat (requests )
962+ .containsExactly (
963+ BatchCreateSessionsRequest .class ,
964+ ExecuteBatchDmlRequest .class ,
965+ BeginTransactionRequest .class ,
966+ ExecuteBatchDmlRequest .class ,
967+ CommitRequest .class );
968+ }
882969 } else {
883- assertThat (requests )
884- .containsExactly (
885- BatchCreateSessionsRequest .class ,
886- ExecuteBatchDmlRequest .class ,
887- CommitRequest .class ,
888- BeginTransactionRequest .class ,
889- ExecuteBatchDmlRequest .class ,
890- CommitRequest .class );
970+ if (isMultiplexedSessionsEnabledForRW ()) {
971+ assertThat (requests )
972+ .containsExactly (
973+ CreateSessionRequest .class ,
974+ ExecuteBatchDmlRequest .class ,
975+ CommitRequest .class ,
976+ BeginTransactionRequest .class ,
977+ ExecuteBatchDmlRequest .class ,
978+ CommitRequest .class );
979+ } else {
980+ assertThat (requests )
981+ .containsExactly (
982+ BatchCreateSessionsRequest .class ,
983+ ExecuteBatchDmlRequest .class ,
984+ CommitRequest .class ,
985+ BeginTransactionRequest .class ,
986+ ExecuteBatchDmlRequest .class ,
987+ CommitRequest .class );
988+ }
891989 }
892990 }
893991
@@ -918,7 +1016,13 @@ public void asyncTransactionManagerWithBatchUpdateCommitFails() {
9181016 ImmutableList <Class <? extends Message >> expectedRequests =
9191017 ImmutableList .of (
9201018 BatchCreateSessionsRequest .class , ExecuteBatchDmlRequest .class , CommitRequest .class );
921- if (isMultiplexedSessionsEnabled ()) {
1019+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSessionsRW =
1020+ ImmutableList .of (
1021+ CreateSessionRequest .class , ExecuteBatchDmlRequest .class , CommitRequest .class );
1022+ if (isMultiplexedSessionsEnabledForRW ()) {
1023+ assertThat (mockSpanner .getRequestTypes ())
1024+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSessionsRW );
1025+ } else if (isMultiplexedSessionsEnabled ()) {
9221026 assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
9231027 } else {
9241028 assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
@@ -949,7 +1053,13 @@ public void asyncTransactionManagerWaitsUntilAsyncBatchUpdateHasFinished() throw
9491053 ImmutableList <Class <? extends Message >> expectedRequests =
9501054 ImmutableList .of (
9511055 BatchCreateSessionsRequest .class , ExecuteBatchDmlRequest .class , CommitRequest .class );
952- if (isMultiplexedSessionsEnabled ()) {
1056+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSessionsRW =
1057+ ImmutableList .of (
1058+ CreateSessionRequest .class , ExecuteBatchDmlRequest .class , CommitRequest .class );
1059+ if (isMultiplexedSessionsEnabledForRW ()) {
1060+ assertThat (mockSpanner .getRequestTypes ())
1061+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSessionsRW );
1062+ } else if (isMultiplexedSessionsEnabled ()) {
9531063 assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
9541064 } else {
9551065 assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
@@ -1122,4 +1232,11 @@ private boolean isMultiplexedSessionsEnabled() {
11221232 }
11231233 return spanner .getOptions ().getSessionPoolOptions ().getUseMultiplexedSession ();
11241234 }
1235+
1236+ private boolean isMultiplexedSessionsEnabledForRW () {
1237+ if (spanner .getOptions () == null || spanner .getOptions ().getSessionPoolOptions () == null ) {
1238+ return false ;
1239+ }
1240+ return spanner .getOptions ().getSessionPoolOptions ().getUseMultiplexedSessionForRW ();
1241+ }
11251242}
0 commit comments