Skip to content

Commit d5a7e0c

Browse files
committed
chore(spanner): handle commit retry protocol extension for mux read-write
1 parent 663a974 commit d5a7e0c

File tree

3 files changed

+127
-11
lines changed

3 files changed

+127
-11
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,9 @@ ApiFuture<CommitResponse> commitAsync() {
409409
}
410410
builder.addAllMutations(mutationsProto);
411411
finishOps.addListener(
412-
new CommitRunnable(res, finishOps, builder), MoreExecutors.directExecutor());
412+
new CommitRunnable(
413+
res, finishOps, builder, /* retryAttemptDueToCommitProtocolExtension = */ false),
414+
MoreExecutors.directExecutor());
413415
return res;
414416
}
415417

@@ -418,14 +420,17 @@ private final class CommitRunnable implements Runnable {
418420
private final SettableApiFuture<CommitResponse> res;
419421
private final ApiFuture<Void> prev;
420422
private final CommitRequest.Builder requestBuilder;
423+
private final boolean retryAttemptDueToCommitProtocolExtension;
421424

422425
CommitRunnable(
423426
SettableApiFuture<CommitResponse> res,
424427
ApiFuture<Void> prev,
425-
CommitRequest.Builder requestBuilder) {
428+
CommitRequest.Builder requestBuilder,
429+
boolean retryAttemptDueToCommitProtocolExtension) {
426430
this.res = res;
427431
this.prev = prev;
428432
this.requestBuilder = requestBuilder;
433+
this.retryAttemptDueToCommitProtocolExtension = retryAttemptDueToCommitProtocolExtension;
429434
}
430435

431436
@Override
@@ -459,6 +464,13 @@ public void run() {
459464
// Set the precommit token in the CommitRequest for multiplexed sessions.
460465
requestBuilder.setPrecommitToken(getLatestPrecommitToken());
461466
}
467+
if (retryAttemptDueToCommitProtocolExtension) {
468+
// When a retry occurs due to the commit protocol extension, clear all mutations because
469+
// they were already buffered in SpanFE during the previous attempt.
470+
requestBuilder.clearMutations();
471+
span.addAnnotation(
472+
"Retrying commit operation with a new precommit token obtained from the previous CommitResponse");
473+
}
462474
final CommitRequest commitRequest = requestBuilder.build();
463475
span.addAnnotation("Starting Commit");
464476
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture;
@@ -479,6 +491,32 @@ public void run() {
479491
return;
480492
}
481493
com.google.spanner.v1.CommitResponse proto = commitFuture.get();
494+
495+
// If the CommitResponse includes a precommit token, the client will retry the
496+
// commit RPC once with the new token and clear any existing mutations.
497+
// This case is applicable only when the read-write transaction uses multiplexed
498+
// session.
499+
if (proto.hasPrecommitToken() && !retryAttemptDueToCommitProtocolExtension) {
500+
// track the latest pre commit token
501+
onPrecommitToken(proto.getPrecommitToken());
502+
span.addAnnotation(
503+
"Commit operation will be retried with new precommit token as the CommitResponse includes a MultiplexedSessionRetry field");
504+
opSpan.addAnnotation(
505+
"Commit operation will be retried with new precommit token as the CommitResponse includes a MultiplexedSessionRetry field");
506+
opSpan.end();
507+
508+
// Retry the commit RPC with the latest precommit token from CommitResponse.
509+
MoreExecutors.directExecutor()
510+
.execute(
511+
new CommitRunnable(
512+
res,
513+
prev,
514+
requestBuilder,
515+
/* retryAttemptDueToCommitProtocolExtension = */ true));
516+
517+
// Exit to prevent further processing in this attempt.
518+
return;
519+
}
482520
if (!proto.hasCommitTimestamp()) {
483521
throw newSpannerException(
484522
ErrorCode.INTERNAL, "Missing commitTimestamp:\n" + session.getName());

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,7 @@ private static void checkStreamException(
603603
private ConcurrentMap<ByteString, Boolean> isPartitionedDmlTransaction =
604604
new ConcurrentHashMap<>();
605605
private ConcurrentMap<ByteString, Boolean> abortedTransactions = new ConcurrentHashMap<>();
606+
private ConcurrentMap<ByteString, Boolean> commitRetryTransactions = new ConcurrentHashMap<>();
606607
private final AtomicBoolean abortNextTransaction = new AtomicBoolean();
607608
private final AtomicBoolean abortNextStatement = new AtomicBoolean();
608609
private final AtomicBoolean ignoreNextInlineBeginRequest = new AtomicBoolean();
@@ -2045,15 +2046,23 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
20452046
return;
20462047
}
20472048
simulateAbort(session, request.getTransactionId());
2048-
commitTransaction(transaction.getId());
2049-
CommitResponse.Builder responseBuilder =
2050-
CommitResponse.newBuilder().setCommitTimestamp(getCurrentGoogleTimestamp());
2051-
if (request.getReturnCommitStats()) {
2052-
responseBuilder.setCommitStats(
2053-
com.google.spanner.v1.CommitResponse.CommitStats.newBuilder()
2054-
// This is not really always equal, but at least it returns a value.
2055-
.setMutationCount(request.getMutationsCount())
2056-
.build());
2049+
CommitResponse.Builder responseBuilder = CommitResponse.newBuilder();
2050+
Optional<Boolean> commitRetry =
2051+
Optional.fromNullable(commitRetryTransactions.get(request.getTransactionId()));
2052+
if (commitRetry.or(Boolean.FALSE) && session.getMultiplexed()) {
2053+
responseBuilder.setPrecommitToken(
2054+
getCommitResponsePrecommitToken(request.getTransactionId()));
2055+
commitRetryTransactions.remove(request.getTransactionId());
2056+
} else {
2057+
commitTransaction(transaction.getId());
2058+
responseBuilder.setCommitTimestamp(getCurrentGoogleTimestamp());
2059+
if (request.getReturnCommitStats()) {
2060+
responseBuilder.setCommitStats(
2061+
com.google.spanner.v1.CommitResponse.CommitStats.newBuilder()
2062+
// This is not really always equal, but at least it returns a value.
2063+
.setMutationCount(request.getMutationsCount())
2064+
.build());
2065+
}
20572066
}
20582067
responseObserver.onNext(responseBuilder.build());
20592068
responseObserver.onCompleted();
@@ -2134,6 +2143,14 @@ void markAbortedTransaction(ByteString transactionId) {
21342143
transactionSequenceNo.remove(transactionId);
21352144
}
21362145

2146+
public void markCommitRetryOnTransaction(ByteString transactionId) {
2147+
Transaction transaction = transactions.get(transactionId);
2148+
if (transaction == null || !isReadWriteTransaction(transactionId)) {
2149+
return;
2150+
}
2151+
commitRetryTransactions.putIfAbsent(transactionId, Boolean.TRUE);
2152+
}
2153+
21372154
@Override
21382155
public void partitionQuery(
21392156
PartitionQueryRequest request, StreamObserver<PartitionResponse> responseObserver) {
@@ -2527,6 +2544,11 @@ static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken
25272544
return getPrecommitToken("ExecuteBatchDmlResponsePrecommitToken", transactionId);
25282545
}
25292546

2547+
static MultiplexedSessionPrecommitToken getCommitResponsePrecommitToken(
2548+
ByteString transactionId) {
2549+
return getPrecommitToken("CommitResponsePrecommitToken", transactionId);
2550+
}
2551+
25302552
static MultiplexedSessionPrecommitToken getPrecommitToken(
25312553
String value, ByteString transactionId) {
25322554
transactionSequenceNo.putIfAbsent(transactionId, new AtomicInteger(0));

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1575,6 +1575,62 @@ public void testOtherUnimplementedError_ReadWriteTransactionStillUsesMultiplexed
15751575
assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
15761576
}
15771577

1578+
@Test
1579+
public void testReadWriteTransactionWithCommitRetryProtocolExtensionSet() {
1580+
// This test simulates the commit retry protocol extension which occurs when a read-write
1581+
// transaction contains read/query + mutation operations.
1582+
DatabaseClientImpl client =
1583+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1584+
1585+
client
1586+
.readWriteTransaction()
1587+
.run(
1588+
transaction -> {
1589+
try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) {
1590+
//noinspection StatementWithEmptyBody
1591+
while (resultSet.next()) {
1592+
// ignore
1593+
}
1594+
}
1595+
1596+
Mutation mutation =
1597+
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build();
1598+
transaction.buffer(mutation);
1599+
1600+
TransactionContextImpl impl = (TransactionContextImpl) transaction;
1601+
// Force the Commit RPC to return a CommitResponse with MultiplexedSessionRetry field
1602+
// set.
1603+
// This scenario is only possible when a read-write transaction contains read/query +
1604+
// mutation operations.
1605+
mockSpanner.markCommitRetryOnTransaction(impl.transactionId);
1606+
return null;
1607+
});
1608+
1609+
List<ExecuteSqlRequest> executeSqlRequests =
1610+
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
1611+
assertEquals(1, executeSqlRequests.size());
1612+
// Verify the request is executed using multiplexed sessions
1613+
assertTrue(mockSpanner.getSession(executeSqlRequests.get(0).getSession()).getMultiplexed());
1614+
1615+
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
1616+
assertEquals(2, commitRequests.size());
1617+
assertNotNull(commitRequests.get(0).getPrecommitToken());
1618+
assertEquals(
1619+
ByteString.copyFromUtf8("PartialResultSetPrecommitToken"),
1620+
commitRequests.get(0).getPrecommitToken().getPrecommitToken());
1621+
1622+
// Second CommitRequest should contain the latest precommit token received via the
1623+
// CommitResponse in previous attempt.
1624+
assertNotNull(commitRequests.get(1).getPrecommitToken());
1625+
assertEquals(
1626+
ByteString.copyFromUtf8("CommitResponsePrecommitToken"),
1627+
commitRequests.get(1).getPrecommitToken().getPrecommitToken());
1628+
1629+
assertNotNull(client.multiplexedSessionDatabaseClient);
1630+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
1631+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
1632+
}
1633+
15781634
private void waitForSessionToBeReplaced(DatabaseClientImpl client) {
15791635
assertNotNull(client.multiplexedSessionDatabaseClient);
15801636
SessionReference sessionReference =

0 commit comments

Comments
 (0)