Skip to content

Commit 6d1b6dc

Browse files
committed
chore(spanner): support precommit token for mutation only in mock spanner and mock spanner tests
1 parent e942e35 commit 6d1b6dc

File tree

2 files changed

+226
-6
lines changed

2 files changed

+226
-6
lines changed

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1828,7 +1828,7 @@ private ByteString getTransactionId(Session session, TransactionSelector tx) {
18281828
transactionId = null;
18291829
break;
18301830
case BEGIN:
1831-
transactionId = beginTransaction(session, tx.getBegin()).getId();
1831+
transactionId = beginTransaction(session, tx.getBegin(), null).getId();
18321832
break;
18331833
case ID:
18341834
Transaction transaction = transactions.get(tx.getId());
@@ -1883,7 +1883,8 @@ public void beginTransaction(
18831883
try {
18841884
beginTransactionExecutionTime.simulateExecutionTime(
18851885
exceptions, stickyGlobalExceptions, freezeLock);
1886-
Transaction transaction = beginTransaction(session, request.getOptions());
1886+
Transaction transaction =
1887+
beginTransaction(session, request.getOptions(), request.getMutationKey());
18871888
responseObserver.onNext(transaction);
18881889
responseObserver.onCompleted();
18891890
} catch (StatusRuntimeException t) {
@@ -1893,12 +1894,19 @@ public void beginTransaction(
18931894
}
18941895
}
18951896

1896-
private Transaction beginTransaction(Session session, TransactionOptions options) {
1897-
Transaction.Builder builder =
1898-
Transaction.newBuilder().setId(generateTransactionName(session.getName()));
1897+
private Transaction beginTransaction(
1898+
Session session, TransactionOptions options, com.google.spanner.v1.Mutation mutationKey) {
1899+
ByteString transactionId = generateTransactionName(session.getName());
1900+
Transaction.Builder builder = Transaction.newBuilder().setId(transactionId);
18991901
if (options != null && options.getModeCase() == ModeCase.READ_ONLY) {
19001902
setReadTimestamp(options, builder);
19011903
}
1904+
if (session.getMultiplexed()
1905+
&& options.getModeCase() == ModeCase.READ_WRITE
1906+
&& mutationKey != null) {
1907+
// Mutation only case in a read-write transaction.
1908+
builder.setPrecommitToken(getTransactionPrecommitToken(transactionId));
1909+
}
19021910
Transaction transaction = builder.build();
19031911
transactions.put(transaction.getId(), transaction);
19041912
transactionsStarted.add(transaction.getId());
@@ -2005,7 +2013,8 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
20052013
session,
20062014
TransactionOptions.newBuilder()
20072015
.setReadWrite(ReadWrite.getDefaultInstance())
2008-
.build());
2016+
.build(),
2017+
null);
20092018
} else if (request.getTransactionId() != null) {
20102019
transaction = transactions.get(request.getTransactionId());
20112020
Optional<Boolean> aborted =
@@ -2490,6 +2499,10 @@ Session getSession(String name) {
24902499
return null;
24912500
}
24922501

2502+
static MultiplexedSessionPrecommitToken getTransactionPrecommitToken(ByteString transactionId) {
2503+
return getPrecommitToken("TransactionPrecommitToken", transactionId);
2504+
}
2505+
24932506
static MultiplexedSessionPrecommitToken getResultSetPrecommitToken(ByteString transactionId) {
24942507
return getPrecommitToken("ResultSetPrecommitToken", transactionId);
24952508
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.cloud.spanner.MockSpannerTestUtil.INVALID_UPDATE_STATEMENT;
2020
import static com.google.cloud.spanner.MockSpannerTestUtil.UPDATE_COUNT;
2121
import static com.google.cloud.spanner.MockSpannerTestUtil.UPDATE_STATEMENT;
22+
import static com.google.cloud.spanner.SpannerApiFutures.get;
2223
import static com.google.common.truth.Truth.assertThat;
2324
import static org.junit.Assert.assertEquals;
2425
import static org.junit.Assert.assertFalse;
@@ -594,10 +595,23 @@ public void testMutationUsingWrite() {
594595
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()));
595596
assertNotNull(timestamp);
596597

598+
List<BeginTransactionRequest> beginTransactionRequests =
599+
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
600+
assertEquals(2, beginTransactionRequests.size());
601+
for (BeginTransactionRequest request : beginTransactionRequests) {
602+
// Verify that mutation key is set for mutations-only case in read-write transaction.
603+
assertTrue(request.hasMutationKey());
604+
}
605+
597606
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
598607
assertEquals(2, commitRequests.size());
599608
for (CommitRequest request : commitRequests) {
600609
assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
610+
// Verify that the precommit token is set in CommitRequest
611+
assertTrue(request.hasPrecommitToken());
612+
assertEquals(
613+
ByteString.copyFromUtf8("TransactionPrecommitToken"),
614+
request.getPrecommitToken().getPrecommitToken());
601615
}
602616

603617
assertNotNull(client.multiplexedSessionDatabaseClient);
@@ -1083,6 +1097,199 @@ public void testTxnTracksPrecommitTokenWithLatestSeqNo() {
10831097
commitRequests.get(0).getPrecommitToken().getPrecommitToken());
10841098
}
10851099

1100+
@Test
1101+
public void testPrecommitTokenForTransactionResponse() {
1102+
// This test verifies that
1103+
// 1. A random mutation from the list is set in BeginTransactionRequest.
1104+
// 2. The precommit token from the Transaction response is correctly tracked
1105+
// and applied in the CommitRequest. The Transaction response includes a precommit token
1106+
// only when the read-write transaction consists solely of mutations.
1107+
1108+
DatabaseClientImpl client =
1109+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1110+
1111+
client
1112+
.readWriteTransaction()
1113+
.run(
1114+
transaction -> {
1115+
Mutation mutation =
1116+
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build();
1117+
transaction.buffer(mutation);
1118+
return null;
1119+
});
1120+
1121+
// Verify that for mutation only case, a mutation key is set in BeginTransactionRequest.
1122+
List<BeginTransactionRequest> beginTxnRequest =
1123+
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
1124+
assertEquals(1L, beginTxnRequest.size());
1125+
assertTrue(mockSpanner.getSession(beginTxnRequest.get(0).getSession()).getMultiplexed());
1126+
assertTrue(beginTxnRequest.get(0).hasMutationKey());
1127+
assertTrue(beginTxnRequest.get(0).getMutationKey().hasInsert());
1128+
1129+
// Verify that the latest precommit token is set in the CommitRequest
1130+
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
1131+
assertEquals(1L, commitRequests.size());
1132+
assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed());
1133+
assertNotNull(commitRequests.get(0).getPrecommitToken());
1134+
assertEquals(
1135+
ByteString.copyFromUtf8("TransactionPrecommitToken"),
1136+
commitRequests.get(0).getPrecommitToken().getPrecommitToken());
1137+
}
1138+
1139+
@Test
1140+
public void testMutationOnlyCaseAborted() {
1141+
// This test verifies that in the case of mutations-only, when a transaction is retried after an
1142+
// ABORT, the mutation key is correctly set in the BeginTransaction request.
1143+
DatabaseClientImpl client =
1144+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1145+
1146+
// Force the Commit RPC to return Aborted the first time it is called. The exception is cleared
1147+
// after the first call, so the retry should succeed.
1148+
mockSpanner.setCommitExecutionTime(
1149+
SimulatedExecutionTime.ofException(
1150+
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test"))));
1151+
client
1152+
.readWriteTransaction()
1153+
.run(
1154+
transaction -> {
1155+
Mutation mutation =
1156+
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build();
1157+
transaction.buffer(mutation);
1158+
return null;
1159+
});
1160+
1161+
// Verify that for mutation only case, a mutation key is set in BeginTransactionRequest.
1162+
List<BeginTransactionRequest> beginTransactionRequests =
1163+
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
1164+
assertEquals(2L, beginTransactionRequests.size());
1165+
// Verify the requests are executed using multiplexed sessions
1166+
for (BeginTransactionRequest request : beginTransactionRequests) {
1167+
assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
1168+
assertTrue(request.hasMutationKey());
1169+
assertTrue(request.getMutationKey().hasInsert());
1170+
}
1171+
1172+
// Verify that the latest precommit token is set in the CommitRequest
1173+
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
1174+
assertEquals(2L, commitRequests.size());
1175+
for (CommitRequest request : commitRequests) {
1176+
assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
1177+
assertNotNull(request.getPrecommitToken());
1178+
assertEquals(
1179+
ByteString.copyFromUtf8("TransactionPrecommitToken"),
1180+
request.getPrecommitToken().getPrecommitToken());
1181+
}
1182+
}
1183+
1184+
@Test
1185+
public void testMutationOnlyUsingTransactionManager() {
1186+
// Test verifies mutation-only case within a R/W transaction via TransactionManager.
1187+
DatabaseClientImpl client =
1188+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1189+
1190+
try (TransactionManager manager = client.transactionManager()) {
1191+
TransactionContext transaction = manager.begin();
1192+
while (true) {
1193+
try {
1194+
Mutation mutation =
1195+
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build();
1196+
transaction.buffer(mutation);
1197+
manager.commit();
1198+
assertNotNull(manager.getCommitTimestamp());
1199+
break;
1200+
} catch (AbortedException e) {
1201+
transaction = manager.resetForRetry();
1202+
}
1203+
}
1204+
}
1205+
1206+
// Verify that for mutation only case, a mutation key is set in BeginTransactionRequest.
1207+
List<BeginTransactionRequest> beginTransactionRequests =
1208+
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
1209+
assertThat(beginTransactionRequests).hasSize(1);
1210+
BeginTransactionRequest beginTransaction = beginTransactionRequests.get(0);
1211+
assertTrue(mockSpanner.getSession(beginTransaction.getSession()).getMultiplexed());
1212+
assertTrue(beginTransaction.hasMutationKey());
1213+
assertTrue(beginTransaction.getMutationKey().hasInsert());
1214+
1215+
// Verify that the latest precommit token is set in the CommitRequest
1216+
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
1217+
assertThat(commitRequests).hasSize(1);
1218+
CommitRequest commitRequest = commitRequests.get(0);
1219+
assertNotNull(commitRequest.getPrecommitToken());
1220+
assertEquals(
1221+
ByteString.copyFromUtf8("TransactionPrecommitToken"),
1222+
commitRequest.getPrecommitToken().getPrecommitToken());
1223+
}
1224+
1225+
@Test
1226+
public void testMutationOnlyUsingAsyncRunner() {
1227+
// Test verifies mutation-only case within a R/W transaction via AsyncRunner.
1228+
DatabaseClientImpl client =
1229+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1230+
AsyncRunner runner = client.runAsync();
1231+
get(
1232+
runner.runAsync(
1233+
txn -> {
1234+
txn.buffer(Mutation.delete("TEST", KeySet.all()));
1235+
return ApiFutures.immediateFuture(null);
1236+
},
1237+
MoreExecutors.directExecutor()));
1238+
1239+
// Verify that the mutation key is set in BeginTransactionRequest
1240+
List<BeginTransactionRequest> beginTransactions =
1241+
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
1242+
assertThat(beginTransactions).hasSize(1);
1243+
BeginTransactionRequest beginTransaction = beginTransactions.get(0);
1244+
assertTrue(beginTransaction.hasMutationKey());
1245+
assertTrue(beginTransaction.getMutationKey().hasDelete());
1246+
1247+
// Verify that the latest precommit token is set in the CommitRequest
1248+
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
1249+
assertThat(commitRequests).hasSize(1);
1250+
CommitRequest commitRequest = commitRequests.get(0);
1251+
assertNotNull(commitRequest.getPrecommitToken());
1252+
assertEquals(
1253+
ByteString.copyFromUtf8("TransactionPrecommitToken"),
1254+
commitRequest.getPrecommitToken().getPrecommitToken());
1255+
}
1256+
1257+
@Test
1258+
public void testMutationOnlyUsingAsyncTransactionManager() {
1259+
// Test verifies mutation-only case within a R/W transaction via AsyncTransactionManager.
1260+
DatabaseClientImpl client =
1261+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1262+
try (AsyncTransactionManager manager = client.transactionManagerAsync()) {
1263+
TransactionContextFuture transaction = manager.beginAsync();
1264+
get(
1265+
transaction
1266+
.then(
1267+
(txn, input) -> {
1268+
txn.buffer(Mutation.delete("TEST", KeySet.all()));
1269+
return ApiFutures.immediateFuture(null);
1270+
},
1271+
MoreExecutors.directExecutor())
1272+
.commitAsync());
1273+
}
1274+
1275+
// Verify that the mutation key is set in BeginTransactionRequest
1276+
List<BeginTransactionRequest> beginTransactions =
1277+
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
1278+
assertThat(beginTransactions).hasSize(1);
1279+
BeginTransactionRequest beginTransaction = beginTransactions.get(0);
1280+
assertTrue(beginTransaction.hasMutationKey());
1281+
assertTrue(beginTransaction.getMutationKey().hasDelete());
1282+
1283+
// Verify that the latest precommit token is set in the CommitRequest
1284+
List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
1285+
assertThat(requests).hasSize(1);
1286+
CommitRequest request = requests.get(0);
1287+
assertNotNull(request.getPrecommitToken());
1288+
assertEquals(
1289+
ByteString.copyFromUtf8("TransactionPrecommitToken"),
1290+
request.getPrecommitToken().getPrecommitToken());
1291+
}
1292+
10861293
private void waitForSessionToBeReplaced(DatabaseClientImpl client) {
10871294
assertNotNull(client.multiplexedSessionDatabaseClient);
10881295
SessionReference sessionReference =

0 commit comments

Comments
 (0)