Skip to content

Commit 9981932

Browse files
committed
chore(spanner): add precommit token support in mock spanner impl and add mock spanner tests
1 parent d14e5c8 commit 9981932

File tree

5 files changed

+229
-11
lines changed

5 files changed

+229
-11
lines changed

google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.google.common.collect.ImmutableMap;
3737
import com.google.protobuf.ByteString;
3838
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
39+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
3940
import com.google.spanner.v1.PartialResultSet;
4041
import com.google.spanner.v1.QueryPlan;
4142
import com.google.spanner.v1.ResultSetMetadata;
@@ -77,6 +78,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction
7778

7879
@Override
7980
public void onDone(boolean withBeginTransaction) {}
81+
82+
@Override
83+
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
8084
}
8185

8286
@Before

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import com.google.spanner.v1.GetSessionRequest;
5555
import com.google.spanner.v1.ListSessionsRequest;
5656
import com.google.spanner.v1.ListSessionsResponse;
57+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
5758
import com.google.spanner.v1.PartialResultSet;
5859
import com.google.spanner.v1.Partition;
5960
import com.google.spanner.v1.PartitionOptions;
@@ -197,10 +198,12 @@ private static class PartialResultSetsIterator implements Iterator<PartialResult
197198
private boolean hasNext;
198199
private boolean first = true;
199200
private int currentRow = 0;
201+
private boolean isMultiplexedSession = false;
200202

201-
private PartialResultSetsIterator(ResultSet resultSet) {
203+
private PartialResultSetsIterator(ResultSet resultSet, boolean isMultiplexedSession) {
202204
this.resultSet = resultSet;
203205
this.hasNext = true;
206+
this.isMultiplexedSession = isMultiplexedSession;
204207
}
205208

206209
@Override
@@ -227,6 +230,9 @@ public PartialResultSet next() {
227230
}
228231
builder.setResumeToken(ByteString.copyFromUtf8(String.format("%09d", currentRow)));
229232
hasNext = currentRow < resultSet.getRowsCount();
233+
if (this.isMultiplexedSession) {
234+
builder.setPrecommitToken(getPartialResultSetPrecommitToken());
235+
}
230236
return builder.build();
231237
}
232238

@@ -1020,7 +1026,11 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver<ResultSet> resp
10201026
throw result.getException();
10211027
case RESULT_SET:
10221028
returnResultSet(
1023-
result.getResultSet(), transactionId, request.getTransaction(), responseObserver);
1029+
result.getResultSet(),
1030+
transactionId,
1031+
request.getTransaction(),
1032+
responseObserver,
1033+
session);
10241034
break;
10251035
case UPDATE_COUNT:
10261036
if (isPartitionedDmlTransaction(transactionId)) {
@@ -1033,7 +1043,7 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver<ResultSet> resp
10331043
.build())
10341044
.build());
10351045
} else {
1036-
responseObserver.onNext(
1046+
ResultSet.Builder resultSetBuilder =
10371047
ResultSet.newBuilder()
10381048
.setStats(
10391049
ResultSetStats.newBuilder()
@@ -1045,8 +1055,11 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver<ResultSet> resp
10451055
ignoreNextInlineBeginRequest.getAndSet(false)
10461056
? Transaction.getDefaultInstance()
10471057
: Transaction.newBuilder().setId(transactionId).build())
1048-
.build())
1049-
.build());
1058+
.build());
1059+
if (session.getMultiplexed()) {
1060+
resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken());
1061+
}
1062+
responseObserver.onNext(resultSetBuilder.build());
10501063
}
10511064
break;
10521065
default:
@@ -1064,7 +1077,8 @@ private void returnResultSet(
10641077
ResultSet resultSet,
10651078
ByteString transactionId,
10661079
TransactionSelector transactionSelector,
1067-
StreamObserver<ResultSet> responseObserver) {
1080+
StreamObserver<ResultSet> responseObserver,
1081+
Session session) {
10681082
ResultSetMetadata metadata = resultSet.getMetadata();
10691083
if (transactionId != null) {
10701084
metadata =
@@ -1080,6 +1094,9 @@ private void returnResultSet(
10801094
metadata = metadata.toBuilder().setTransaction(transaction).build();
10811095
}
10821096
resultSet = resultSet.toBuilder().setMetadata(metadata).build();
1097+
if (session.getMultiplexed()) {
1098+
resultSet = resultSet.toBuilder().setPrecommitToken(getResultSetPrecommitToken()).build();
1099+
}
10831100
responseObserver.onNext(resultSet);
10841101
}
10851102

@@ -1174,6 +1191,9 @@ public void executeBatchDml(
11741191
.build());
11751192
}
11761193
builder.setStatus(status);
1194+
if (session.getMultiplexed()) {
1195+
builder.setPrecommitToken(getExecuteBatchDmlResponsePrecommitToken());
1196+
}
11771197
responseObserver.onNext(builder.build());
11781198
responseObserver.onCompleted();
11791199
} catch (StatusRuntimeException e) {
@@ -1242,7 +1262,8 @@ public void executeStreamingSql(
12421262
transactionId,
12431263
request.getTransaction(),
12441264
responseObserver,
1245-
getExecuteStreamingSqlExecutionTime());
1265+
getExecuteStreamingSqlExecutionTime(),
1266+
session.getMultiplexed());
12461267
break;
12471268
case UPDATE_COUNT:
12481269
if (isPartitioned) {
@@ -1612,7 +1633,7 @@ public void read(final ReadRequest request, StreamObserver<ResultSet> responseOb
16121633
cols);
16131634
StatementResult res = getResult(statement);
16141635
returnResultSet(
1615-
res.getResultSet(), transactionId, request.getTransaction(), responseObserver);
1636+
res.getResultSet(), transactionId, request.getTransaction(), responseObserver, session);
16161637
responseObserver.onCompleted();
16171638
} catch (StatusRuntimeException e) {
16181639
responseObserver.onError(e);
@@ -1670,7 +1691,8 @@ public void streamingRead(
16701691
transactionId,
16711692
request.getTransaction(),
16721693
responseObserver,
1673-
getStreamingReadExecutionTime());
1694+
getStreamingReadExecutionTime(),
1695+
session.getMultiplexed());
16741696
} catch (StatusRuntimeException e) {
16751697
responseObserver.onError(e);
16761698
} catch (Throwable t) {
@@ -1683,7 +1705,8 @@ private void returnPartialResultSet(
16831705
ByteString transactionId,
16841706
TransactionSelector transactionSelector,
16851707
StreamObserver<PartialResultSet> responseObserver,
1686-
SimulatedExecutionTime executionTime)
1708+
SimulatedExecutionTime executionTime,
1709+
boolean isMultiplexedSession)
16871710
throws Exception {
16881711
ResultSetMetadata metadata = resultSet.getMetadata();
16891712
if (transactionId == null) {
@@ -1700,7 +1723,8 @@ private void returnPartialResultSet(
17001723
.build();
17011724
}
17021725
resultSet = resultSet.toBuilder().setMetadata(metadata).build();
1703-
PartialResultSetsIterator iterator = new PartialResultSetsIterator(resultSet);
1726+
PartialResultSetsIterator iterator =
1727+
new PartialResultSetsIterator(resultSet, isMultiplexedSession);
17041728
long index = 0L;
17051729
while (iterator.hasNext()) {
17061730
SimulatedExecutionTime.checkStreamException(
@@ -2447,4 +2471,23 @@ Session getSession(String name) {
24472471
}
24482472
return null;
24492473
}
2474+
2475+
static MultiplexedSessionPrecommitToken getResultSetPrecommitToken() {
2476+
return getPrecommitToken("ResultSetPrecommitToken", 1);
2477+
}
2478+
2479+
static MultiplexedSessionPrecommitToken getPartialResultSetPrecommitToken() {
2480+
return getPrecommitToken("PartialResultSetPrecommitToken", 3);
2481+
}
2482+
2483+
static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken() {
2484+
return getPrecommitToken("ExecuteBatchDmlResponsePrecommitToken", 2);
2485+
}
2486+
2487+
static MultiplexedSessionPrecommitToken getPrecommitToken(String value, int seqNo) {
2488+
return MultiplexedSessionPrecommitToken.newBuilder()
2489+
.setPrecommitToken(ByteString.copyFromUtf8(value))
2490+
.setSeqNum(seqNo)
2491+
.build();
2492+
}
24502493
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@
3636
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
3737
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
3838
import com.google.cloud.spanner.Options.RpcPriority;
39+
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
3940
import com.google.cloud.spanner.connection.RandomResultSetGenerator;
4041
import com.google.common.base.Stopwatch;
4142
import com.google.common.collect.ImmutableList;
43+
import com.google.common.collect.Lists;
4244
import com.google.common.util.concurrent.MoreExecutors;
4345
import com.google.protobuf.ByteString;
4446
import com.google.spanner.v1.CommitRequest;
@@ -745,6 +747,167 @@ public void testAsyncRunnerIsNonBlockingWithMultiplexedSession() throws Exceptio
745747
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
746748
}
747749

750+
@Test
751+
public void testPrecommitTokenForResultSet() {
752+
// This test verifies that the precommit token received from the ResultSet is properly tracked
753+
// and set in the CommitRequest.
754+
DatabaseClientImpl client =
755+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
756+
757+
Long count =
758+
client
759+
.readWriteTransaction()
760+
.run(
761+
transaction -> {
762+
long res = transaction.executeUpdate(UPDATE_STATEMENT);
763+
764+
// Verify that the latest precommit token is tracked in the transaction context.
765+
TransactionContextImpl impl = (TransactionContextImpl) transaction;
766+
assertNotNull(impl.getLatestPrecommitToken());
767+
assertEquals(
768+
ByteString.copyFromUtf8("ResultSetPrecommitToken"),
769+
impl.getLatestPrecommitToken().getPrecommitToken());
770+
return res;
771+
});
772+
773+
assertNotNull(count);
774+
assertEquals(1, count.longValue());
775+
776+
// Verify that the latest precommit token is set in the CommitRequest
777+
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
778+
assertEquals(1, commitRequests.size());
779+
assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed());
780+
assertNotNull(commitRequests.get(0).getPrecommitToken());
781+
assertEquals(
782+
ByteString.copyFromUtf8("ResultSetPrecommitToken"),
783+
commitRequests.get(0).getPrecommitToken().getPrecommitToken());
784+
}
785+
786+
@Test
787+
public void testPrecommitTokenForExecuteBatchDmlResponse() {
788+
// This test verifies that the precommit token received from the ExecuteBatchDmlResponse is
789+
// properly tracked and set in the CommitRequest.
790+
DatabaseClientImpl client =
791+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
792+
793+
long[] count =
794+
client
795+
.readWriteTransaction()
796+
.run(
797+
transaction -> {
798+
long[] res = transaction.batchUpdate(Lists.newArrayList(UPDATE_STATEMENT));
799+
800+
// Verify that the latest precommit token is tracked in the transaction context.
801+
TransactionContextImpl impl = (TransactionContextImpl) transaction;
802+
assertNotNull(impl.getLatestPrecommitToken());
803+
assertEquals(
804+
ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"),
805+
impl.getLatestPrecommitToken().getPrecommitToken());
806+
return res;
807+
});
808+
809+
assertNotNull(count);
810+
assertEquals(1, count.length);
811+
812+
// Verify that the latest precommit token is set in the CommitRequest
813+
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
814+
assertEquals(1, commitRequests.size());
815+
assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed());
816+
assertNotNull(commitRequests.get(0).getPrecommitToken());
817+
assertEquals(
818+
ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"),
819+
commitRequests.get(0).getPrecommitToken().getPrecommitToken());
820+
}
821+
822+
@Test
823+
public void testPrecommitTokenForPartialResultSet() {
824+
// This test verifies that the precommit token received from the PartialResultSet is properly
825+
// tracked and set in the CommitRequest.
826+
DatabaseClientImpl client =
827+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
828+
829+
client
830+
.readWriteTransaction()
831+
.run(
832+
transaction -> {
833+
ResultSet resultSet = transaction.executeQuery(STATEMENT);
834+
//noinspection StatementWithEmptyBody
835+
while (resultSet.next()) {
836+
// ignore
837+
}
838+
839+
// Verify that the latest precommit token is tracked in the transaction context.
840+
TransactionContextImpl impl = (TransactionContextImpl) transaction;
841+
assertNotNull(impl.getLatestPrecommitToken());
842+
assertEquals(
843+
ByteString.copyFromUtf8("PartialResultSetPrecommitToken"),
844+
impl.getLatestPrecommitToken().getPrecommitToken());
845+
return null;
846+
});
847+
848+
// Verify that the latest precommit token is set in the CommitRequest
849+
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
850+
assertEquals(1, commitRequests.size());
851+
assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed());
852+
assertNotNull(commitRequests.get(0).getPrecommitToken());
853+
assertEquals(
854+
ByteString.copyFromUtf8("PartialResultSetPrecommitToken"),
855+
commitRequests.get(0).getPrecommitToken().getPrecommitToken());
856+
}
857+
858+
@Test
859+
public void testTxnTracksPrecommitTokenWithLatestSeqNo() {
860+
// This test ensures that the read-write transaction tracks the precommit token with the
861+
// highest sequence number (in this case, PartialResultSetPrecommitToken) and sets it in the
862+
// CommitRequest.
863+
// ResultSetPrecommitToken -> Seq no 1
864+
// ExecuteBatchDmlResponsePrecommitToken -> Seq no 2
865+
// PartialResultSetPrecommitToken -> Seq no 3
866+
867+
DatabaseClientImpl client =
868+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
869+
870+
client
871+
.readWriteTransaction()
872+
.run(
873+
transaction -> {
874+
// Returns a ResultSet containing the precommit token (ResultSetPrecommitToken) with
875+
// Seq no 1
876+
transaction.executeUpdate(UPDATE_STATEMENT);
877+
878+
// Returns a PartialResultSet containing the precommit token
879+
// (PartialResultSetPrecommitToken) with Seq no 3
880+
ResultSet resultSet = transaction.executeQuery(STATEMENT);
881+
//noinspection StatementWithEmptyBody
882+
while (resultSet.next()) {
883+
// ignore
884+
}
885+
886+
// Returns a ExecuteBatchDmlResponse containing the precommit token
887+
// (ExecuteBatchDmlResponsePrecommitToken) with Seq no 2
888+
transaction.batchUpdate(Lists.newArrayList(UPDATE_STATEMENT));
889+
890+
// Verify that the latest precommit token with highest sequence number is tracked in
891+
// the
892+
// transaction context.
893+
TransactionContextImpl impl = (TransactionContextImpl) transaction;
894+
assertNotNull(impl.getLatestPrecommitToken());
895+
assertEquals(
896+
ByteString.copyFromUtf8("PartialResultSetPrecommitToken"),
897+
impl.getLatestPrecommitToken().getPrecommitToken());
898+
return null;
899+
});
900+
901+
// Verify that the latest precommit token is set in the CommitRequest
902+
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
903+
assertEquals(1, commitRequests.size());
904+
assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed());
905+
assertNotNull(commitRequests.get(0).getPrecommitToken());
906+
assertEquals(
907+
ByteString.copyFromUtf8("PartialResultSetPrecommitToken"),
908+
commitRequests.get(0).getPrecommitToken().getPrecommitToken());
909+
}
910+
748911
private void waitForSessionToBeReplaced(DatabaseClientImpl client) {
749912
assertNotNull(client.multiplexedSessionDatabaseClient);
750913
SessionReference sessionReference =

google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.spanner.spi.v1.SpannerRpc;
2525
import com.google.common.io.Resources;
2626
import com.google.protobuf.util.JsonFormat;
27+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
2728
import com.google.spanner.v1.PartialResultSet;
2829
import com.google.spanner.v1.Transaction;
2930
import java.math.BigDecimal;
@@ -56,6 +57,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction
5657

5758
@Override
5859
public void onDone(boolean withBeginTransaction) {}
60+
61+
@Override
62+
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
5963
}
6064

6165
public ReadFormatTestRunner(Class<?> clazz) throws InitializationError {

0 commit comments

Comments
 (0)