Skip to content

Commit a2aa260

Browse files
committed
chore(spanner): tracking precommit token from response
1 parent 139a715 commit a2aa260

File tree

5 files changed

+69
-2
lines changed

5 files changed

+69
-2
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import com.google.spanner.v1.ExecuteSqlRequest;
4949
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
5050
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
51+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
5152
import com.google.spanner.v1.PartialResultSet;
5253
import com.google.spanner.v1.ReadRequest;
5354
import com.google.spanner.v1.RequestOptions;
@@ -893,6 +894,13 @@ public void onDone(boolean withBeginTransaction) {
893894
this.session.onReadDone();
894895
}
895896

897+
/**
898+
* For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be
899+
* present in the RPC response. In such cases, this method will be a no-op.
900+
*/
901+
@Override
902+
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
903+
896904
private ResultSet readInternal(
897905
String table,
898906
@Nullable String index,

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.protobuf.ListValue;
2828
import com.google.protobuf.ProtocolMessageEnum;
2929
import com.google.protobuf.Value.KindCase;
30+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
3031
import com.google.spanner.v1.Transaction;
3132
import java.io.IOException;
3233
import java.io.Serializable;
@@ -57,6 +58,12 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
5758

5859
/** Called when the read finishes normally. */
5960
void onDone(boolean withBeginTransaction);
61+
62+
/**
63+
* Called when the RPC response contains a MultiplexedSessionPrecommitToken. A precommit token
64+
* will be included if the read-write transaction is executed on a multiplexed session.
65+
*/
66+
void onPrecommitToken(MultiplexedSessionPrecommitToken token);
6067
}
6168

6269
static final class LazyByteArray implements Serializable {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufR
4747

4848
GrpcResultSet(
4949
CloseableIterator<PartialResultSet> iterator, Listener listener, DecodeMode decodeMode) {
50-
this.iterator = new GrpcValueIterator(iterator);
50+
this.iterator = new GrpcValueIterator(iterator, listener);
5151
this.listener = listener;
5252
this.decodeMode = decodeMode;
5353
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.google.common.base.Preconditions.checkState;
2121

2222
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
23+
import com.google.cloud.spanner.AbstractResultSet.Listener;
2324
import com.google.common.collect.AbstractIterator;
2425
import com.google.protobuf.ListValue;
2526
import com.google.protobuf.Value.KindCase;
@@ -44,9 +45,11 @@ private enum StreamValue {
4445
private PartialResultSet current;
4546
private int pos;
4647
private ResultSetStats statistics;
48+
private final Listener listener;
4749

48-
GrpcValueIterator(CloseableIterator<PartialResultSet> stream) {
50+
GrpcValueIterator(CloseableIterator<PartialResultSet> stream, Listener listener) {
4951
this.stream = stream;
52+
this.listener = listener;
5053
}
5154

5255
@SuppressWarnings("unchecked")
@@ -154,6 +157,10 @@ private boolean ensureReady(StreamValue requiredValue) throws SpannerException {
154157
ErrorCode.INTERNAL, "Invalid type metadata: " + e.getMessage(), e);
155158
}
156159
}
160+
// collect the precommit token from each PartialResultSet
161+
if (current.hasPrecommitToken()) {
162+
listener.onPrecommitToken(current.getPrecommitToken());
163+
}
157164
if (current.hasStats()) {
158165
statistics = current.getStats();
159166
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import com.google.spanner.v1.ExecuteBatchDmlResponse;
4747
import com.google.spanner.v1.ExecuteSqlRequest;
4848
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
49+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
4950
import com.google.spanner.v1.RequestOptions;
5051
import com.google.spanner.v1.ResultSet;
5152
import com.google.spanner.v1.ResultSetStats;
@@ -171,6 +172,11 @@ public void removeListener(Runnable listener) {
171172
@GuardedBy("committingLock")
172173
private volatile boolean committing;
173174

175+
private final Object precommitTokenLock = new Object();
176+
177+
@GuardedBy("precommitTokenLock")
178+
private MultiplexedSessionPrecommitToken latestPrecommitToken;
179+
174180
@GuardedBy("lock")
175181
private volatile SettableApiFuture<Void> finishedAsyncOperations = SettableApiFuture.create();
176182

@@ -625,6 +631,24 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
625631
}
626632
}
627633

634+
/**
635+
* In read-write transactions, the precommit token with the highest sequence number from this
636+
* transaction attempt will be tracked and included in the
637+
* [Commit][google.spanner.v1.Spanner.Commit] request for the transaction.
638+
*/
639+
@Override
640+
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {
641+
if (token == null) return;
642+
synchronized (precommitTokenLock) {
643+
if (this.latestPrecommitToken == null
644+
|| token.getSeqNum() > this.latestPrecommitToken.getSeqNum()) {
645+
this.latestPrecommitToken = token;
646+
System.out.println("Updating precommit token to " + this.latestPrecommitToken);
647+
txnLogger.log(Level.ALL, "Updating precommit token to " + this.latestPrecommitToken);
648+
}
649+
}
650+
}
651+
628652
@Nullable
629653
String getTransactionTag() {
630654
if (this.options.hasTag()) {
@@ -633,6 +657,13 @@ String getTransactionTag() {
633657
return null;
634658
}
635659

660+
@Nullable
661+
MultiplexedSessionPrecommitToken getLatestPrecommitToken() {
662+
synchronized (precommitTokenLock) {
663+
return this.latestPrecommitToken;
664+
}
665+
}
666+
636667
@Override
637668
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
638669
e = super.onError(e, withBeginTransaction);
@@ -811,6 +842,9 @@ private ResultSet internalExecuteUpdate(
811842
throw new IllegalArgumentException(
812843
"DML response missing stats possibly due to non-DML statement as input");
813844
}
845+
if (resultSet.hasPrecommitToken()) {
846+
onPrecommitToken(resultSet.getPrecommitToken());
847+
}
814848
return resultSet;
815849
} catch (Throwable t) {
816850
throw onError(
@@ -885,6 +919,9 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... u
885919
resultSet.get().getMetadata().getTransaction(),
886920
builder.getTransaction().hasBegin());
887921
}
922+
if (resultSet.get().hasPrecommitToken()) {
923+
onPrecommitToken(resultSet.get().getPrecommitToken());
924+
}
888925
} catch (Throwable e) {
889926
// Ignore this error here as it is handled by the future that is returned by the
890927
// executeUpdateAsync method.
@@ -940,6 +977,11 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... update
940977
}
941978
}
942979

980+
// TODO(sriharshach): check if we need to get precommit_token from response.getResultSets
981+
if (response.hasPrecommitToken()) {
982+
onPrecommitToken(response.getPrecommitToken());
983+
}
984+
943985
// If one of the DML statements was aborted, we should throw an aborted exception.
944986
// In all other cases, we should throw a BatchUpdateException.
945987
if (response.getStatus().getCode() == Code.ABORTED_VALUE) {
@@ -1004,6 +1046,9 @@ public ApiFuture<long[]> batchUpdateAsync(
10041046
builder.getTransaction().hasBegin());
10051047
}
10061048
}
1049+
if (batchDmlResponse.hasPrecommitToken()) {
1050+
onPrecommitToken(batchDmlResponse.getPrecommitToken());
1051+
}
10071052
// If one of the DML statements was aborted, we should throw an aborted exception.
10081053
// In all other cases, we should throw a BatchUpdateException.
10091054
if (batchDmlResponse.getStatus().getCode() == Code.ABORTED_VALUE) {

0 commit comments

Comments
 (0)