2727import static org .junit .Assert .assertEquals ;
2828import static org .junit .Assert .assertNotNull ;
2929import static org .junit .Assert .assertThrows ;
30+ import static org .junit .Assume .assumeFalse ;
3031
3132import com .google .api .core .ApiFuture ;
3233import com .google .api .core .ApiFutureCallback ;
@@ -178,9 +179,11 @@ public void asyncTransactionManager_shouldRollbackOnCloseAsync() throws Exceptio
178179 AsyncTransactionManager manager = client ().transactionManagerAsync ();
179180 TransactionContext txn = manager .beginAsync ().get ();
180181 txn .executeUpdateAsync (UPDATE_STATEMENT ).get ();
181- final TransactionSelector selector =
182- ((TransactionContextImpl ) ((SessionPoolTransactionContext ) txn ).delegate )
183- .getTransactionSelector ();
182+ if (txn instanceof SessionPoolTransactionContext ) {
183+ txn = ((SessionPoolTransactionContext ) txn ).delegate ;
184+ }
185+ TransactionContextImpl impl = (TransactionContextImpl ) txn ;
186+ final TransactionSelector selector = impl .getTransactionSelector ();
184187
185188 SpannerApiFutures .get (manager .closeAsync ());
186189 // The mock server should already have the Rollback request, as we are waiting for the returned
@@ -247,6 +250,11 @@ public void asyncTransactionManagerUpdate() throws Exception {
247250
248251 @ Test
249252 public void asyncTransactionManagerIsNonBlocking () throws Exception {
253+ // TODO: Remove this condition once DelayedAsyncTransactionManager is made non-blocking with
254+ // multiplexed sessions.
255+ assumeFalse (
256+ "DelayedAsyncTransactionManager is currently blocking with multiplexed sessions." ,
257+ spanner .getOptions ().getSessionPoolOptions ().getUseMultiplexedSessionForRW ());
250258 mockSpanner .freeze ();
251259 try (AsyncTransactionManager manager = clientWithEmptySessionPool ().transactionManagerAsync ()) {
252260 TransactionContextFuture transactionContextFuture = manager .beginAsync ();
@@ -346,7 +354,7 @@ public void asyncTransactionManagerFireAndForgetInvalidUpdate() throws Exception
346354 }
347355 }
348356 }
349- ImmutableList <Class <? extends Message >> expectedRequests =
357+ ImmutableList <Class <? extends Message >> expectedRequestsWithRegularSession =
350358 ImmutableList .of (
351359 BatchCreateSessionsRequest .class ,
352360 // The first update that fails. This will cause a transaction retry.
@@ -358,10 +366,24 @@ public void asyncTransactionManagerFireAndForgetInvalidUpdate() throws Exception
358366 ExecuteSqlRequest .class ,
359367 ExecuteSqlRequest .class ,
360368 CommitRequest .class );
361- if (isMultiplexedSessionsEnabled ()) {
362- assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
369+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSession =
370+ ImmutableList .of (
371+ CreateSessionRequest .class ,
372+ // The first update that fails. This will cause a transaction retry.
373+ ExecuteSqlRequest .class ,
374+ // The retry will use an explicit BeginTransaction call.
375+ BeginTransactionRequest .class ,
376+ // The first update will again fail, but now there is a transaction id, so the
377+ // transaction can continue.
378+ ExecuteSqlRequest .class ,
379+ ExecuteSqlRequest .class ,
380+ CommitRequest .class );
381+ if (isMultiplexedSessionsEnabledForRW ()) {
382+ assertThat (mockSpanner .getRequestTypes ())
383+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSession );
363384 } else {
364- assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
385+ assertThat (mockSpanner .getRequestTypes ())
386+ .containsExactlyElementsIn (expectedRequestsWithRegularSession );
365387 }
366388 }
367389
@@ -501,14 +523,25 @@ public void asyncTransactionManagerUpdateAbortedWithoutGettingResult() throws Ex
501523 // The server may receive 1 or 2 commit requests depending on whether the call to
502524 // commitAsync() already knows that the transaction has aborted. If it does, it will not
503525 // attempt to call the Commit RPC and instead directly propagate the Aborted error.
504- assertThat (mockSpanner .getRequestTypes ())
505- .containsAtLeast (
506- BatchCreateSessionsRequest .class ,
507- ExecuteSqlRequest .class ,
508- // The retry will use a BeginTransaction RPC.
509- BeginTransactionRequest .class ,
510- ExecuteSqlRequest .class ,
511- CommitRequest .class );
526+ if (isMultiplexedSessionsEnabledForRW ()) {
527+ assertThat (mockSpanner .getRequestTypes ())
528+ .containsAtLeast (
529+ CreateSessionRequest .class ,
530+ ExecuteSqlRequest .class ,
531+ // The retry will use a BeginTransaction RPC.
532+ BeginTransactionRequest .class ,
533+ ExecuteSqlRequest .class ,
534+ CommitRequest .class );
535+ } else {
536+ assertThat (mockSpanner .getRequestTypes ())
537+ .containsAtLeast (
538+ BatchCreateSessionsRequest .class ,
539+ ExecuteSqlRequest .class ,
540+ // The retry will use a BeginTransaction RPC.
541+ BeginTransactionRequest .class ,
542+ ExecuteSqlRequest .class ,
543+ CommitRequest .class );
544+ }
512545 break ;
513546 } catch (AbortedException e ) {
514547 transactionContextFuture = manager .resetForRetryAsync ();
@@ -556,13 +589,10 @@ public void asyncTransactionManagerWaitsUntilAsyncUpdateHasFinished() throws Exc
556589 executor )
557590 .commitAsync ()
558591 .get ();
559- if (isMultiplexedSessionsEnabled ()) {
592+ if (isMultiplexedSessionsEnabledForRW ()) {
560593 assertThat (mockSpanner .getRequestTypes ())
561594 .containsExactly (
562- CreateSessionRequest .class ,
563- BatchCreateSessionsRequest .class ,
564- ExecuteSqlRequest .class ,
565- CommitRequest .class );
595+ CreateSessionRequest .class , ExecuteSqlRequest .class , CommitRequest .class );
566596 } else {
567597 assertThat (mockSpanner .getRequestTypes ())
568598 .containsExactly (
@@ -600,6 +630,11 @@ public void asyncTransactionManagerBatchUpdate() throws Exception {
600630
601631 @ Test
602632 public void asyncTransactionManagerIsNonBlockingWithBatchUpdate () throws Exception {
633+ // TODO: Remove this condition once DelayedAsyncTransactionManager is made non-blocking with
634+ // multiplexed sessions.
635+ assumeFalse (
636+ "DelayedAsyncTransactionManager is currently blocking with multiplexed sessions." ,
637+ spanner .getOptions ().getSessionPoolOptions ().getUseMultiplexedSessionForRW ());
603638 mockSpanner .freeze ();
604639 try (AsyncTransactionManager manager = clientWithEmptySessionPool ().transactionManagerAsync ()) {
605640 TransactionContextFuture transactionContextFuture = manager .beginAsync ();
@@ -671,16 +706,24 @@ public void asyncTransactionManagerFireAndForgetInvalidBatchUpdate() throws Exce
671706 }
672707 }
673708 }
674- ImmutableList <Class <? extends Message >> expectedRequests =
709+ ImmutableList <Class <? extends Message >> expectedRequestsWithRegularSession =
675710 ImmutableList .of (
676711 BatchCreateSessionsRequest .class ,
677712 ExecuteBatchDmlRequest .class ,
678713 ExecuteBatchDmlRequest .class ,
679714 CommitRequest .class );
680- if (isMultiplexedSessionsEnabled ()) {
681- assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
715+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSession =
716+ ImmutableList .of (
717+ CreateSessionRequest .class ,
718+ ExecuteBatchDmlRequest .class ,
719+ ExecuteBatchDmlRequest .class ,
720+ CommitRequest .class );
721+ if (isMultiplexedSessionsEnabledForRW ()) {
722+ assertThat (mockSpanner .getRequestTypes ())
723+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSession );
682724 } else {
683- assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
725+ assertThat (mockSpanner .getRequestTypes ())
726+ .containsExactlyElementsIn (expectedRequestsWithRegularSession );
684727 }
685728 }
686729
@@ -714,17 +757,26 @@ public void asyncTransactionManagerBatchUpdateAborted() throws Exception {
714757 assertThat (attempt .get ()).isEqualTo (2 );
715758 // There should only be 1 CommitRequest, as the first attempt should abort already after the
716759 // ExecuteBatchDmlRequest.
717- ImmutableList <Class <? extends Message >> expectedRequests =
760+ ImmutableList <Class <? extends Message >> expectedRequestsWithRegularSession =
718761 ImmutableList .of (
719762 BatchCreateSessionsRequest .class ,
720763 ExecuteBatchDmlRequest .class ,
721764 BeginTransactionRequest .class ,
722765 ExecuteBatchDmlRequest .class ,
723766 CommitRequest .class );
724- if (isMultiplexedSessionsEnabled ()) {
725- assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
767+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSession =
768+ ImmutableList .of (
769+ CreateSessionRequest .class ,
770+ ExecuteBatchDmlRequest .class ,
771+ BeginTransactionRequest .class ,
772+ ExecuteBatchDmlRequest .class ,
773+ CommitRequest .class );
774+ if (isMultiplexedSessionsEnabledForRW ()) {
775+ assertThat (mockSpanner .getRequestTypes ())
776+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSession );
726777 } else {
727- assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
778+ assertThat (mockSpanner .getRequestTypes ())
779+ .containsExactlyElementsIn (expectedRequestsWithRegularSession );
728780 }
729781 }
730782
@@ -756,17 +808,30 @@ public void asyncTransactionManagerBatchUpdateAbortedBeforeFirstStatement() thro
756808 assertThat (attempt .get ()).isEqualTo (2 );
757809 // There should only be 1 CommitRequest, as the first attempt should abort already after the
758810 // ExecuteBatchDmlRequest.
759- ImmutableList <Class <? extends Message >> expectedRequests =
811+ ImmutableList <Class <? extends Message >> expectedRequestsWithRegularSession =
760812 ImmutableList .of (
761813 BatchCreateSessionsRequest .class ,
762814 ExecuteBatchDmlRequest .class ,
763815 BeginTransactionRequest .class ,
764816 ExecuteBatchDmlRequest .class ,
765817 CommitRequest .class );
766- if (isMultiplexedSessionsEnabled ()) {
767- assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
818+ // There should only be 1 CommitRequest, as the first attempt should abort already after the
819+ // ExecuteBatchDmlRequest.
820+ // When requests run using multiplexed session, the BatchCreateSessionsRequest will not be
821+ // triggered because we are creating an empty pool during initialization.
822+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSession =
823+ ImmutableList .of (
824+ CreateSessionRequest .class ,
825+ ExecuteBatchDmlRequest .class ,
826+ BeginTransactionRequest .class ,
827+ ExecuteBatchDmlRequest .class ,
828+ CommitRequest .class );
829+ if (isMultiplexedSessionsEnabledForRW ()) {
830+ assertThat (mockSpanner .getRequestTypes ())
831+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSession );
768832 } else {
769- assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
833+ assertThat (mockSpanner .getRequestTypes ())
834+ .containsExactlyElementsIn (expectedRequestsWithRegularSession );
770835 }
771836 }
772837
@@ -816,18 +881,28 @@ public void asyncTransactionManagerWithBatchUpdateCommitAborted() throws Excepti
816881 } finally {
817882 mockSpanner .putStatementResult (StatementResult .update (UPDATE_STATEMENT , UPDATE_COUNT ));
818883 }
819- ImmutableList <Class <? extends Message >> expectedRequests =
884+ ImmutableList <Class <? extends Message >> expectedRequestsWithRegularSession =
820885 ImmutableList .of (
821886 BatchCreateSessionsRequest .class ,
822887 ExecuteBatchDmlRequest .class ,
823888 CommitRequest .class ,
824889 BeginTransactionRequest .class ,
825890 ExecuteBatchDmlRequest .class ,
826891 CommitRequest .class );
827- if (isMultiplexedSessionsEnabled ()) {
828- assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
892+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSession =
893+ ImmutableList .of (
894+ CreateSessionRequest .class ,
895+ ExecuteBatchDmlRequest .class ,
896+ CommitRequest .class ,
897+ BeginTransactionRequest .class ,
898+ ExecuteBatchDmlRequest .class ,
899+ CommitRequest .class );
900+ if (isMultiplexedSessionsEnabledForRW ()) {
901+ assertThat (mockSpanner .getRequestTypes ())
902+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSession );
829903 } else {
830- assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
904+ assertThat (mockSpanner .getRequestTypes ())
905+ .containsExactlyElementsIn (expectedRequestsWithRegularSession );
831906 }
832907 }
833908
@@ -865,28 +940,46 @@ public void asyncTransactionManagerBatchUpdateAbortedWithoutGettingResult() thro
865940 }
866941 assertThat (attempt .get ()).isEqualTo (2 );
867942 List <Class <? extends AbstractMessage >> requests = mockSpanner .getRequestTypes ();
868- // Remove the CreateSession requests for multiplexed sessions, as those are not relevant for
869- // this test.
870- requests .removeIf (request -> request == CreateSessionRequest .class );
871943 int size = Iterables .size (requests );
872944 assertThat (size ).isIn (Range .closed (5 , 6 ));
873945 if (size == 5 ) {
874- assertThat (requests )
875- .containsExactly (
876- BatchCreateSessionsRequest .class ,
877- ExecuteBatchDmlRequest .class ,
878- BeginTransactionRequest .class ,
879- ExecuteBatchDmlRequest .class ,
880- CommitRequest .class );
946+ if (isMultiplexedSessionsEnabledForRW ()) {
947+ assertThat (requests )
948+ .containsExactly (
949+ CreateSessionRequest .class ,
950+ ExecuteBatchDmlRequest .class ,
951+ BeginTransactionRequest .class ,
952+ ExecuteBatchDmlRequest .class ,
953+ CommitRequest .class );
954+ } else {
955+ assertThat (requests )
956+ .containsExactly (
957+ BatchCreateSessionsRequest .class ,
958+ ExecuteBatchDmlRequest .class ,
959+ BeginTransactionRequest .class ,
960+ ExecuteBatchDmlRequest .class ,
961+ CommitRequest .class );
962+ }
881963 } else {
882- assertThat (requests )
883- .containsExactly (
884- BatchCreateSessionsRequest .class ,
885- ExecuteBatchDmlRequest .class ,
886- CommitRequest .class ,
887- BeginTransactionRequest .class ,
888- ExecuteBatchDmlRequest .class ,
889- CommitRequest .class );
964+ if (isMultiplexedSessionsEnabledForRW ()) {
965+ assertThat (requests )
966+ .containsExactly (
967+ CreateSessionRequest .class ,
968+ ExecuteBatchDmlRequest .class ,
969+ CommitRequest .class ,
970+ BeginTransactionRequest .class ,
971+ ExecuteBatchDmlRequest .class ,
972+ CommitRequest .class );
973+ } else {
974+ assertThat (requests )
975+ .containsExactly (
976+ BatchCreateSessionsRequest .class ,
977+ ExecuteBatchDmlRequest .class ,
978+ CommitRequest .class ,
979+ BeginTransactionRequest .class ,
980+ ExecuteBatchDmlRequest .class ,
981+ CommitRequest .class );
982+ }
890983 }
891984 }
892985
@@ -914,13 +1007,18 @@ public void asyncTransactionManagerWithBatchUpdateCommitFails() {
9141007 assertThat (e .getErrorCode ()).isEqualTo (ErrorCode .INVALID_ARGUMENT );
9151008 assertThat (e .getMessage ()).contains ("mutation limit exceeded" );
9161009 }
917- ImmutableList <Class <? extends Message >> expectedRequests =
1010+ ImmutableList <Class <? extends Message >> expectedRequestsWithRegularSession =
9181011 ImmutableList .of (
9191012 BatchCreateSessionsRequest .class , ExecuteBatchDmlRequest .class , CommitRequest .class );
920- if (isMultiplexedSessionsEnabled ()) {
921- assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
1013+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSession =
1014+ ImmutableList .of (
1015+ CreateSessionRequest .class , ExecuteBatchDmlRequest .class , CommitRequest .class );
1016+ if (isMultiplexedSessionsEnabledForRW ()) {
1017+ assertThat (mockSpanner .getRequestTypes ())
1018+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSession );
9221019 } else {
923- assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
1020+ assertThat (mockSpanner .getRequestTypes ())
1021+ .containsExactlyElementsIn (expectedRequestsWithRegularSession );
9241022 }
9251023 }
9261024
@@ -945,13 +1043,18 @@ public void asyncTransactionManagerWaitsUntilAsyncBatchUpdateHasFinished() throw
9451043 }
9461044 }
9471045 }
948- ImmutableList <Class <? extends Message >> expectedRequests =
1046+ ImmutableList <Class <? extends Message >> expectedRequestsWithRegularSession =
9491047 ImmutableList .of (
9501048 BatchCreateSessionsRequest .class , ExecuteBatchDmlRequest .class , CommitRequest .class );
1049+ ImmutableList <Class <? extends Message >> expectedRequestsWithMultiplexedSession =
1050+ ImmutableList .of (
1051+ CreateSessionRequest .class , ExecuteBatchDmlRequest .class , CommitRequest .class );
9511052 if (isMultiplexedSessionsEnabled ()) {
952- assertThat (mockSpanner .getRequestTypes ()).containsAtLeastElementsIn (expectedRequests );
1053+ assertThat (mockSpanner .getRequestTypes ())
1054+ .containsExactlyElementsIn (expectedRequestsWithMultiplexedSession );
9531055 } else {
954- assertThat (mockSpanner .getRequestTypes ()).containsExactlyElementsIn (expectedRequests );
1056+ assertThat (mockSpanner .getRequestTypes ())
1057+ .containsExactlyElementsIn (expectedRequestsWithRegularSession );
9551058 }
9561059 }
9571060
@@ -1090,4 +1193,11 @@ private boolean isMultiplexedSessionsEnabled() {
10901193 }
10911194 return spanner .getOptions ().getSessionPoolOptions ().getUseMultiplexedSession ();
10921195 }
1196+
1197+ private boolean isMultiplexedSessionsEnabledForRW () {
1198+ if (spanner .getOptions () == null || spanner .getOptions ().getSessionPoolOptions () == null ) {
1199+ return false ;
1200+ }
1201+ return spanner .getOptions ().getSessionPoolOptions ().getUseMultiplexedSessionForRW ();
1202+ }
10931203}
0 commit comments