Skip to content

Commit a346eb3

Browse files
fix(3.1.x): UNAVAILABLE error on first query could cause transaction to get stuck (#856)
* fix: UNAVAILABLE error on first query could cause transaction to get stuck (#807) If the first query or read operation of a read/write transaction would return UNAVAILABLE for the first element of the result stream, the transaction could get stuck. This was caused by the internal retry mechanism that would wait for the initial attempt to return a transaction, which was never returned as the UNAVAILABLE exception was internally handled by the result stream iterator. Fixes #799 * chore: re-formats source files To fix lint errors * fix: removes unrelated changes Co-authored-by: Knut Olav Løite <[email protected]>
1 parent 8626a85 commit a346eb3

File tree

4 files changed

+153
-13
lines changed

4 files changed

+153
-13
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,8 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
554554
return builder.build();
555555
}
556556

557-
ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, QueryMode queryMode) {
557+
ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
558+
Statement statement, QueryMode queryMode, boolean withTransactionSelector) {
558559
ExecuteSqlRequest.Builder builder =
559560
ExecuteSqlRequest.newBuilder()
560561
.setSql(statement.getSql())
@@ -568,9 +569,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, Query
568569
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
569570
}
570571
}
571-
TransactionSelector selector = getTransactionSelector();
572-
if (selector != null) {
573-
builder.setTransaction(selector);
572+
if (withTransactionSelector) {
573+
TransactionSelector selector = getTransactionSelector();
574+
if (selector != null) {
575+
builder.setTransaction(selector);
576+
}
574577
}
575578
builder.setSeqno(getSeqNo());
576579
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
@@ -614,18 +617,25 @@ ResultSet executeQueryInternalWithOptions(
614617
beforeReadOrQuery();
615618
final int prefetchChunks =
616619
options.hasPrefetchChunks() ? options.prefetchChunks() : defaultPrefetchChunks;
620+
final ExecuteSqlRequest.Builder request =
621+
getExecuteSqlRequestBuilder(statement, queryMode, /* withTransactionSelector = */ false);
617622
ResumableStreamIterator stream =
618623
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) {
619624
@Override
620625
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
621626
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
622-
final ExecuteSqlRequest.Builder request =
623-
getExecuteSqlRequestBuilder(statement, queryMode);
624627
if (partitionToken != null) {
625628
request.setPartitionToken(partitionToken);
626629
}
630+
TransactionSelector selector = null;
627631
if (resumeToken != null) {
628632
request.setResumeToken(resumeToken);
633+
selector = getTransactionSelector();
634+
} else if (!request.hasTransaction()) {
635+
selector = getTransactionSelector();
636+
}
637+
if (selector != null) {
638+
request.setTransaction(selector);
629639
}
630640
SpannerRpc.StreamingCall call =
631641
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
@@ -733,10 +743,13 @@ ResultSet readInternalWithOptions(
733743
@Override
734744
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
735745
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
746+
TransactionSelector selector = null;
736747
if (resumeToken != null) {
737748
builder.setResumeToken(resumeToken);
749+
selector = getTransactionSelector();
750+
} else if (!builder.hasTransaction()) {
751+
selector = getTransactionSelector();
738752
}
739-
TransactionSelector selector = getTransactionSelector();
740753
if (selector != null) {
741754
builder.setTransaction(selector);
742755
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,8 @@ public void buffer(Iterable<Mutation> mutations) {
515515
public long executeUpdate(Statement statement) {
516516
beforeReadOrQuery();
517517
final ExecuteSqlRequest.Builder builder =
518-
getExecuteSqlRequestBuilder(statement, QueryMode.NORMAL);
518+
getExecuteSqlRequestBuilder(
519+
statement, QueryMode.NORMAL, /* withTransactionSelector = */ true);
519520
try {
520521
com.google.spanner.v1.ResultSet resultSet =
521522
rpc.executeQuery(builder.build(), session.getOptions());
@@ -538,7 +539,8 @@ public long executeUpdate(Statement statement) {
538539
public ApiFuture<Long> executeUpdateAsync(Statement statement) {
539540
beforeReadOrQuery();
540541
final ExecuteSqlRequest.Builder builder =
541-
getExecuteSqlRequestBuilder(statement, QueryMode.NORMAL);
542+
getExecuteSqlRequestBuilder(
543+
statement, QueryMode.NORMAL, /* withTransactionSelector = */ true);
542544
final ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
543545
try {
544546
// Register the update as an async operation that must finish before the transaction may

google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ public void setup() {
8989
public void executeSqlRequestBuilderWithoutQueryOptions() {
9090
ExecuteSqlRequest request =
9191
context
92-
.getExecuteSqlRequestBuilder(Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL)
92+
.getExecuteSqlRequestBuilder(
93+
Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL, true)
9394
.build();
9495
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
9596
assertThat(request.getQueryOptions()).isEqualTo(defaultQueryOptions);
@@ -103,7 +104,8 @@ public void executeSqlRequestBuilderWithQueryOptions() {
103104
Statement.newBuilder("SELECT FOO FROM BAR")
104105
.withQueryOptions(QueryOptions.newBuilder().setOptimizerVersion("2.0").build())
105106
.build(),
106-
QueryMode.NORMAL)
107+
QueryMode.NORMAL,
108+
true)
107109
.build();
108110
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
109111
assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("2.0");

google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java

Lines changed: 125 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,130 @@ public Long run(TransactionContext transaction) throws Exception {
251251
assertThat(countTransactionsStarted()).isEqualTo(2);
252252
}
253253

254+
@Test
255+
public void testInlinedBeginFirstUpdateAborts() {
256+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
257+
long updateCount =
258+
client
259+
.readWriteTransaction()
260+
.run(
261+
new TransactionCallable<Long>() {
262+
boolean firstAttempt = true;
263+
264+
@Override
265+
public Long run(TransactionContext transaction) throws Exception {
266+
if (firstAttempt) {
267+
firstAttempt = false;
268+
mockSpanner.putStatementResult(
269+
StatementResult.exception(
270+
UPDATE_STATEMENT,
271+
mockSpanner.createAbortedException(
272+
ByteString.copyFromUtf8("some-tx"))));
273+
} else {
274+
mockSpanner.putStatementResult(
275+
StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT));
276+
}
277+
return transaction.executeUpdate(UPDATE_STATEMENT);
278+
}
279+
});
280+
assertThat(updateCount).isEqualTo(UPDATE_COUNT);
281+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
282+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
283+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
284+
}
285+
286+
@Test
287+
public void testInlinedBeginFirstQueryAborts() {
288+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
289+
long updateCount =
290+
client
291+
.readWriteTransaction()
292+
.run(
293+
new TransactionCallable<Long>() {
294+
boolean firstAttempt = true;
295+
296+
@Override
297+
public Long run(TransactionContext transaction) throws Exception {
298+
if (firstAttempt) {
299+
firstAttempt = false;
300+
mockSpanner.putStatementResult(
301+
StatementResult.exception(
302+
SELECT1,
303+
mockSpanner.createAbortedException(
304+
ByteString.copyFromUtf8("some-tx"))));
305+
} else {
306+
mockSpanner.putStatementResult(
307+
StatementResult.query(SELECT1, SELECT1_RESULTSET));
308+
}
309+
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
310+
while (rs.next()) {
311+
return rs.getLong(0);
312+
}
313+
}
314+
return 0L;
315+
}
316+
});
317+
assertThat(updateCount).isEqualTo(1L);
318+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
319+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
320+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
321+
}
322+
323+
@Test
324+
public void testInlinedBeginFirstQueryReturnsUnavailable() {
325+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
326+
mockSpanner.setExecuteStreamingSqlExecutionTime(
327+
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
328+
long value =
329+
client
330+
.readWriteTransaction()
331+
.run(
332+
new TransactionCallable<Long>() {
333+
@Override
334+
public Long run(TransactionContext transaction) throws Exception {
335+
// The first attempt will return UNAVAILABLE and retry internally.
336+
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
337+
while (rs.next()) {
338+
return rs.getLong(0);
339+
}
340+
}
341+
return 0L;
342+
}
343+
});
344+
assertThat(value).isEqualTo(1L);
345+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
346+
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
347+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
348+
}
349+
350+
@Test
351+
public void testInlinedBeginFirstReadReturnsUnavailable() {
352+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
353+
mockSpanner.setStreamingReadExecutionTime(
354+
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
355+
long value =
356+
client
357+
.readWriteTransaction()
358+
.run(
359+
new TransactionCallable<Long>() {
360+
@Override
361+
public Long run(TransactionContext transaction) throws Exception {
362+
// The first attempt will return UNAVAILABLE and retry internally.
363+
try (ResultSet rs =
364+
transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) {
365+
while (rs.next()) {
366+
return rs.getLong(0);
367+
}
368+
}
369+
return 0L;
370+
}
371+
});
372+
assertThat(value).isEqualTo(1L);
373+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
374+
assertThat(countRequests(ReadRequest.class)).isEqualTo(2);
375+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
376+
}
377+
254378
@Test
255379
public void testInlinedBeginTxWithQuery() {
256380
DatabaseClient client =
@@ -279,8 +403,7 @@ public Long run(TransactionContext transaction) throws Exception {
279403

280404
@Test
281405
public void testInlinedBeginTxWithRead() {
282-
DatabaseClient client =
283-
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
406+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
284407
long updateCount =
285408
client
286409
.readWriteTransaction()

0 commit comments

Comments
 (0)