Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RequestOptions;
Expand Down Expand Up @@ -893,6 +894,13 @@ public void onDone(boolean withBeginTransaction) {
this.session.onReadDone();
}

/**
* For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be
* present in the RPC response. In such cases, this method will be a no-op.
*/
@Override
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}

private ResultSet readInternal(
String table,
@Nullable String index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.protobuf.ListValue;
import com.google.protobuf.ProtocolMessageEnum;
import com.google.protobuf.Value.KindCase;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.Transaction;
import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -57,6 +58,12 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)

/** Called when the read finishes normally. */
void onDone(boolean withBeginTransaction);

/**
* Called when the RPC response contains a MultiplexedSessionPrecommitToken. A precommit token
* will be included if the read-write transaction is executed on a multiplexed session.
*/
void onPrecommitToken(MultiplexedSessionPrecommitToken token);
}

static final class LazyByteArray implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufR

GrpcResultSet(
CloseableIterator<PartialResultSet> iterator, Listener listener, DecodeMode decodeMode) {
this.iterator = new GrpcValueIterator(iterator);
this.iterator = new GrpcValueIterator(iterator, listener);
this.listener = listener;
this.decodeMode = decodeMode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
import com.google.cloud.spanner.AbstractResultSet.Listener;
import com.google.common.collect.AbstractIterator;
import com.google.protobuf.ListValue;
import com.google.protobuf.Value.KindCase;
Expand All @@ -44,9 +45,11 @@ private enum StreamValue {
private PartialResultSet current;
private int pos;
private ResultSetStats statistics;
private final Listener listener;

GrpcValueIterator(CloseableIterator<PartialResultSet> stream) {
GrpcValueIterator(CloseableIterator<PartialResultSet> stream, Listener listener) {
this.stream = stream;
this.listener = listener;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -154,6 +157,10 @@ private boolean ensureReady(StreamValue requiredValue) throws SpannerException {
ErrorCode.INTERNAL, "Invalid type metadata: " + e.getMessage(), e);
}
}
// collect the precommit token from each PartialResultSet
if (current.hasPrecommitToken()) {
listener.onPrecommitToken(current.getPrecommitToken());
}
if (current.hasStats()) {
statistics = current.getStats();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.ResultSetStats;
Expand Down Expand Up @@ -171,6 +172,11 @@ public void removeListener(Runnable listener) {
@GuardedBy("committingLock")
private volatile boolean committing;

private final Object precommitTokenLock = new Object();

@GuardedBy("precommitTokenLock")
private MultiplexedSessionPrecommitToken latestPrecommitToken;

@GuardedBy("lock")
private volatile SettableApiFuture<Void> finishedAsyncOperations = SettableApiFuture.create();

Expand Down Expand Up @@ -423,6 +429,10 @@ public void run() {
}
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
}
if (session.getIsMultiplexed() && getLatestPrecommitToken() != null) {
// Set the precommit token in the CommitRequest for multiplexed sessions.
requestBuilder.setPrecommitToken(getLatestPrecommitToken());
}
final CommitRequest commitRequest = requestBuilder.build();
span.addAnnotation("Starting Commit");
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture;
Expand Down Expand Up @@ -625,6 +635,24 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
}
}

/**
* In read-write transactions, the precommit token with the highest sequence number from this
* transaction attempt will be tracked and included in the
* [Commit][google.spanner.v1.Spanner.Commit] request for the transaction.
*/
@Override
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {
if (token == null) return;
synchronized (precommitTokenLock) {
if (this.latestPrecommitToken == null
|| token.getSeqNum() > this.latestPrecommitToken.getSeqNum()) {
this.latestPrecommitToken = token;
System.out.println("Updating precommit token to " + this.latestPrecommitToken);
txnLogger.log(Level.ALL, "Updating precommit token to " + this.latestPrecommitToken);
}
}
}

@Nullable
String getTransactionTag() {
if (this.options.hasTag()) {
Expand All @@ -633,6 +661,13 @@ String getTransactionTag() {
return null;
}

@Nullable
MultiplexedSessionPrecommitToken getLatestPrecommitToken() {
synchronized (precommitTokenLock) {
return this.latestPrecommitToken;
}
}

@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
e = super.onError(e, withBeginTransaction);
Expand Down Expand Up @@ -811,6 +846,9 @@ private ResultSet internalExecuteUpdate(
throw new IllegalArgumentException(
"DML response missing stats possibly due to non-DML statement as input");
}
if (resultSet.hasPrecommitToken()) {
onPrecommitToken(resultSet.getPrecommitToken());
}
return resultSet;
} catch (Throwable t) {
throw onError(
Expand Down Expand Up @@ -885,6 +923,9 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... u
resultSet.get().getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
if (resultSet.get().hasPrecommitToken()) {
onPrecommitToken(resultSet.get().getPrecommitToken());
}
} catch (Throwable e) {
// Ignore this error here as it is handled by the future that is returned by the
// executeUpdateAsync method.
Expand Down Expand Up @@ -940,6 +981,11 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... update
}
}

// TODO(sriharshach): check if we need to get precommit_token from response.getResultSets
if (response.hasPrecommitToken()) {
onPrecommitToken(response.getPrecommitToken());
}

// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
if (response.getStatus().getCode() == Code.ABORTED_VALUE) {
Expand Down Expand Up @@ -1004,6 +1050,9 @@ public ApiFuture<long[]> batchUpdateAsync(
builder.getTransaction().hasBegin());
}
}
if (batchDmlResponse.hasPrecommitToken()) {
onPrecommitToken(batchDmlResponse.getPrecommitToken());
}
// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
if (batchDmlResponse.getStatus().getCode() == Code.ABORTED_VALUE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.QueryPlan;
import com.google.spanner.v1.ResultSetMetadata;
Expand Down Expand Up @@ -77,6 +78,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction

@Override
public void onDone(boolean withBeginTransaction) {}

@Override
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.google.spanner.v1.GetSessionRequest;
import com.google.spanner.v1.ListSessionsRequest;
import com.google.spanner.v1.ListSessionsResponse;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.Partition;
import com.google.spanner.v1.PartitionOptions;
Expand Down Expand Up @@ -197,10 +198,12 @@ private static class PartialResultSetsIterator implements Iterator<PartialResult
private boolean hasNext;
private boolean first = true;
private int currentRow = 0;
private boolean isMultiplexedSession = false;

private PartialResultSetsIterator(ResultSet resultSet) {
private PartialResultSetsIterator(ResultSet resultSet, boolean isMultiplexedSession) {
this.resultSet = resultSet;
this.hasNext = true;
this.isMultiplexedSession = isMultiplexedSession;
}

@Override
Expand All @@ -227,6 +230,9 @@ public PartialResultSet next() {
}
builder.setResumeToken(ByteString.copyFromUtf8(String.format("%09d", currentRow)));
hasNext = currentRow < resultSet.getRowsCount();
if (this.isMultiplexedSession) {
builder.setPrecommitToken(getPartialResultSetPrecommitToken());
}
return builder.build();
}

Expand Down Expand Up @@ -1020,7 +1026,11 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver<ResultSet> resp
throw result.getException();
case RESULT_SET:
returnResultSet(
result.getResultSet(), transactionId, request.getTransaction(), responseObserver);
result.getResultSet(),
transactionId,
request.getTransaction(),
responseObserver,
session);
break;
case UPDATE_COUNT:
if (isPartitionedDmlTransaction(transactionId)) {
Expand All @@ -1033,7 +1043,7 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver<ResultSet> resp
.build())
.build());
} else {
responseObserver.onNext(
ResultSet.Builder resultSetBuilder =
ResultSet.newBuilder()
.setStats(
ResultSetStats.newBuilder()
Expand All @@ -1045,8 +1055,11 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver<ResultSet> resp
ignoreNextInlineBeginRequest.getAndSet(false)
? Transaction.getDefaultInstance()
: Transaction.newBuilder().setId(transactionId).build())
.build())
.build());
.build());
if (session.getMultiplexed()) {
resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken());
}
responseObserver.onNext(resultSetBuilder.build());
}
break;
default:
Expand All @@ -1064,7 +1077,8 @@ private void returnResultSet(
ResultSet resultSet,
ByteString transactionId,
TransactionSelector transactionSelector,
StreamObserver<ResultSet> responseObserver) {
StreamObserver<ResultSet> responseObserver,
Session session) {
ResultSetMetadata metadata = resultSet.getMetadata();
if (transactionId != null) {
metadata =
Expand All @@ -1080,6 +1094,9 @@ private void returnResultSet(
metadata = metadata.toBuilder().setTransaction(transaction).build();
}
resultSet = resultSet.toBuilder().setMetadata(metadata).build();
if (session.getMultiplexed()) {
resultSet = resultSet.toBuilder().setPrecommitToken(getResultSetPrecommitToken()).build();
}
responseObserver.onNext(resultSet);
}

Expand Down Expand Up @@ -1174,6 +1191,9 @@ public void executeBatchDml(
.build());
}
builder.setStatus(status);
if (session.getMultiplexed()) {
builder.setPrecommitToken(getExecuteBatchDmlResponsePrecommitToken());
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
Expand Down Expand Up @@ -1242,7 +1262,8 @@ public void executeStreamingSql(
transactionId,
request.getTransaction(),
responseObserver,
getExecuteStreamingSqlExecutionTime());
getExecuteStreamingSqlExecutionTime(),
session.getMultiplexed());
break;
case UPDATE_COUNT:
if (isPartitioned) {
Expand Down Expand Up @@ -1612,7 +1633,7 @@ public void read(final ReadRequest request, StreamObserver<ResultSet> responseOb
cols);
StatementResult res = getResult(statement);
returnResultSet(
res.getResultSet(), transactionId, request.getTransaction(), responseObserver);
res.getResultSet(), transactionId, request.getTransaction(), responseObserver, session);
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
responseObserver.onError(e);
Expand Down Expand Up @@ -1670,7 +1691,8 @@ public void streamingRead(
transactionId,
request.getTransaction(),
responseObserver,
getStreamingReadExecutionTime());
getStreamingReadExecutionTime(),
session.getMultiplexed());
} catch (StatusRuntimeException e) {
responseObserver.onError(e);
} catch (Throwable t) {
Expand All @@ -1683,7 +1705,8 @@ private void returnPartialResultSet(
ByteString transactionId,
TransactionSelector transactionSelector,
StreamObserver<PartialResultSet> responseObserver,
SimulatedExecutionTime executionTime)
SimulatedExecutionTime executionTime,
boolean isMultiplexedSession)
throws Exception {
ResultSetMetadata metadata = resultSet.getMetadata();
if (transactionId == null) {
Expand All @@ -1700,7 +1723,8 @@ private void returnPartialResultSet(
.build();
}
resultSet = resultSet.toBuilder().setMetadata(metadata).build();
PartialResultSetsIterator iterator = new PartialResultSetsIterator(resultSet);
PartialResultSetsIterator iterator =
new PartialResultSetsIterator(resultSet, isMultiplexedSession);
long index = 0L;
while (iterator.hasNext()) {
SimulatedExecutionTime.checkStreamException(
Expand Down Expand Up @@ -2447,4 +2471,23 @@ Session getSession(String name) {
}
return null;
}

static MultiplexedSessionPrecommitToken getResultSetPrecommitToken() {
return getPrecommitToken("ResultSetPrecommitToken", 1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sequence numbers should be more intelligent than this. This just assumes that RPCs will be called in a given order.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for hardcoding the sequence number is to maintain greater control in our tests and to minimize flakiness. This approach does not assume that RPCs will be called in a strict order. For instance, consider the following RPC response order in a read-write transaction:

ResultSet -> sequence number 1
PartialResultSet -> sequence number 3
ExecuteBatchDml -> sequence number 2

Even though the ExecuteBatchDml RPC was executed later, we know that the PartialResultSet, with sequence number 3, will have the latest precommit token. This ensures that we are properly validating the sequence number logic

However, I agree that there is a flaw in this approach. If the same order of RPCs is executed on the backend, the ExecuteBatchDml would have the latest sequence number since it was the last RPC executed in the transaction. In contrast, our mock Spanner does not adhere to this behavior, which causes the discrepancy.

Other options.

  1. Incremental counter - We can use AtomicInteger to generate incrementing sequence numbers in a thread-safe manner. This ensures that the seq number is assigned based on when the request reaches mock spanner. This also ensure seq number is always incremental and will not lead to flakiness.
  2. Time-Based Increments - We can use Instant.now() as sequence number. However the seq number in this case will not always be incremental (2 RPCs can have same seq number) leading to test flakiness.

I have modified code to use incremental counter.

However, please let me know if there are other better ways to solve this.

}

static MultiplexedSessionPrecommitToken getPartialResultSetPrecommitToken() {
return getPrecommitToken("PartialResultSetPrecommitToken", 3);
}

static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken() {
return getPrecommitToken("ExecuteBatchDmlResponsePrecommitToken", 2);
}

static MultiplexedSessionPrecommitToken getPrecommitToken(String value, int seqNo) {
return MultiplexedSessionPrecommitToken.newBuilder()
.setPrecommitToken(ByteString.copyFromUtf8(value))
.setSeqNum(seqNo)
.build();
}
}
Loading
Loading