Skip to content

Commit aeeea3c

Browse files
authored
chore(spanner): track precommit token for R/W multiplexed session (#3411)
When a read-write transaction is executed on a multiplexed session, the RPC responses of that transaction return a `MultiplexedSessionPrecommitToken`. In client library, the precommit token with the highest sequence number is tracked at the transaction context level. During the commit, this latest precommit token is fetched and set in the CommitRequest. If the precommit token is not set during the commit, the backend will throw an `INVALID_ARGUMENT` error. Including the latest token in the CommitRequest is essential to prevent latency regression, though it does not impact the correctness of the transaction. This PR tracks the precommit token from the following RPC responses, 1. ResultSet 2. PartialResultSet 3. ExecuteBatchDmlResponse
1 parent 1e8b82c commit aeeea3c

File tree

10 files changed

+322
-14
lines changed

10 files changed

+322
-14
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import com.google.spanner.v1.ExecuteSqlRequest;
4949
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
5050
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
51+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
5152
import com.google.spanner.v1.PartialResultSet;
5253
import com.google.spanner.v1.ReadRequest;
5354
import com.google.spanner.v1.RequestOptions;
@@ -893,6 +894,13 @@ public void onDone(boolean withBeginTransaction) {
893894
this.session.onReadDone();
894895
}
895896

897+
/**
898+
* For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be
899+
* present in the RPC response. In such cases, this method will be a no-op.
900+
*/
901+
@Override
902+
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
903+
896904
private ResultSet readInternal(
897905
String table,
898906
@Nullable String index,

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.protobuf.ListValue;
2828
import com.google.protobuf.ProtocolMessageEnum;
2929
import com.google.protobuf.Value.KindCase;
30+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
3031
import com.google.spanner.v1.Transaction;
3132
import java.io.IOException;
3233
import java.io.Serializable;
@@ -57,6 +58,12 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
5758

5859
/** Called when the read finishes normally. */
5960
void onDone(boolean withBeginTransaction);
61+
62+
/**
63+
* Called when the RPC response contains a MultiplexedSessionPrecommitToken. A precommit token
64+
* will be included if the read-write transaction is executed on a multiplexed session.
65+
*/
66+
void onPrecommitToken(MultiplexedSessionPrecommitToken token);
6067
}
6168

6269
static final class LazyByteArray implements Serializable {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufR
4747

4848
GrpcResultSet(
4949
CloseableIterator<PartialResultSet> iterator, Listener listener, DecodeMode decodeMode) {
50-
this.iterator = new GrpcValueIterator(iterator);
50+
this.iterator = new GrpcValueIterator(iterator, listener);
5151
this.listener = listener;
5252
this.decodeMode = decodeMode;
5353
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.google.common.base.Preconditions.checkState;
2121

2222
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
23+
import com.google.cloud.spanner.AbstractResultSet.Listener;
2324
import com.google.common.collect.AbstractIterator;
2425
import com.google.protobuf.ListValue;
2526
import com.google.protobuf.Value.KindCase;
@@ -44,9 +45,11 @@ private enum StreamValue {
4445
private PartialResultSet current;
4546
private int pos;
4647
private ResultSetStats statistics;
48+
private final Listener listener;
4749

48-
GrpcValueIterator(CloseableIterator<PartialResultSet> stream) {
50+
GrpcValueIterator(CloseableIterator<PartialResultSet> stream, Listener listener) {
4951
this.stream = stream;
52+
this.listener = listener;
5053
}
5154

5255
@SuppressWarnings("unchecked")
@@ -154,6 +157,10 @@ private boolean ensureReady(StreamValue requiredValue) throws SpannerException {
154157
ErrorCode.INTERNAL, "Invalid type metadata: " + e.getMessage(), e);
155158
}
156159
}
160+
// collect the precommit token from each PartialResultSet
161+
if (current.hasPrecommitToken()) {
162+
listener.onPrecommitToken(current.getPrecommitToken());
163+
}
157164
if (current.hasStats()) {
158165
statistics = current.getStats();
159166
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import com.google.spanner.v1.ExecuteBatchDmlResponse;
4747
import com.google.spanner.v1.ExecuteSqlRequest;
4848
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
49+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
4950
import com.google.spanner.v1.RequestOptions;
5051
import com.google.spanner.v1.ResultSet;
5152
import com.google.spanner.v1.ResultSetStats;
@@ -179,6 +180,11 @@ public void removeListener(Runnable listener) {
179180
@GuardedBy("committingLock")
180181
private volatile boolean committing;
181182

183+
private final Object precommitTokenLock = new Object();
184+
185+
@GuardedBy("precommitTokenLock")
186+
private MultiplexedSessionPrecommitToken latestPrecommitToken;
187+
182188
@GuardedBy("lock")
183189
private volatile SettableApiFuture<Void> finishedAsyncOperations = SettableApiFuture.create();
184190

@@ -439,6 +445,10 @@ public void run() {
439445
}
440446
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
441447
}
448+
if (session.getIsMultiplexed() && getLatestPrecommitToken() != null) {
449+
// Set the precommit token in the CommitRequest for multiplexed sessions.
450+
requestBuilder.setPrecommitToken(getLatestPrecommitToken());
451+
}
442452
final CommitRequest commitRequest = requestBuilder.build();
443453
span.addAnnotation("Starting Commit");
444454
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture;
@@ -643,6 +653,25 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
643653
}
644654
}
645655

656+
/**
657+
* In read-write transactions, the precommit token with the highest sequence number from this
658+
* transaction attempt will be tracked and included in the
659+
* [Commit][google.spanner.v1.Spanner.Commit] request for the transaction.
660+
*/
661+
@Override
662+
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {
663+
if (token == null) {
664+
return;
665+
}
666+
synchronized (precommitTokenLock) {
667+
if (this.latestPrecommitToken == null
668+
|| token.getSeqNum() > this.latestPrecommitToken.getSeqNum()) {
669+
this.latestPrecommitToken = token;
670+
txnLogger.log(Level.FINE, "Updating precommit token to " + this.latestPrecommitToken);
671+
}
672+
}
673+
}
674+
646675
@Nullable
647676
String getTransactionTag() {
648677
if (this.options.hasTag()) {
@@ -651,6 +680,13 @@ String getTransactionTag() {
651680
return null;
652681
}
653682

683+
@Nullable
684+
MultiplexedSessionPrecommitToken getLatestPrecommitToken() {
685+
synchronized (precommitTokenLock) {
686+
return this.latestPrecommitToken;
687+
}
688+
}
689+
654690
@Override
655691
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
656692
e = super.onError(e, withBeginTransaction);
@@ -829,6 +865,9 @@ private ResultSet internalExecuteUpdate(
829865
throw new IllegalArgumentException(
830866
"DML response missing stats possibly due to non-DML statement as input");
831867
}
868+
if (resultSet.hasPrecommitToken()) {
869+
onPrecommitToken(resultSet.getPrecommitToken());
870+
}
832871
return resultSet;
833872
} catch (Throwable t) {
834873
throw onError(
@@ -903,6 +942,9 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... u
903942
resultSet.get().getMetadata().getTransaction(),
904943
builder.getTransaction().hasBegin());
905944
}
945+
if (resultSet.get().hasPrecommitToken()) {
946+
onPrecommitToken(resultSet.get().getPrecommitToken());
947+
}
906948
} catch (Throwable e) {
907949
// Ignore this error here as it is handled by the future that is returned by the
908950
// executeUpdateAsync method.
@@ -958,6 +1000,10 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... update
9581000
}
9591001
}
9601002

1003+
if (response.hasPrecommitToken()) {
1004+
onPrecommitToken(response.getPrecommitToken());
1005+
}
1006+
9611007
// If one of the DML statements was aborted, we should throw an aborted exception.
9621008
// In all other cases, we should throw a BatchUpdateException.
9631009
if (response.getStatus().getCode() == Code.ABORTED_VALUE) {
@@ -1022,6 +1068,9 @@ public ApiFuture<long[]> batchUpdateAsync(
10221068
builder.getTransaction().hasBegin());
10231069
}
10241070
}
1071+
if (batchDmlResponse.hasPrecommitToken()) {
1072+
onPrecommitToken(batchDmlResponse.getPrecommitToken());
1073+
}
10251074
// If one of the DML statements was aborted, we should throw an aborted exception.
10261075
// In all other cases, we should throw a BatchUpdateException.
10271076
if (batchDmlResponse.getStatus().getCode() == Code.ABORTED_VALUE) {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.google.common.collect.ImmutableMap;
3737
import com.google.protobuf.ByteString;
3838
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
39+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
3940
import com.google.spanner.v1.PartialResultSet;
4041
import com.google.spanner.v1.QueryPlan;
4142
import com.google.spanner.v1.ResultSetMetadata;
@@ -77,6 +78,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction
7778

7879
@Override
7980
public void onDone(boolean withBeginTransaction) {}
81+
82+
@Override
83+
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
8084
}
8185

8286
@Before

0 commit comments

Comments
 (0)