Skip to content

Commit 0d85609

Browse files
committed
chore(spanner): add precommit token changes
1 parent fae1585 commit 0d85609

File tree

12 files changed

+108
-17
lines changed

12 files changed

+108
-17
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import com.google.spanner.v1.ExecuteSqlRequest;
4949
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
5050
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
51+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
5152
import com.google.spanner.v1.PartialResultSet;
5253
import com.google.spanner.v1.ReadRequest;
5354
import com.google.spanner.v1.RequestOptions;
@@ -878,10 +879,19 @@ String getTransactionTag() {
878879
return null;
879880
}
880881

882+
@Nullable
883+
MultiplexedSessionPrecommitToken getLatestPrecommitToken() {
884+
return null;
885+
}
886+
881887
/** This method is called when a statement returned a new transaction as part of its results. */
882888
@Override
883889
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {}
884890

891+
/** This method is called when a response returns a new pre-commit token as part of its results. */
892+
@Override
893+
public void onPrecommitToken(MultiplexedSessionPrecommitToken token){}
894+
885895
@Override
886896
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
887897
this.session.onError(e);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.protobuf.ListValue;
2828
import com.google.protobuf.ProtocolMessageEnum;
2929
import com.google.protobuf.Value.KindCase;
30+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
3031
import com.google.spanner.v1.Transaction;
3132
import java.io.IOException;
3233
import java.io.Serializable;
@@ -52,6 +53,8 @@ interface Listener {
5253
void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
5354
throws SpannerException;
5455

56+
void onPrecommitToken(MultiplexedSessionPrecommitToken token);
57+
5558
/** Called when the read finishes with an error. Returns the error that should be thrown. */
5659
SpannerException onError(SpannerException e, boolean withBeginTransaction);
5760

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
241241
public TransactionRunner readWriteTransaction(TransactionOption... options) {
242242
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
243243
try (IScope s = tracer.withSpan(span)) {
244-
return getSession().readWriteTransaction(options);
244+
return getMultiplexedSession().readWriteTransaction(options);
245245
} catch (RuntimeException e) {
246246
span.setStatus(e);
247247
span.end();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufR
4747

4848
GrpcResultSet(
4949
CloseableIterator<PartialResultSet> iterator, Listener listener, DecodeMode decodeMode) {
50-
this.iterator = new GrpcValueIterator(iterator);
50+
this.iterator = new GrpcValueIterator(iterator, listener);
5151
this.listener = listener;
5252
this.decodeMode = decodeMode;
5353
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import static com.google.common.base.Preconditions.checkState;
2121

2222
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
23+
import com.google.cloud.spanner.AbstractResultSet.Listener;
2324
import com.google.common.collect.AbstractIterator;
2425
import com.google.protobuf.ListValue;
2526
import com.google.protobuf.Value.KindCase;
27+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
2628
import com.google.spanner.v1.PartialResultSet;
2729
import com.google.spanner.v1.ResultSetMetadata;
2830
import com.google.spanner.v1.ResultSetStats;
@@ -44,9 +46,12 @@ private enum StreamValue {
4446
private PartialResultSet current;
4547
private int pos;
4648
private ResultSetStats statistics;
49+
private MultiplexedSessionPrecommitToken precommitToken;
50+
private final Listener listener;
4751

48-
GrpcValueIterator(CloseableIterator<PartialResultSet> stream) {
52+
GrpcValueIterator(CloseableIterator<PartialResultSet> stream, Listener listener) {
4953
this.stream = stream;
54+
this.listener = listener;
5055
}
5156

5257
@SuppressWarnings("unchecked")
@@ -154,6 +159,10 @@ private boolean ensureReady(StreamValue requiredValue) throws SpannerException {
154159
ErrorCode.INTERNAL, "Invalid type metadata: " + e.getMessage(), e);
155160
}
156161
}
162+
// Collect precommit token from all PartialResultSet
163+
if(current.hasPrecommitToken()) {
164+
listener.onPrecommitToken(current.getPrecommitToken());
165+
}
157166
if (current.hasStats()) {
158167
statistics = current.getStats();
159168
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,11 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
404404
return createMultiplexedSessionTransaction(false).readOnlyTransaction(bound);
405405
}
406406

407+
@Override
408+
public TransactionRunner readWriteTransaction(TransactionOption... options) {
409+
return createMultiplexedSessionTransaction(false).readWriteTransaction(options);
410+
}
411+
407412
/**
408413
* It is enough with one executor to maintain the multiplexed sessions in all the clients, as they
409414
* do not need to be updated often, and the maintenance task is light. The core pool size is set

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -426,15 +426,18 @@ public void close() {
426426
}
427427
}
428428

429-
ApiFuture<ByteString> beginTransactionAsync(
430-
Options transactionOptions, boolean routeToLeader, Map<SpannerRpc.Option, ?> channelHint) {
431-
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
429+
ApiFuture<Transaction> beginTransactionAsync(
430+
Options transactionOptions, boolean routeToLeader, Map<SpannerRpc.Option, ?> channelHint, com.google.spanner.v1.Mutation mutation) {
431+
final SettableApiFuture<Transaction> res = SettableApiFuture.create();
432432
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
433-
final BeginTransactionRequest request =
433+
BeginTransactionRequest request =
434434
BeginTransactionRequest.newBuilder()
435435
.setSession(getName())
436436
.setOptions(createReadWriteTransactionOptions(transactionOptions))
437437
.build();
438+
if (sessionReference.getIsMultiplexed()) {
439+
request = request.toBuilder().setMutationKey(mutation).build();
440+
}
438441
final ApiFuture<Transaction> requestFuture;
439442
try (IScope ignore = tracer.withSpan(span)) {
440443
requestFuture = spanner.getRpc().beginTransactionAsync(request, channelHint, routeToLeader);
@@ -448,7 +451,7 @@ ApiFuture<ByteString> beginTransactionAsync(
448451
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
449452
}
450453
span.end();
451-
res.set(txn.getId());
454+
res.set(txn);
452455
} catch (ExecutionException e) {
453456
span.setStatus(e);
454457
span.end();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import com.google.spanner.v1.ExecuteBatchDmlResponse;
4747
import com.google.spanner.v1.ExecuteSqlRequest;
4848
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
49+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
4950
import com.google.spanner.v1.RequestOptions;
5051
import com.google.spanner.v1.ResultSet;
5152
import com.google.spanner.v1.ResultSetStats;
@@ -171,6 +172,11 @@ public void removeListener(Runnable listener) {
171172
@GuardedBy("committingLock")
172173
private volatile boolean committing;
173174

175+
private final Object preCommitTokenLock = new Object();
176+
177+
@GuardedBy("preCommitTokenLock")
178+
private MultiplexedSessionPrecommitToken latestPreCommitToken;
179+
174180
@GuardedBy("lock")
175181
private volatile SettableApiFuture<Void> finishedAsyncOperations = SettableApiFuture.create();
176182

@@ -268,7 +274,7 @@ void ensureTxn() {
268274
ApiFuture<Void> ensureTxnAsync() {
269275
final SettableApiFuture<Void> res = SettableApiFuture.create();
270276
if (transactionId == null || isAborted()) {
271-
createTxnAsync(res);
277+
createTxnAsync(res, null);
272278
} else {
273279
span.addAnnotation("Transaction Initialized", "Id", transactionId.toStringUtf8());
274280
txnLogger.log(
@@ -280,19 +286,23 @@ ApiFuture<Void> ensureTxnAsync() {
280286
return res;
281287
}
282288

283-
private void createTxnAsync(final SettableApiFuture<Void> res) {
289+
private void createTxnAsync(final SettableApiFuture<Void> res, com.google.spanner.v1.Mutation mutation) {
284290
span.addAnnotation("Creating Transaction");
285-
final ApiFuture<ByteString> fut =
286-
session.beginTransactionAsync(options, isRouteToLeader(), getTransactionChannelHint());
291+
final ApiFuture<Transaction> fut =
292+
session.beginTransactionAsync(options, isRouteToLeader(), getTransactionChannelHint(), mutation);
287293
fut.addListener(
288294
() -> {
289295
try {
290-
transactionId = fut.get();
296+
Transaction txn = fut.get();
297+
transactionId = txn.getId();
291298
span.addAnnotation("Transaction Creation Done", "Id", transactionId.toStringUtf8());
292299
txnLogger.log(
293300
Level.FINER,
294301
"Started transaction {0}",
295302
txnLogger.isLoggable(Level.FINER) ? transactionId.asReadOnlyByteBuffer() : null);
303+
if (txn.hasPrecommitToken()) {
304+
onPrecommitToken(txn.getPrecommitToken());
305+
}
296306
res.set(null);
297307
} catch (ExecutionException e) {
298308
span.addAnnotation(
@@ -370,7 +380,7 @@ ApiFuture<CommitResponse> commitAsync() {
370380
synchronized (lock) {
371381
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
372382
finishOps = SettableApiFuture.create();
373-
createTxnAsync(finishOps);
383+
createTxnAsync(finishOps, mutationsProto.get(0));
374384
} else {
375385
finishOps = finishedAsyncOperations;
376386
}
@@ -423,6 +433,13 @@ public void run() {
423433
}
424434
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
425435
}
436+
if (session.getIsMultiplexed()) {
437+
requestBuilder.setPrecommitToken(getLatestPrecommitToken());
438+
System.out.println("setting precommit token in request");
439+
System.out.println(requestBuilder.getPrecommitToken().getPrecommitToken());
440+
441+
txnLogger.log(Level.ALL, "setting precommit token to commit "+requestBuilder.getPrecommitToken().getPrecommitToken());
442+
}
426443
final CommitRequest commitRequest = requestBuilder.build();
427444
span.addAnnotation("Starting Commit");
428445
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture;
@@ -625,6 +642,17 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
625642
}
626643
}
627644

645+
public void onPrecommitToken(MultiplexedSessionPrecommitToken token){
646+
if(token == null) return;
647+
synchronized (preCommitTokenLock) {
648+
if (this.latestPreCommitToken == null || token.getSeqNum() > this.latestPreCommitToken.getSeqNum()) {
649+
this.latestPreCommitToken = token;
650+
System.out.println("Updating precommit token to "+this.latestPreCommitToken);
651+
txnLogger.log(Level.ALL, "Updating precommit token to "+this.latestPreCommitToken);
652+
}
653+
}
654+
}
655+
628656
@Nullable
629657
String getTransactionTag() {
630658
if (this.options.hasTag()) {
@@ -633,6 +661,13 @@ String getTransactionTag() {
633661
return null;
634662
}
635663

664+
@Nullable
665+
MultiplexedSessionPrecommitToken getLatestPrecommitToken() {
666+
synchronized (preCommitTokenLock) {
667+
return this.latestPreCommitToken;
668+
}
669+
}
670+
636671
@Override
637672
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
638673
e = super.onError(e, withBeginTransaction);
@@ -811,6 +846,9 @@ private ResultSet internalExecuteUpdate(
811846
throw new IllegalArgumentException(
812847
"DML response missing stats possibly due to non-DML statement as input");
813848
}
849+
if (resultSet.hasPrecommitToken()) {
850+
onPrecommitToken(resultSet.getPrecommitToken());
851+
}
814852
return resultSet;
815853
} catch (Throwable t) {
816854
throw onError(
@@ -885,6 +923,9 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... u
885923
resultSet.get().getMetadata().getTransaction(),
886924
builder.getTransaction().hasBegin());
887925
}
926+
if (resultSet.get().hasPrecommitToken()) {
927+
onPrecommitToken(resultSet.get().getPrecommitToken());
928+
}
888929
} catch (Throwable e) {
889930
// Ignore this error here as it is handled by the future that is returned by the
890931
// executeUpdateAsync method.
@@ -938,6 +979,10 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... update
938979
response.getResultSets(i).getMetadata().getTransaction(),
939980
builder.getTransaction().hasBegin());
940981
}
982+
// TODO(harsha): check if we need to get precommit_token from response.getResultSets
983+
if (response.hasPrecommitToken()) {
984+
onPrecommitToken(response.getPrecommitToken());
985+
}
941986
}
942987

943988
// If one of the DML statements was aborted, we should throw an aborted exception.
@@ -1004,6 +1049,9 @@ public ApiFuture<long[]> batchUpdateAsync(
10041049
builder.getTransaction().hasBegin());
10051050
}
10061051
}
1052+
if (batchDmlResponse.hasPrecommitToken()) {
1053+
onPrecommitToken(batchDmlResponse.getPrecommitToken());
1054+
}
10071055
// If one of the DML statements was aborted, we should throw an aborted exception.
10081056
// In all other cases, we should throw a BatchUpdateException.
10091057
if (batchDmlResponse.getStatus().getCode() == Code.ABORTED_VALUE) {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.google.common.collect.ImmutableMap;
3737
import com.google.protobuf.ByteString;
3838
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
39+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
3940
import com.google.spanner.v1.PartialResultSet;
4041
import com.google.spanner.v1.QueryPlan;
4142
import com.google.spanner.v1.ResultSetMetadata;
@@ -70,6 +71,9 @@ private static class NoOpListener implements AbstractResultSet.Listener {
7071
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
7172
throws SpannerException {}
7273

74+
@Override
75+
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
76+
7377
@Override
7478
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
7579
return e;

google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.spanner.spi.v1.SpannerRpc;
2525
import com.google.common.io.Resources;
2626
import com.google.protobuf.util.JsonFormat;
27+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
2728
import com.google.spanner.v1.PartialResultSet;
2829
import com.google.spanner.v1.Transaction;
2930
import java.math.BigDecimal;
@@ -49,6 +50,9 @@ private static class NoOpListener implements AbstractResultSet.Listener {
4950
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
5051
throws SpannerException {}
5152

53+
@Override
54+
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
55+
5256
@Override
5357
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
5458
return e;

0 commit comments

Comments
 (0)