Skip to content

Commit 6664d75

Browse files
authored
fix: keep track of any BeginTransaction option for a Read (#1486)
* fix: keep track of any BeginTransaction option for a Read If a StreamingReadRequest that included a BeginTransaction option was retried as a result of a transient error (UNAVAILABLE), the fact that the BeginTransaction option was included would not be registered for the retried request. This could cause a transaction to fail if the retried request returned an Aborted error, and that Aborted error was caught by the application. * test: increase max wait time
1 parent 0e832a3 commit 6664d75

File tree

3 files changed

+378
-3
lines changed

3 files changed

+378
-3
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -790,7 +790,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
790790
SpannerRpc.StreamingCall call =
791791
rpc.read(builder.build(), stream.consumer(), session.getOptions());
792792
call.request(prefetchChunks);
793-
stream.setCall(call, selector != null && selector.hasBegin());
793+
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
794794
return stream;
795795
}
796796
};

google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java

Lines changed: 376 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ public void testInlinedBeginFirstReadReturnsUnavailable() {
605605
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
606606
mockSpanner.setStreamingReadExecutionTime(
607607
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
608-
long value =
608+
Long value =
609609
client
610610
.readWriteTransaction()
611611
.run(
@@ -625,6 +625,381 @@ public void testInlinedBeginFirstReadReturnsUnavailable() {
625625
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
626626
}
627627

628+
@Test
629+
public void testInlinedBeginFirstReadReturnsUnavailableRetryReturnsAborted() {
630+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
631+
mockSpanner.setStreamingReadExecutionTime(
632+
SimulatedExecutionTime.ofExceptions(
633+
Arrays.asList(
634+
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
635+
Long value =
636+
client
637+
.readWriteTransaction()
638+
.run(
639+
transaction -> {
640+
// The first attempt will return UNAVAILABLE and retry internally.
641+
// The second attempt will return ABORTED and should cause the transaction to
642+
// retry.
643+
try (ResultSet rs =
644+
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
645+
if (rs.next()) {
646+
return rs.getLong(0);
647+
}
648+
}
649+
return 0L;
650+
});
651+
assertThat(value).isEqualTo(1L);
652+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
653+
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
654+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
655+
}
656+
657+
@Test
658+
public void testInlinedBeginFirstQueryReturnsUnavailableRetryReturnsAborted() {
659+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
660+
mockSpanner.setExecuteStreamingSqlExecutionTime(
661+
SimulatedExecutionTime.ofExceptions(
662+
Arrays.asList(
663+
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
664+
Long value =
665+
client
666+
.readWriteTransaction()
667+
.run(
668+
transaction -> {
669+
// The first attempt will return UNAVAILABLE and retry internally.
670+
// The second attempt will return ABORTED and should cause the transaction to
671+
// retry.
672+
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
673+
if (rs.next()) {
674+
return rs.getLong(0);
675+
}
676+
}
677+
return 0L;
678+
});
679+
assertThat(value).isEqualTo(1L);
680+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
681+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3);
682+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
683+
}
684+
685+
@Test
686+
public void testInlinedBeginFirstDmlReturnsUnavailableRetryReturnsAborted() {
687+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
688+
mockSpanner.setExecuteSqlExecutionTime(
689+
SimulatedExecutionTime.ofExceptions(
690+
Arrays.asList(
691+
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
692+
Long value =
693+
client
694+
.readWriteTransaction()
695+
.run(
696+
transaction -> {
697+
// The first attempt will return UNAVAILABLE and retry internally.
698+
// The second attempt will return ABORTED and should cause the transaction to
699+
// retry.
700+
return transaction.executeUpdate(UPDATE_STATEMENT);
701+
});
702+
assertThat(value).isEqualTo(UPDATE_COUNT);
703+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
704+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3);
705+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
706+
}
707+
708+
@Test
709+
public void testInlinedBeginFirstReadReturnsUnavailableRetryReturnsAborted_WithCatchAll() {
710+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
711+
mockSpanner.setStreamingReadExecutionTime(
712+
SimulatedExecutionTime.ofExceptions(
713+
Arrays.asList(
714+
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
715+
Long value =
716+
client
717+
.readWriteTransaction()
718+
.run(
719+
transaction -> {
720+
// The first attempt will return UNAVAILABLE and retry internally.
721+
// The second attempt will return ABORTED and should cause the transaction to
722+
// retry.
723+
try (ResultSet rs =
724+
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
725+
if (rs.next()) {
726+
return rs.getLong(0);
727+
}
728+
} catch (AbortedException e) {
729+
// Ignore the AbortedException and let the commit handle it.
730+
}
731+
return 0L;
732+
});
733+
assertThat(value).isEqualTo(1L);
734+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
735+
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
736+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
737+
}
738+
739+
@Test
740+
public void testInlinedBeginFirstQueryReturnsUnavailableRetryReturnsAborted_WithCatchAll() {
741+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
742+
mockSpanner.setExecuteSqlExecutionTime(
743+
SimulatedExecutionTime.ofExceptions(
744+
Arrays.asList(
745+
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
746+
Long value =
747+
client
748+
.readWriteTransaction()
749+
.run(
750+
transaction -> {
751+
// The first attempt will return UNAVAILABLE and retry internally.
752+
// The second attempt will return ABORTED and should cause the transaction to
753+
// retry.
754+
try {
755+
return transaction.executeUpdate(UPDATE_STATEMENT);
756+
} catch (AbortedException e) {
757+
// Ignore the AbortedException and let the commit handle it.
758+
}
759+
return 0L;
760+
});
761+
assertThat(value).isEqualTo(UPDATE_COUNT);
762+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
763+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3);
764+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
765+
}
766+
767+
@Test
768+
public void testInlinedBeginFirstDmlReturnsUnavailableRetryReturnsAborted_WithCatchAll() {
769+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
770+
mockSpanner.setExecuteStreamingSqlExecutionTime(
771+
SimulatedExecutionTime.ofExceptions(
772+
Arrays.asList(
773+
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
774+
Long value =
775+
client
776+
.readWriteTransaction()
777+
.run(
778+
transaction -> {
779+
// The first attempt will return UNAVAILABLE and retry internally.
780+
// The second attempt will return ABORTED and should cause the transaction to
781+
// retry.
782+
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
783+
if (rs.next()) {
784+
return rs.getLong(0);
785+
}
786+
} catch (AbortedException e) {
787+
// Ignore the AbortedException and let the commit handle it.
788+
}
789+
return 0L;
790+
});
791+
assertThat(value).isEqualTo(1L);
792+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
793+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3);
794+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
795+
}
796+
797+
@Test
798+
public void testInlinedBeginFirstReadCancelledSecondReadAborted_WithCatch() {
799+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
800+
mockSpanner.setStreamingReadExecutionTime(
801+
SimulatedExecutionTime.ofException(Status.CANCELLED.asRuntimeException()));
802+
Long value =
803+
client
804+
.readWriteTransaction()
805+
.run(
806+
transaction -> {
807+
try (ResultSet rs =
808+
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
809+
if (rs.next()) {
810+
return rs.getLong(0);
811+
}
812+
} catch (SpannerException e) {
813+
if (e.getErrorCode() == ErrorCode.CANCELLED) {
814+
// Ignore and let the transaction continue.
815+
// Also make sure that the next read operation will return Aborted.
816+
mockSpanner.abortNextTransaction();
817+
} else if (e.getErrorCode() == ErrorCode.ABORTED) {
818+
// Ignore Aborted errors. This will cause the transaction to try to commit.
819+
} else {
820+
// Propagate any other errors (there should not be any in this test case).
821+
throw e;
822+
}
823+
}
824+
return 0L;
825+
});
826+
827+
assertThat(value).isEqualTo(1L);
828+
// 1. The initial attempt will inline the BeginTransaction option.
829+
// 2. The CANCELLED error during the first attempt will cause a retry with a BeginTransaction
830+
// RPC.
831+
// 3. The ABORTED error during the second attempt will NOT cause the next retry to use an
832+
// explicit BeginTransaction RPC, because the previous attempt did return a transaction ID
833+
// (the ID that was returned by the BeginTransaction RPC of that attempt).
834+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
835+
// There will be 3 attempts to read:
836+
// 1. The first will return CANCELLED.
837+
// 2. The second will return ABORTED.
838+
// 3. The third will return the results.
839+
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
840+
// There are two attempts to commit:
841+
// 1. The initial attempt will NOT try to commit, because the initial Read operation did not
842+
// return a transaction ID.
843+
// 2. The second attempt will try to commit, because the BeginTransaction RPC did return a
844+
// transaction ID, and the Aborted error that was returned by the Read operation was caught
845+
// by the application. This means that the TransactionRunner does not know that the
846+
// transaction was aborted. The Commit RPC will return an Aborted error.
847+
// 3. The third attempt will commit, as the Read operation succeeded and returned a
848+
// transaction ID.
849+
assertThat(countRequests(CommitRequest.class)).isEqualTo(2);
850+
}
851+
852+
@Test
853+
public void testInlinedBeginFirstReadCancelledSecondReadAborted_WithoutCatch()
854+
throws InterruptedException, ExecutionException {
855+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
856+
mockSpanner.setStreamingReadExecutionTime(
857+
SimulatedExecutionTime.ofException(Status.CANCELLED.asRuntimeException()));
858+
// The CANCELLED error is not caught by the application, so it will bubble up and cause the
859+
// transaction to fail.
860+
assertThrows(
861+
SpannerException.class,
862+
() ->
863+
client
864+
.readWriteTransaction()
865+
.run(
866+
transaction -> {
867+
try (ResultSet rs =
868+
transaction.read(
869+
"FOO", KeySet.all(), Collections.singletonList("ID"))) {
870+
if (rs.next()) {
871+
return rs.getLong(0);
872+
}
873+
} catch (SpannerException e) {
874+
if (e.getErrorCode() == ErrorCode.CANCELLED) {
875+
// Make sure that the next read operation will return Aborted.
876+
mockSpanner.abortNextTransaction();
877+
}
878+
// Always propagate the error to the TransactionRunner.
879+
throw e;
880+
}
881+
return 0L;
882+
}));
883+
884+
// The initial attempt will inline the BeginTransaction option.
885+
// There is no second attempt as the CANCELLED error is not caught.
886+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
887+
assertThat(countRequests(ReadRequest.class)).isEqualTo(1);
888+
assertThat(countRequests(CommitRequest.class)).isEqualTo(0);
889+
// The CANCELLED error means that there is no transaction ID returned by the Read operation.
890+
// So there is also no transaction to rollback.
891+
assertThat(countRequests(RollbackRequest.class)).isEqualTo(0);
892+
}
893+
894+
@Test
895+
public void testInlinedBeginFirstReadCancelledSecondReadAborted_WithCatchForCancelled()
896+
throws InterruptedException, ExecutionException {
897+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
898+
mockSpanner.setStreamingReadExecutionTime(
899+
SimulatedExecutionTime.ofException(Status.CANCELLED.asRuntimeException()));
900+
Long value =
901+
client
902+
.readWriteTransaction()
903+
.run(
904+
transaction -> {
905+
try (ResultSet rs =
906+
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
907+
if (rs.next()) {
908+
return rs.getLong(0);
909+
}
910+
} catch (SpannerException e) {
911+
if (e.getErrorCode() == ErrorCode.CANCELLED) {
912+
// Do not propagate the CANCELLED error.
913+
// Make sure that the next read operation will return Aborted.
914+
mockSpanner.abortNextTransaction();
915+
} else {
916+
// Propagate all other errors to the TransactionRunner.
917+
throw e;
918+
}
919+
}
920+
return 0L;
921+
});
922+
923+
assertThat(value).isEqualTo(1L);
924+
// 1. The initial attempt will inline the BeginTransaction option.
925+
// 2. The CANCELLED error during the first attempt will cause a retry with a BeginTransaction
926+
// RPC, because the error was returned by the first statement in the transaction.
927+
// 3. The ABORTED error during the second attempt will NOT cause the next retry to use an
928+
// explicit BeginTransaction RPC, because the previous attempt did return a transaction ID
929+
// (the ID that was returned by the BeginTransaction RPC of that attempt).
930+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
931+
// There will be 3 attempts to read:
932+
// 1. The first will return CANCELLED.
933+
// 2. The second will return ABORTED.
934+
// 3. The third will return the results.
935+
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
936+
// There is only one attempt to commit:
937+
// 1. The initial attempt will NOT try to commit, because the initial Read operation did not
938+
// return a transaction ID.
939+
// 2. The second attempt will NOT try to commit, because the Aborted error from the Read
940+
// operation is propagated to the TransactionRunner. This means that the TransactionRunner
941+
// knows that the transaction was aborted, and will automatically initiate a retry without
942+
// first trying to commit the transaction.
943+
// 3. The third attempt will commit, as the Read operation succeeded and returned a
944+
// transaction ID.
945+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
946+
}
947+
948+
@Test
949+
public void testInlinedBeginCommitAfterReadReturnsUnavailable() {
950+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
951+
mockSpanner.setCommitExecutionTime(
952+
SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException()));
953+
Long value =
954+
client
955+
.readWriteTransaction()
956+
.run(
957+
transaction -> {
958+
// The first attempt will return UNAVAILABLE and retry internally.
959+
try (ResultSet rs =
960+
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
961+
if (rs.next()) {
962+
return rs.getLong(0);
963+
}
964+
}
965+
return 0L;
966+
});
967+
assertThat(value).isEqualTo(1L);
968+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
969+
assertThat(countRequests(ReadRequest.class)).isEqualTo(1);
970+
assertThat(countRequests(CommitRequest.class)).isEqualTo(2);
971+
}
972+
973+
@Test
974+
public void testInlinedBeginFirstReadReturnsUnavailableAndCommitAborts() {
975+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
976+
mockSpanner.setStreamingReadExecutionTime(
977+
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
978+
final AtomicBoolean firstAttempt = new AtomicBoolean(true);
979+
Long value =
980+
client
981+
.readWriteTransaction()
982+
.run(
983+
transaction -> {
984+
long res = 0L;
985+
// The first attempt will return UNAVAILABLE and retry internally.
986+
try (ResultSet rs =
987+
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
988+
if (rs.next()) {
989+
res = rs.getLong(0);
990+
}
991+
}
992+
if (firstAttempt.compareAndSet(true, false)) {
993+
mockSpanner.abortTransaction(transaction);
994+
}
995+
return res;
996+
});
997+
assertThat(value).isEqualTo(1L);
998+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
999+
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
1000+
assertThat(countRequests(CommitRequest.class)).isEqualTo(2);
1001+
}
1002+
6281003
@Test
6291004
public void testInlinedBeginTxWithQuery() {
6301005
DatabaseClient client =

0 commit comments

Comments
 (0)