Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,11 @@ CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamListener) {
GrpcStreamIterator stream =
new GrpcStreamIterator(statement, prefetchChunks, cancelQueryWhenClientIsClosed);
new GrpcStreamIterator(
statement,
request.getLastStatement(),
prefetchChunks,
cancelQueryWhenClientIsClosed);
if (streamListener != null) {
stream.registerListener(streamListener);
}
Expand Down Expand Up @@ -935,7 +939,8 @@ String getTransactionTag() {
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {}

@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
public SpannerException onError(
SpannerException e, boolean withBeginTransaction, boolean lastStatement) {
this.session.onError(e);
return e;
}
Expand Down Expand Up @@ -1009,6 +1014,8 @@ ResultSet readInternalWithOptions(
}
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
final boolean lastStatement =
readOptions.hasLastStatement() ? readOptions.isLastStatement() : false;
ResumableStreamIterator stream =
new ResumableStreamIterator(
MAX_BUFFERED_CHUNKS,
Expand All @@ -1025,7 +1032,8 @@ CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamListener) {
GrpcStreamIterator stream =
new GrpcStreamIterator(prefetchChunks, cancelQueryWhenClientIsClosed);
new GrpcStreamIterator(
lastStatement, prefetchChunks, cancelQueryWhenClientIsClosed);
if (streamListener != null) {
stream.registerListener(streamListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
throws SpannerException;

/** Called when the read finishes with an error. Returns the error that should be thrown. */
SpannerException onError(SpannerException e, boolean withBeginTransaction);
SpannerException onError(
SpannerException e, boolean withBeginTransaction, boolean lastStatement);

/** Called when the read finishes normally. */
void onDone(boolean withBeginTransaction);
Expand Down Expand Up @@ -153,6 +154,8 @@ interface CloseableIterator<T> extends Iterator<T> {

boolean isWithBeginTransaction();

boolean isLastStatement();

/**
* @param streamMessageListener A class object which implements StreamMessageListener
* @return true if streaming is supported by the iterator, otherwise false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public boolean next() throws SpannerException {
} catch (Throwable t) {
throw yieldError(
SpannerExceptionFactory.asSpannerException(t),
iterator.isWithBeginTransaction() && currRow == null);
iterator.isWithBeginTransaction() && currRow == null,
iterator.isLastStatement());
}
}

Expand Down Expand Up @@ -149,8 +150,9 @@ public Type getType() {
return currRow.getType();
}

private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
SpannerException toThrow = listener.onError(e, beginTransaction);
private SpannerException yieldError(
SpannerException e, boolean beginTransaction, boolean lastStatement) {
SpannerException toThrow = listener.onError(e, beginTransaction, lastStatement);
close();
throw toThrow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,26 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>

private SpannerRpc.StreamingCall call;
private volatile boolean withBeginTransaction;
private final boolean lastStatement;
private TimeUnit streamWaitTimeoutUnit;
private long streamWaitTimeoutValue;
private SpannerException error;
private boolean done;

@VisibleForTesting
GrpcStreamIterator(int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
this(null, prefetchChunks, cancelQueryWhenClientIsClosed);
GrpcStreamIterator(
boolean lastStatement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
this(null, lastStatement, prefetchChunks, cancelQueryWhenClientIsClosed);
}

@VisibleForTesting
GrpcStreamIterator(
Statement statement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
Statement statement,
boolean lastStatement,
int prefetchChunks,
boolean cancelQueryWhenClientIsClosed) {
this.statement = statement;
this.lastStatement = lastStatement;
this.prefetchChunks = prefetchChunks;
this.consumer = new ConsumerImpl(cancelQueryWhenClientIsClosed);
// One extra to allow for END_OF_STREAM message.
Expand Down Expand Up @@ -118,6 +124,11 @@ public boolean isWithBeginTransaction() {
return withBeginTransaction;
}

@Override
public boolean isLastStatement() {
return lastStatement;
}

@Override
protected final PartialResultSet computeNext() {
PartialResultSet next;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ boolean isWithBeginTransaction() {
return stream.isWithBeginTransaction();
}

boolean isLastStatement() {
return stream.isLastStatement();
}

/**
* @param a is a mutable list and b will be concatenated into a.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ public boolean isWithBeginTransaction() {
return stream != null && stream.isWithBeginTransaction();
}

@Override
public boolean isLastStatement() {
return stream != null && stream.isLastStatement();
}

@Override
@InternalApi
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,11 @@ public void run() {
span.addAnnotation("Commit Failed", resultException);
opSpan.setStatus(resultException);
opSpan.end();
res.setException(onError(resultException, false));
res.setException(
onError(
resultException,
/* withBeginTransaction= */ false,
/* lastStatement= */ true));
} catch (Throwable unexpectedError) {
// This is a safety precaution to make sure that a result is always returned.
res.setException(unexpectedError);
Expand Down Expand Up @@ -759,8 +763,9 @@ MultiplexedSessionPrecommitToken getLatestPrecommitToken() {
}

@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
e = super.onError(e, withBeginTransaction);
public SpannerException onError(
SpannerException e, boolean withBeginTransaction, boolean lastStatement) {
e = super.onError(e, withBeginTransaction, lastStatement);

// If the statement that caused an error was the statement that included a BeginTransaction
// option, we simulate an aborted transaction to force a retry of the entire transaction. This
Expand All @@ -770,13 +775,17 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction
// statement are included in the transaction, even if the statement again causes an error
// during the retry.
if (withBeginTransaction) {
// Simulate an aborted transaction to force a retry with a new transaction.
this.transactionIdFuture.setException(
SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED,
"Aborted due to failed initial statement",
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
"Aborted due to failed initial statement", e, 0, 1)));
if (lastStatement) {
this.transactionIdFuture.setException(e);
} else {
// Simulate an aborted transaction to force a retry with a new transaction.
this.transactionIdFuture.setException(
SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED,
"Aborted due to failed initial statement",
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
"Aborted due to failed initial statement", e, 0, 1)));
}
}
SpannerException exceptionToThrow;
if (withBeginTransaction
Expand Down Expand Up @@ -950,7 +959,9 @@ private ResultSet internalExecuteUpdate(
return resultSet;
} catch (Throwable t) {
throw onError(
SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin());
SpannerExceptionFactory.asSpannerException(t),
builder.getTransaction().hasBegin(),
builder.getLastStatement());
}
}

Expand Down Expand Up @@ -1008,7 +1019,7 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... u
input -> {
SpannerException e = SpannerExceptionFactory.asSpannerException(input);
SpannerException exceptionToThrow =
onError(e, builder.getTransaction().hasBegin());
onError(e, builder.getTransaction().hasBegin(), builder.getLastStatement());
span.setStatus(exceptionToThrow);
throw exceptionToThrow;
},
Expand Down Expand Up @@ -1099,7 +1110,9 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... update
return results;
} catch (Throwable e) {
throw onError(
SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin());
SpannerExceptionFactory.asSpannerException(e),
builder.getTransaction().hasBegin(),
builder.getLastStatements());
}
} catch (Throwable throwable) {
span.setStatus(throwable);
Expand Down Expand Up @@ -1178,7 +1191,7 @@ public ApiFuture<long[]> batchUpdateAsync(
input -> {
SpannerException e = SpannerExceptionFactory.asSpannerException(input);
SpannerException exceptionToThrow =
onError(e, builder.getTransaction().hasBegin());
onError(e, builder.getTransaction().hasBegin(), builder.getLastStatements());
span.setStatus(exceptionToThrow);
throw exceptionToThrow;
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
throws SpannerException {}

@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
public SpannerException onError(
SpannerException e, boolean withBeginTransaction, boolean lastStatement) {
return e;
}

Expand All @@ -88,7 +89,9 @@ public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}

@Before
public void setUp() {
stream = new GrpcStreamIterator(10, /* cancelQueryWhenClientIsClosed= */ false);
stream =
new GrpcStreamIterator(
/* lastStatement= */ false, 10, /* cancelQueryWhenClientIsClosed= */ false);
stream.setCall(
new SpannerRpc.StreamingCall() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
throws SpannerException {}

@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
public SpannerException onError(
SpannerException e, boolean withBeginTransaction, boolean lastStatement) {
return e;
}

Expand Down Expand Up @@ -118,7 +119,9 @@ private static class TestCaseRunner {
}

private void run() throws Exception {
stream = new GrpcStreamIterator(10, /* cancelQueryWhenClientIsClosed= */ false);
stream =
new GrpcStreamIterator(
/* lastStatement= */ false, 10, /* cancelQueryWhenClientIsClosed= */ false);
stream.setCall(
new SpannerRpc.StreamingCall() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public boolean isWithBeginTransaction() {
return false;
}

@Override
public boolean isLastStatement() {
return false;
}

@Override
public boolean hasNext() {
return first || iterator.hasNext();
Expand Down Expand Up @@ -77,7 +82,8 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
throws SpannerException {}

@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
public SpannerException onError(
SpannerException e, boolean withBeginTransaction, boolean isLastStatement) {
return e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ public void close(@Nullable String message) {
public boolean isWithBeginTransaction() {
return false;
}

@Override
public boolean isLastStatement() {
return false;
}
}

Starter starter = Mockito.mock(Starter.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,23 @@
import static com.google.cloud.spanner.connection.ConnectionProperties.READ_LOCK_MODE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.MockSpannerServiceImpl;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.TransactionOptions.IsolationLevel;
import com.google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -106,6 +112,34 @@ public void testDml() {
assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
}

@Test
public void testDmlFailed() {
Statement invalidInsert = Statement.of("insert into my_table (id, name) values (1, 'test')");
mockSpanner.putStatementResult(
MockSpannerServiceImpl.StatementResult.exception(
invalidInsert,
Status.ALREADY_EXISTS.withDescription("Row 1 already exists").asRuntimeException()));

try (Connection connection = createConnection()) {
connection.setAutocommit(true);
SpannerException exception =
assertThrows(SpannerException.class, () -> connection.executeUpdate(invalidInsert));
assertEquals(ErrorCode.ALREADY_EXISTS, exception.getErrorCode());
}
assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
ExecuteSqlRequest request = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0);
assertTrue(request.getTransaction().hasBegin());
assertTrue(request.getTransaction().getBegin().hasReadWrite());
assertEquals(isolationLevel, request.getTransaction().getBegin().getIsolationLevel());
assertEquals(
readLockMode, request.getTransaction().getBegin().getReadWrite().getReadLockMode());
assertTrue(request.getLastStatement());
// There should be no rollback request on the server, as there was no transaction ID returned
// to the client.
assertEquals(0, mockSpanner.countRequestsOfType(RollbackRequest.class));
}

@Test
public void testDmlReturning() {
try (Connection connection = createConnection()) {
Expand Down
Loading