diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 6816001a9f..ffbdfc1cfd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -818,7 +818,11 @@ CloseableIterator startStream( @Nullable ByteString resumeToken, AsyncResultSet.StreamMessageListener streamListener) { GrpcStreamIterator stream = - new GrpcStreamIterator(statement, prefetchChunks, cancelQueryWhenClientIsClosed); + new GrpcStreamIterator( + statement, + request.getLastStatement(), + prefetchChunks, + cancelQueryWhenClientIsClosed); if (streamListener != null) { stream.registerListener(streamListener); } @@ -935,7 +939,8 @@ String getTransactionTag() { public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {} @Override - public SpannerException onError(SpannerException e, boolean withBeginTransaction) { + public SpannerException onError( + SpannerException e, boolean withBeginTransaction, boolean lastStatement) { this.session.onError(e); return e; } @@ -1009,6 +1014,8 @@ ResultSet readInternalWithOptions( } final int prefetchChunks = readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks; + final boolean lastStatement = + readOptions.hasLastStatement() ? readOptions.isLastStatement() : false; ResumableStreamIterator stream = new ResumableStreamIterator( MAX_BUFFERED_CHUNKS, @@ -1025,7 +1032,8 @@ CloseableIterator startStream( @Nullable ByteString resumeToken, AsyncResultSet.StreamMessageListener streamListener) { GrpcStreamIterator stream = - new GrpcStreamIterator(prefetchChunks, cancelQueryWhenClientIsClosed); + new GrpcStreamIterator( + lastStatement, prefetchChunks, cancelQueryWhenClientIsClosed); if (streamListener != null) { stream.registerListener(streamListener); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index fc3a5609bb..0717cae74f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -56,7 +56,8 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) throws SpannerException; /** Called when the read finishes with an error. Returns the error that should be thrown. */ - SpannerException onError(SpannerException e, boolean withBeginTransaction); + SpannerException onError( + SpannerException e, boolean withBeginTransaction, boolean lastStatement); /** Called when the read finishes normally. */ void onDone(boolean withBeginTransaction); @@ -153,6 +154,8 @@ interface CloseableIterator extends Iterator { boolean isWithBeginTransaction(); + boolean isLastStatement(); + /** * @param streamMessageListener A class object which implements StreamMessageListener * @return true if streaming is supported by the iterator, otherwise false diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java index c2a4ee5a58..c013c48800 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java @@ -109,7 +109,8 @@ public boolean next() throws SpannerException { } catch (Throwable t) { throw yieldError( SpannerExceptionFactory.asSpannerException(t), - iterator.isWithBeginTransaction() && currRow == null); + iterator.isWithBeginTransaction() && currRow == null, + iterator.isLastStatement()); } } @@ -149,8 +150,9 @@ public Type getType() { return currRow.getType(); } - private SpannerException yieldError(SpannerException e, boolean beginTransaction) { - SpannerException toThrow = listener.onError(e, beginTransaction); + private SpannerException yieldError( + SpannerException e, boolean beginTransaction, boolean lastStatement) { + SpannerException toThrow = listener.onError(e, beginTransaction, lastStatement); close(); throw toThrow; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java index 46107de897..371accb7ee 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java @@ -49,20 +49,26 @@ class GrpcStreamIterator extends AbstractIterator private SpannerRpc.StreamingCall call; private volatile boolean withBeginTransaction; + private final boolean lastStatement; private TimeUnit streamWaitTimeoutUnit; private long streamWaitTimeoutValue; private SpannerException error; private boolean done; @VisibleForTesting - GrpcStreamIterator(int prefetchChunks, boolean cancelQueryWhenClientIsClosed) { - this(null, prefetchChunks, cancelQueryWhenClientIsClosed); + GrpcStreamIterator( + boolean lastStatement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) { + this(null, lastStatement, prefetchChunks, cancelQueryWhenClientIsClosed); } @VisibleForTesting GrpcStreamIterator( - Statement statement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) { + Statement statement, + boolean lastStatement, + int prefetchChunks, + boolean cancelQueryWhenClientIsClosed) { this.statement = statement; + this.lastStatement = lastStatement; this.prefetchChunks = prefetchChunks; this.consumer = new ConsumerImpl(cancelQueryWhenClientIsClosed); // One extra to allow for END_OF_STREAM message. @@ -118,6 +124,11 @@ public boolean isWithBeginTransaction() { return withBeginTransaction; } + @Override + public boolean isLastStatement() { + return lastStatement; + } + @Override protected final PartialResultSet computeNext() { PartialResultSet next; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java index 5d1a884a9d..09b850c93f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java @@ -183,6 +183,10 @@ boolean isWithBeginTransaction() { return stream.isWithBeginTransaction(); } + boolean isLastStatement() { + return stream.isLastStatement(); + } + /** * @param a is a mutable list and b will be concatenated into a. */ diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java index 1240dd631a..d292e53fe5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java @@ -242,6 +242,11 @@ public boolean isWithBeginTransaction() { return stream != null && stream.isWithBeginTransaction(); } + @Override + public boolean isLastStatement() { + return stream != null && stream.isLastStatement(); + } + @Override @InternalApi public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 8af9ba65d2..354d8a7151 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -559,7 +559,11 @@ public void run() { span.addAnnotation("Commit Failed", resultException); opSpan.setStatus(resultException); opSpan.end(); - res.setException(onError(resultException, false)); + res.setException( + onError( + resultException, + /* withBeginTransaction= */ false, + /* lastStatement= */ true)); } catch (Throwable unexpectedError) { // This is a safety precaution to make sure that a result is always returned. res.setException(unexpectedError); @@ -759,8 +763,9 @@ MultiplexedSessionPrecommitToken getLatestPrecommitToken() { } @Override - public SpannerException onError(SpannerException e, boolean withBeginTransaction) { - e = super.onError(e, withBeginTransaction); + public SpannerException onError( + SpannerException e, boolean withBeginTransaction, boolean lastStatement) { + e = super.onError(e, withBeginTransaction, lastStatement); // If the statement that caused an error was the statement that included a BeginTransaction // option, we simulate an aborted transaction to force a retry of the entire transaction. This @@ -770,13 +775,17 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction // statement are included in the transaction, even if the statement again causes an error // during the retry. if (withBeginTransaction) { - // Simulate an aborted transaction to force a retry with a new transaction. - this.transactionIdFuture.setException( - SpannerExceptionFactory.newSpannerException( - ErrorCode.ABORTED, - "Aborted due to failed initial statement", - SpannerExceptionFactory.createAbortedExceptionWithRetryDelay( - "Aborted due to failed initial statement", e, 0, 1))); + if (lastStatement) { + this.transactionIdFuture.setException(e); + } else { + // Simulate an aborted transaction to force a retry with a new transaction. + this.transactionIdFuture.setException( + SpannerExceptionFactory.newSpannerException( + ErrorCode.ABORTED, + "Aborted due to failed initial statement", + SpannerExceptionFactory.createAbortedExceptionWithRetryDelay( + "Aborted due to failed initial statement", e, 0, 1))); + } } SpannerException exceptionToThrow; if (withBeginTransaction @@ -950,7 +959,9 @@ private ResultSet internalExecuteUpdate( return resultSet; } catch (Throwable t) { throw onError( - SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin()); + SpannerExceptionFactory.asSpannerException(t), + builder.getTransaction().hasBegin(), + builder.getLastStatement()); } } @@ -1008,7 +1019,7 @@ public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... u input -> { SpannerException e = SpannerExceptionFactory.asSpannerException(input); SpannerException exceptionToThrow = - onError(e, builder.getTransaction().hasBegin()); + onError(e, builder.getTransaction().hasBegin(), builder.getLastStatement()); span.setStatus(exceptionToThrow); throw exceptionToThrow; }, @@ -1099,7 +1110,9 @@ public long[] batchUpdate(Iterable statements, UpdateOption... update return results; } catch (Throwable e) { throw onError( - SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin()); + SpannerExceptionFactory.asSpannerException(e), + builder.getTransaction().hasBegin(), + builder.getLastStatements()); } } catch (Throwable throwable) { span.setStatus(throwable); @@ -1178,7 +1191,7 @@ public ApiFuture batchUpdateAsync( input -> { SpannerException e = SpannerExceptionFactory.asSpannerException(input); SpannerException exceptionToThrow = - onError(e, builder.getTransaction().hasBegin()); + onError(e, builder.getTransaction().hasBegin(), builder.getLastStatements()); span.setStatus(exceptionToThrow); throw exceptionToThrow; }, diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java index 0d46dea3a3..4007c972c2 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java @@ -75,7 +75,8 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude throws SpannerException {} @Override - public SpannerException onError(SpannerException e, boolean withBeginTransaction) { + public SpannerException onError( + SpannerException e, boolean withBeginTransaction, boolean lastStatement) { return e; } @@ -88,7 +89,9 @@ public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {} @Before public void setUp() { - stream = new GrpcStreamIterator(10, /* cancelQueryWhenClientIsClosed= */ false); + stream = + new GrpcStreamIterator( + /* lastStatement= */ false, 10, /* cancelQueryWhenClientIsClosed= */ false); stream.setCall( new SpannerRpc.StreamingCall() { @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java index 588ed64e88..ff26f774b4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java @@ -51,7 +51,8 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude throws SpannerException {} @Override - public SpannerException onError(SpannerException e, boolean withBeginTransaction) { + public SpannerException onError( + SpannerException e, boolean withBeginTransaction, boolean lastStatement) { return e; } @@ -118,7 +119,9 @@ private static class TestCaseRunner { } private void run() throws Exception { - stream = new GrpcStreamIterator(10, /* cancelQueryWhenClientIsClosed= */ false); + stream = + new GrpcStreamIterator( + /* lastStatement= */ false, 10, /* cancelQueryWhenClientIsClosed= */ false); stream.setCall( new SpannerRpc.StreamingCall() { @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java index 404973336b..4ab506f73b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java @@ -47,6 +47,11 @@ public boolean isWithBeginTransaction() { return false; } + @Override + public boolean isLastStatement() { + return false; + } + @Override public boolean hasNext() { return first || iterator.hasNext(); @@ -77,7 +82,8 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude throws SpannerException {} @Override - public SpannerException onError(SpannerException e, boolean withBeginTransaction) { + public SpannerException onError( + SpannerException e, boolean withBeginTransaction, boolean isLastStatement) { return e; } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java index 5588b47866..14f8df04e9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java @@ -140,6 +140,11 @@ public void close(@Nullable String message) { public boolean isWithBeginTransaction() { return false; } + + @Override + public boolean isLastStatement() { + return false; + } } Starter starter = Mockito.mock(Starter.class); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AutoCommitMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AutoCommitMockServerTest.java index 7d9e9a684f..961d90e14d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AutoCommitMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AutoCommitMockServerTest.java @@ -20,17 +20,23 @@ import static com.google.cloud.spanner.connection.ConnectionProperties.READ_LOCK_MODE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import com.google.cloud.spanner.ErrorCode; +import com.google.cloud.spanner.MockSpannerServiceImpl; import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.ExecuteBatchDmlRequest; import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.RollbackRequest; import com.google.spanner.v1.TransactionOptions.IsolationLevel; import com.google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode; +import io.grpc.Status; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -106,6 +112,34 @@ public void testDml() { assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); } + @Test + public void testDmlFailed() { + Statement invalidInsert = Statement.of("insert into my_table (id, name) values (1, 'test')"); + mockSpanner.putStatementResult( + MockSpannerServiceImpl.StatementResult.exception( + invalidInsert, + Status.ALREADY_EXISTS.withDescription("Row 1 already exists").asRuntimeException())); + + try (Connection connection = createConnection()) { + connection.setAutocommit(true); + SpannerException exception = + assertThrows(SpannerException.class, () -> connection.executeUpdate(invalidInsert)); + assertEquals(ErrorCode.ALREADY_EXISTS, exception.getErrorCode()); + } + assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + ExecuteSqlRequest request = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0); + assertTrue(request.getTransaction().hasBegin()); + assertTrue(request.getTransaction().getBegin().hasReadWrite()); + assertEquals(isolationLevel, request.getTransaction().getBegin().getIsolationLevel()); + assertEquals( + readLockMode, request.getTransaction().getBegin().getReadWrite().getReadLockMode()); + assertTrue(request.getLastStatement()); + // There should be no rollback request on the server, as there was no transaction ID returned + // to the client. + assertEquals(0, mockSpanner.countRequestsOfType(RollbackRequest.class)); + } + @Test public void testDmlReturning() { try (Connection connection = createConnection()) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/TransactionMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/TransactionMockServerTest.java index 1c78f7031e..31cc1acd19 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/TransactionMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/TransactionMockServerTest.java @@ -26,16 +26,19 @@ import com.google.cloud.spanner.Dialect; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.MockSpannerServiceImpl; +import com.google.cloud.spanner.Options; import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection; +import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.ExecuteBatchDmlRequest; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.TransactionOptions.IsolationLevel; import com.google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode; import io.grpc.Deadline.Ticker; +import io.grpc.Status; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -113,6 +116,54 @@ public void testDml() { assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); } + @Test + public void testFailedFirstDml() { + Statement invalidInsert = Statement.of("insert into my_table (id, name) values (1, 'test')"); + mockSpanner.putStatementResult( + MockSpannerServiceImpl.StatementResult.exception( + invalidInsert, + Status.ALREADY_EXISTS.withDescription("Row 1 already exists").asRuntimeException())); + + try (Connection connection = createConnection()) { + SpannerException exception = + assertThrows(SpannerException.class, () -> connection.executeUpdate(invalidInsert)); + assertEquals(ErrorCode.ALREADY_EXISTS, exception.getErrorCode()); + connection.commit(); + } + // The transaction should be internally retried with an explicit BeginTransaction request, as + // the first statement in the transaction failed. + assertEquals(1, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + assertEquals(2, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + } + + @Test + public void testFailedFirstAndLastDml() { + Statement invalidInsert = + Statement.of("insert into my_table (id, name) values (1, 'test') then return id"); + mockSpanner.putStatementResult( + MockSpannerServiceImpl.StatementResult.exception( + invalidInsert, + Status.ALREADY_EXISTS.withDescription("Row 1 already exists").asRuntimeException())); + + try (Connection connection = createConnection()) { + SpannerException exception = + assertThrows( + SpannerException.class, + () -> connection.executeQuery(invalidInsert, Options.lastStatement())); + assertEquals(ErrorCode.ALREADY_EXISTS, exception.getErrorCode()); + + // The same error should be repeated for the commit. + exception = assertThrows(SpannerException.class, connection::commit); + assertEquals(ErrorCode.ALREADY_EXISTS, exception.getErrorCode()); + } + // The transaction should be not be retried, as the last_statement flag was set. + assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + // There is no CommitRequest, because the statement never returned a transaction ID. + assertEquals(0, mockSpanner.countRequestsOfType(CommitRequest.class)); + } + @Test public void testDmlReturning() { try (Connection connection = createConnection()) {