|
28 | 28 | import static org.junit.Assert.assertNotNull; |
29 | 29 | import static org.junit.Assert.assertThrows; |
30 | 30 | import static org.junit.Assert.assertTrue; |
| 31 | +import static org.junit.Assume.assumeFalse; |
31 | 32 |
|
32 | 33 | import com.google.api.core.ApiFuture; |
33 | 34 | import com.google.api.core.ApiFutureCallback; |
@@ -179,9 +180,11 @@ public void asyncTransactionManager_shouldRollbackOnCloseAsync() throws Exceptio |
179 | 180 | AsyncTransactionManager manager = client().transactionManagerAsync(); |
180 | 181 | TransactionContext txn = manager.beginAsync().get(); |
181 | 182 | txn.executeUpdateAsync(UPDATE_STATEMENT).get(); |
182 | | - final TransactionSelector selector = |
183 | | - ((TransactionContextImpl) ((SessionPoolTransactionContext) txn).delegate) |
184 | | - .getTransactionSelector(); |
| 183 | + if (txn instanceof SessionPoolTransactionContext) { |
| 184 | + txn = ((SessionPoolTransactionContext) txn).delegate; |
| 185 | + } |
| 186 | + TransactionContextImpl impl = (TransactionContextImpl) txn; |
| 187 | + final TransactionSelector selector = impl.getTransactionSelector(); |
185 | 188 |
|
186 | 189 | SpannerApiFutures.get(manager.closeAsync()); |
187 | 190 | // The mock server should already have the Rollback request, as we are waiting for the returned |
@@ -248,6 +251,11 @@ public void asyncTransactionManagerUpdate() throws Exception { |
248 | 251 |
|
249 | 252 | @Test |
250 | 253 | public void asyncTransactionManagerIsNonBlocking() throws Exception { |
| 254 | + // TODO: Remove this condition once DelayedAsyncTransactionManager is made non-blocking with |
| 255 | + // multiplexed sessions. |
| 256 | + assumeFalse( |
| 257 | + "DelayedAsyncTransactionManager is currently blocking with multiplexed sessions.", |
| 258 | + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); |
251 | 259 | mockSpanner.freeze(); |
252 | 260 | try (AsyncTransactionManager manager = clientWithEmptySessionPool().transactionManagerAsync()) { |
253 | 261 | TransactionContextFuture transactionContextFuture = manager.beginAsync(); |
@@ -359,7 +367,22 @@ public void asyncTransactionManagerFireAndForgetInvalidUpdate() throws Exception |
359 | 367 | ExecuteSqlRequest.class, |
360 | 368 | ExecuteSqlRequest.class, |
361 | 369 | CommitRequest.class); |
362 | | - if (isMultiplexedSessionsEnabled()) { |
| 370 | + ImmutableList<Class<? extends Message>> expectedRequestsWithMultiplexedSessionForRW = |
| 371 | + ImmutableList.of( |
| 372 | + CreateSessionRequest.class, |
| 373 | + // The first update that fails. This will cause a transaction retry. |
| 374 | + ExecuteSqlRequest.class, |
| 375 | + // The retry will use an explicit BeginTransaction call. |
| 376 | + BeginTransactionRequest.class, |
| 377 | + // The first update will again fail, but now there is a transaction id, so the |
| 378 | + // transaction can continue. |
| 379 | + ExecuteSqlRequest.class, |
| 380 | + ExecuteSqlRequest.class, |
| 381 | + CommitRequest.class); |
| 382 | + if (isMultiplexedSessionsEnabledForRW()) { |
| 383 | + assertThat(mockSpanner.getRequestTypes()) |
| 384 | + .containsExactlyElementsIn(expectedRequestsWithMultiplexedSessionForRW); |
| 385 | + } else if (isMultiplexedSessionsEnabled()) { |
363 | 386 | assertThat(mockSpanner.getRequestTypes()).containsAtLeastElementsIn(expectedRequests); |
364 | 387 | } else { |
365 | 388 | assertThat(mockSpanner.getRequestTypes()).containsExactlyElementsIn(expectedRequests); |
@@ -502,14 +525,25 @@ public void asyncTransactionManagerUpdateAbortedWithoutGettingResult() throws Ex |
502 | 525 | // The server may receive 1 or 2 commit requests depending on whether the call to |
503 | 526 | // commitAsync() already knows that the transaction has aborted. If it does, it will not |
504 | 527 | // attempt to call the Commit RPC and instead directly propagate the Aborted error. |
505 | | - assertThat(mockSpanner.getRequestTypes()) |
506 | | - .containsAtLeast( |
507 | | - BatchCreateSessionsRequest.class, |
508 | | - ExecuteSqlRequest.class, |
509 | | - // The retry will use a BeginTransaction RPC. |
510 | | - BeginTransactionRequest.class, |
511 | | - ExecuteSqlRequest.class, |
512 | | - CommitRequest.class); |
| 528 | + if (isMultiplexedSessionsEnabledForRW()) { |
| 529 | + assertThat(mockSpanner.getRequestTypes()) |
| 530 | + .containsAtLeast( |
| 531 | + CreateSessionRequest.class, |
| 532 | + ExecuteSqlRequest.class, |
| 533 | + // The retry will use a BeginTransaction RPC. |
| 534 | + BeginTransactionRequest.class, |
| 535 | + ExecuteSqlRequest.class, |
| 536 | + CommitRequest.class); |
| 537 | + } else { |
| 538 | + assertThat(mockSpanner.getRequestTypes()) |
| 539 | + .containsAtLeast( |
| 540 | + BatchCreateSessionsRequest.class, |
| 541 | + ExecuteSqlRequest.class, |
| 542 | + // The retry will use a BeginTransaction RPC. |
| 543 | + BeginTransactionRequest.class, |
| 544 | + ExecuteSqlRequest.class, |
| 545 | + CommitRequest.class); |
| 546 | + } |
513 | 547 | break; |
514 | 548 | } catch (AbortedException e) { |
515 | 549 | transactionContextFuture = manager.resetForRetryAsync(); |
@@ -557,7 +591,11 @@ public void asyncTransactionManagerWaitsUntilAsyncUpdateHasFinished() throws Exc |
557 | 591 | executor) |
558 | 592 | .commitAsync() |
559 | 593 | .get(); |
560 | | - if (isMultiplexedSessionsEnabled()) { |
| 594 | + if (isMultiplexedSessionsEnabledForRW()) { |
| 595 | + assertThat(mockSpanner.getRequestTypes()) |
| 596 | + .containsExactly( |
| 597 | + CreateSessionRequest.class, ExecuteSqlRequest.class, CommitRequest.class); |
| 598 | + } else if (isMultiplexedSessionsEnabled()) { |
561 | 599 | assertThat(mockSpanner.getRequestTypes()) |
562 | 600 | .containsExactly( |
563 | 601 | CreateSessionRequest.class, |
@@ -601,6 +639,11 @@ public void asyncTransactionManagerBatchUpdate() throws Exception { |
601 | 639 |
|
602 | 640 | @Test |
603 | 641 | public void asyncTransactionManagerIsNonBlockingWithBatchUpdate() throws Exception { |
| 642 | + // TODO: Remove this condition once DelayedAsyncTransactionManager is made non-blocking with |
| 643 | + // multiplexed sessions. |
| 644 | + assumeFalse( |
| 645 | + "DelayedAsyncTransactionManager is currently blocking with multiplexed sessions.", |
| 646 | + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()); |
604 | 647 | mockSpanner.freeze(); |
605 | 648 | try (AsyncTransactionManager manager = clientWithEmptySessionPool().transactionManagerAsync()) { |
606 | 649 | TransactionContextFuture transactionContextFuture = manager.beginAsync(); |
@@ -678,7 +721,16 @@ public void asyncTransactionManagerFireAndForgetInvalidBatchUpdate() throws Exce |
678 | 721 | ExecuteBatchDmlRequest.class, |
679 | 722 | ExecuteBatchDmlRequest.class, |
680 | 723 | CommitRequest.class); |
681 | | - if (isMultiplexedSessionsEnabled()) { |
| 724 | + ImmutableList<Class<? extends Message>> expectedRequestsWithMultiplexedSessionsRW = |
| 725 | + ImmutableList.of( |
| 726 | + CreateSessionRequest.class, |
| 727 | + ExecuteBatchDmlRequest.class, |
| 728 | + ExecuteBatchDmlRequest.class, |
| 729 | + CommitRequest.class); |
| 730 | + if (isMultiplexedSessionsEnabledForRW()) { |
| 731 | + assertThat(mockSpanner.getRequestTypes()) |
| 732 | + .containsExactlyElementsIn(expectedRequestsWithMultiplexedSessionsRW); |
| 733 | + } else if (isMultiplexedSessionsEnabled()) { |
682 | 734 | assertThat(mockSpanner.getRequestTypes()).containsAtLeastElementsIn(expectedRequests); |
683 | 735 | } else { |
684 | 736 | assertThat(mockSpanner.getRequestTypes()).containsExactlyElementsIn(expectedRequests); |
@@ -722,7 +774,17 @@ public void asyncTransactionManagerBatchUpdateAborted() throws Exception { |
722 | 774 | BeginTransactionRequest.class, |
723 | 775 | ExecuteBatchDmlRequest.class, |
724 | 776 | CommitRequest.class); |
725 | | - if (isMultiplexedSessionsEnabled()) { |
| 777 | + ImmutableList<Class<? extends Message>> expectedRequestsWithMultiplexedSessionsRW = |
| 778 | + ImmutableList.of( |
| 779 | + CreateSessionRequest.class, |
| 780 | + ExecuteBatchDmlRequest.class, |
| 781 | + BeginTransactionRequest.class, |
| 782 | + ExecuteBatchDmlRequest.class, |
| 783 | + CommitRequest.class); |
| 784 | + if (isMultiplexedSessionsEnabledForRW()) { |
| 785 | + assertThat(mockSpanner.getRequestTypes()) |
| 786 | + .containsExactlyElementsIn(expectedRequestsWithMultiplexedSessionsRW); |
| 787 | + } else if (isMultiplexedSessionsEnabled()) { |
726 | 788 | assertThat(mockSpanner.getRequestTypes()).containsAtLeastElementsIn(expectedRequests); |
727 | 789 | } else { |
728 | 790 | assertThat(mockSpanner.getRequestTypes()).containsExactlyElementsIn(expectedRequests); |
@@ -764,7 +826,20 @@ public void asyncTransactionManagerBatchUpdateAbortedBeforeFirstStatement() thro |
764 | 826 | BeginTransactionRequest.class, |
765 | 827 | ExecuteBatchDmlRequest.class, |
766 | 828 | CommitRequest.class); |
767 | | - if (isMultiplexedSessionsEnabled()) { |
| 829 | + // When requests run using multiplexed session with read-write enabled, the |
| 830 | + // BatchCreateSessionsRequest will not be |
| 831 | + // triggered because we are creating an empty pool during initialization. |
| 832 | + ImmutableList<Class<? extends Message>> expectedRequestsWithMultiplexedSessionsRW = |
| 833 | + ImmutableList.of( |
| 834 | + CreateSessionRequest.class, |
| 835 | + ExecuteBatchDmlRequest.class, |
| 836 | + BeginTransactionRequest.class, |
| 837 | + ExecuteBatchDmlRequest.class, |
| 838 | + CommitRequest.class); |
| 839 | + if (isMultiplexedSessionsEnabledForRW()) { |
| 840 | + assertThat(mockSpanner.getRequestTypes()) |
| 841 | + .containsExactlyElementsIn(expectedRequestsWithMultiplexedSessionsRW); |
| 842 | + } else if (isMultiplexedSessionsEnabled()) { |
768 | 843 | assertThat(mockSpanner.getRequestTypes()).containsAtLeastElementsIn(expectedRequests); |
769 | 844 | } else { |
770 | 845 | assertThat(mockSpanner.getRequestTypes()).containsExactlyElementsIn(expectedRequests); |
@@ -825,7 +900,18 @@ public void asyncTransactionManagerWithBatchUpdateCommitAborted() throws Excepti |
825 | 900 | BeginTransactionRequest.class, |
826 | 901 | ExecuteBatchDmlRequest.class, |
827 | 902 | CommitRequest.class); |
828 | | - if (isMultiplexedSessionsEnabled()) { |
| 903 | + ImmutableList<Class<? extends Message>> expectedRequestsWithMultiplexedSessionsRW = |
| 904 | + ImmutableList.of( |
| 905 | + CreateSessionRequest.class, |
| 906 | + ExecuteBatchDmlRequest.class, |
| 907 | + CommitRequest.class, |
| 908 | + BeginTransactionRequest.class, |
| 909 | + ExecuteBatchDmlRequest.class, |
| 910 | + CommitRequest.class); |
| 911 | + if (isMultiplexedSessionsEnabledForRW()) { |
| 912 | + assertThat(mockSpanner.getRequestTypes()) |
| 913 | + .containsExactlyElementsIn(expectedRequestsWithMultiplexedSessionsRW); |
| 914 | + } else if (isMultiplexedSessionsEnabled()) { |
829 | 915 | assertThat(mockSpanner.getRequestTypes()).containsAtLeastElementsIn(expectedRequests); |
830 | 916 | } else { |
831 | 917 | assertThat(mockSpanner.getRequestTypes()).containsExactlyElementsIn(expectedRequests); |
@@ -866,28 +952,46 @@ public void asyncTransactionManagerBatchUpdateAbortedWithoutGettingResult() thro |
866 | 952 | } |
867 | 953 | assertThat(attempt.get()).isEqualTo(2); |
868 | 954 | List<Class<? extends AbstractMessage>> requests = mockSpanner.getRequestTypes(); |
869 | | - // Remove the CreateSession requests for multiplexed sessions, as those are not relevant for |
870 | | - // this test. |
871 | | - requests.removeIf(request -> request == CreateSessionRequest.class); |
872 | 955 | int size = Iterables.size(requests); |
873 | 956 | assertThat(size).isIn(Range.closed(5, 6)); |
874 | 957 | if (size == 5) { |
875 | | - assertThat(requests) |
876 | | - .containsExactly( |
877 | | - BatchCreateSessionsRequest.class, |
878 | | - ExecuteBatchDmlRequest.class, |
879 | | - BeginTransactionRequest.class, |
880 | | - ExecuteBatchDmlRequest.class, |
881 | | - CommitRequest.class); |
| 958 | + if (isMultiplexedSessionsEnabledForRW()) { |
| 959 | + assertThat(requests) |
| 960 | + .containsExactly( |
| 961 | + CreateSessionRequest.class, |
| 962 | + ExecuteBatchDmlRequest.class, |
| 963 | + BeginTransactionRequest.class, |
| 964 | + ExecuteBatchDmlRequest.class, |
| 965 | + CommitRequest.class); |
| 966 | + } else { |
| 967 | + assertThat(requests) |
| 968 | + .containsExactly( |
| 969 | + BatchCreateSessionsRequest.class, |
| 970 | + ExecuteBatchDmlRequest.class, |
| 971 | + BeginTransactionRequest.class, |
| 972 | + ExecuteBatchDmlRequest.class, |
| 973 | + CommitRequest.class); |
| 974 | + } |
882 | 975 | } else { |
883 | | - assertThat(requests) |
884 | | - .containsExactly( |
885 | | - BatchCreateSessionsRequest.class, |
886 | | - ExecuteBatchDmlRequest.class, |
887 | | - CommitRequest.class, |
888 | | - BeginTransactionRequest.class, |
889 | | - ExecuteBatchDmlRequest.class, |
890 | | - CommitRequest.class); |
| 976 | + if (isMultiplexedSessionsEnabledForRW()) { |
| 977 | + assertThat(requests) |
| 978 | + .containsExactly( |
| 979 | + CreateSessionRequest.class, |
| 980 | + ExecuteBatchDmlRequest.class, |
| 981 | + CommitRequest.class, |
| 982 | + BeginTransactionRequest.class, |
| 983 | + ExecuteBatchDmlRequest.class, |
| 984 | + CommitRequest.class); |
| 985 | + } else { |
| 986 | + assertThat(requests) |
| 987 | + .containsExactly( |
| 988 | + BatchCreateSessionsRequest.class, |
| 989 | + ExecuteBatchDmlRequest.class, |
| 990 | + CommitRequest.class, |
| 991 | + BeginTransactionRequest.class, |
| 992 | + ExecuteBatchDmlRequest.class, |
| 993 | + CommitRequest.class); |
| 994 | + } |
891 | 995 | } |
892 | 996 | } |
893 | 997 |
|
@@ -918,7 +1022,13 @@ public void asyncTransactionManagerWithBatchUpdateCommitFails() { |
918 | 1022 | ImmutableList<Class<? extends Message>> expectedRequests = |
919 | 1023 | ImmutableList.of( |
920 | 1024 | BatchCreateSessionsRequest.class, ExecuteBatchDmlRequest.class, CommitRequest.class); |
921 | | - if (isMultiplexedSessionsEnabled()) { |
| 1025 | + ImmutableList<Class<? extends Message>> expectedRequestsWithMultiplexedSessionsRW = |
| 1026 | + ImmutableList.of( |
| 1027 | + CreateSessionRequest.class, ExecuteBatchDmlRequest.class, CommitRequest.class); |
| 1028 | + if (isMultiplexedSessionsEnabledForRW()) { |
| 1029 | + assertThat(mockSpanner.getRequestTypes()) |
| 1030 | + .containsExactlyElementsIn(expectedRequestsWithMultiplexedSessionsRW); |
| 1031 | + } else if (isMultiplexedSessionsEnabled()) { |
922 | 1032 | assertThat(mockSpanner.getRequestTypes()).containsAtLeastElementsIn(expectedRequests); |
923 | 1033 | } else { |
924 | 1034 | assertThat(mockSpanner.getRequestTypes()).containsExactlyElementsIn(expectedRequests); |
@@ -949,7 +1059,13 @@ public void asyncTransactionManagerWaitsUntilAsyncBatchUpdateHasFinished() throw |
949 | 1059 | ImmutableList<Class<? extends Message>> expectedRequests = |
950 | 1060 | ImmutableList.of( |
951 | 1061 | BatchCreateSessionsRequest.class, ExecuteBatchDmlRequest.class, CommitRequest.class); |
952 | | - if (isMultiplexedSessionsEnabled()) { |
| 1062 | + ImmutableList<Class<? extends Message>> expectedRequestsWithMultiplexedSessionsRW = |
| 1063 | + ImmutableList.of( |
| 1064 | + CreateSessionRequest.class, ExecuteBatchDmlRequest.class, CommitRequest.class); |
| 1065 | + if (isMultiplexedSessionsEnabledForRW()) { |
| 1066 | + assertThat(mockSpanner.getRequestTypes()) |
| 1067 | + .containsExactlyElementsIn(expectedRequestsWithMultiplexedSessionsRW); |
| 1068 | + } else if (isMultiplexedSessionsEnabled()) { |
953 | 1069 | assertThat(mockSpanner.getRequestTypes()).containsAtLeastElementsIn(expectedRequests); |
954 | 1070 | } else { |
955 | 1071 | assertThat(mockSpanner.getRequestTypes()).containsExactlyElementsIn(expectedRequests); |
@@ -1122,4 +1238,11 @@ private boolean isMultiplexedSessionsEnabled() { |
1122 | 1238 | } |
1123 | 1239 | return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession(); |
1124 | 1240 | } |
| 1241 | + |
| 1242 | + private boolean isMultiplexedSessionsEnabledForRW() { |
| 1243 | + if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) { |
| 1244 | + return false; |
| 1245 | + } |
| 1246 | + return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW(); |
| 1247 | + } |
1125 | 1248 | } |
0 commit comments