Skip to content

Commit 59879af

Browse files
committed
chore(spanner): add multiplexed session support for batch write
1 parent 8822e3b commit 59879af

File tree

6 files changed

+71
-10
lines changed

6 files changed

+71
-10
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,8 @@
1616

1717
package com.google.cloud.spanner;
1818

19-
import com.google.api.gax.rpc.ServerStream;
2019
import com.google.cloud.Timestamp;
21-
import com.google.cloud.spanner.Options.TransactionOption;
2220
import com.google.cloud.spanner.Options.UpdateOption;
23-
import com.google.spanner.v1.BatchWriteResponse;
2421

2522
/**
2623
* Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link
@@ -45,13 +42,6 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
4542
return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
4643
}
4744

48-
@Override
49-
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
50-
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
51-
throws SpannerException {
52-
throw new UnsupportedOperationException();
53-
}
54-
5545
@Override
5646
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
5747
throw new UnsupportedOperationException();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,9 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
178178
throws SpannerException {
179179
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
180180
try (IScope s = tracer.withSpan(span)) {
181+
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
182+
return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options);
183+
}
181184
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
182185
} catch (RuntimeException e) {
183186
span.setStatus(e);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020

2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.core.ApiFutures;
23+
import com.google.api.gax.rpc.ServerStream;
2324
import com.google.cloud.Timestamp;
2425
import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction;
2526
import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction;
2627
import com.google.cloud.spanner.Options.TransactionOption;
2728
import com.google.common.util.concurrent.MoreExecutors;
29+
import com.google.spanner.v1.BatchWriteResponse;
2830
import java.util.concurrent.ExecutionException;
2931

3032
/**
@@ -163,6 +165,20 @@ public CommitResponse writeWithOptions(Iterable<Mutation> mutations, Transaction
163165
}
164166
}
165167

168+
// This is a blocking method, as the interface that it implements is also defined as a blocking
169+
// method.
170+
@Override
171+
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
172+
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
173+
throws SpannerException {
174+
SessionReference sessionReference = getSessionReference();
175+
try (MultiplexedSessionTransaction transaction =
176+
new MultiplexedSessionTransaction(
177+
client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse = */ true)) {
178+
return transaction.batchWriteAtLeastOnce(mutationGroups, options);
179+
}
180+
}
181+
166182
@Override
167183
public TransactionRunner readWriteTransaction(TransactionOption... options) {
168184
return new DelayedTransactionRunner(

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import com.google.api.core.ApiFuture;
2323
import com.google.api.core.ApiFutures;
2424
import com.google.api.core.SettableApiFuture;
25+
import com.google.api.gax.rpc.ServerStream;
2526
import com.google.cloud.Timestamp;
2627
import com.google.cloud.spanner.Options.TransactionOption;
2728
import com.google.cloud.spanner.SessionClient.SessionConsumer;
2829
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
2930
import com.google.common.annotations.VisibleForTesting;
3031
import com.google.common.base.Preconditions;
3132
import com.google.common.util.concurrent.MoreExecutors;
33+
import com.google.spanner.v1.BatchWriteResponse;
3234
import com.google.spanner.v1.BeginTransactionRequest;
3335
import com.google.spanner.v1.RequestOptions;
3436
import com.google.spanner.v1.Transaction;
@@ -499,6 +501,14 @@ public CommitResponse writeAtLeastOnceWithOptions(
499501
.writeAtLeastOnceWithOptions(mutations, options);
500502
}
501503

504+
@Override
505+
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
506+
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
507+
throws SpannerException {
508+
return createMultiplexedSessionTransaction(/* singleUse = */ true)
509+
.batchWriteAtLeastOnce(mutationGroups, options);
510+
}
511+
502512
@Override
503513
public ReadContext singleUse() {
504514
return createMultiplexedSessionTransaction(/* singleUse = */ true).singleUse();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
321321
throw SpannerExceptionFactory.newSpannerException(e);
322322
} finally {
323323
span.end();
324+
onTransactionDone();
324325
}
325326
}
326327

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import com.google.api.core.ApiFuture;
3232
import com.google.api.core.ApiFutures;
33+
import com.google.api.gax.rpc.ServerStream;
3334
import com.google.cloud.NoCredentials;
3435
import com.google.cloud.Timestamp;
3536
import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionStep;
@@ -45,6 +46,8 @@
4546
import com.google.common.collect.Lists;
4647
import com.google.common.util.concurrent.MoreExecutors;
4748
import com.google.protobuf.ByteString;
49+
import com.google.spanner.v1.BatchWriteRequest;
50+
import com.google.spanner.v1.BatchWriteResponse;
4851
import com.google.spanner.v1.BeginTransactionRequest;
4952
import com.google.spanner.v1.CommitRequest;
5053
import com.google.spanner.v1.ExecuteSqlRequest;
@@ -1635,6 +1638,44 @@ public void testReadWriteTransactionWithCommitRetryProtocolExtensionSet() {
16351638
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
16361639
}
16371640

1641+
@Test
1642+
public void testBatchWriteAtLeastOnce() {
1643+
DatabaseClientImpl client =
1644+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1645+
1646+
Iterable<MutationGroup> MUTATION_GROUPS =
1647+
ImmutableList.of(
1648+
MutationGroup.of(
1649+
Mutation.newInsertBuilder("FOO1").set("ID").to(1L).set("NAME").to("Bar1").build(),
1650+
Mutation.newInsertBuilder("FOO2").set("ID").to(2L).set("NAME").to("Bar2").build()),
1651+
MutationGroup.of(
1652+
Mutation.newInsertBuilder("FOO3").set("ID").to(3L).set("NAME").to("Bar3").build(),
1653+
Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build()));
1654+
1655+
ServerStream<BatchWriteResponse> responseStream = client.batchWriteAtLeastOnce(MUTATION_GROUPS);
1656+
int idx = 0;
1657+
for (BatchWriteResponse response : responseStream) {
1658+
assertEquals(
1659+
response.getStatus(),
1660+
com.google.rpc.Status.newBuilder().setCode(com.google.rpc.Code.OK_VALUE).build());
1661+
assertEquals(response.getIndexesList(), ImmutableList.of(idx, idx + 1));
1662+
idx += 2;
1663+
}
1664+
1665+
assertNotNull(responseStream);
1666+
List<BatchWriteRequest> requests = mockSpanner.getRequestsOfType(BatchWriteRequest.class);
1667+
assertEquals(requests.size(), 1);
1668+
BatchWriteRequest request = requests.get(0);
1669+
assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
1670+
assertEquals(request.getMutationGroupsCount(), 2);
1671+
assertEquals(request.getRequestOptions().getPriority(), Priority.PRIORITY_UNSPECIFIED);
1672+
assertFalse(request.getExcludeTxnFromChangeStreams());
1673+
1674+
assertNotNull(client.multiplexedSessionDatabaseClient);
1675+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
1676+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
1677+
}
1678+
16381679
private void waitForSessionToBeReplaced(DatabaseClientImpl client) {
16391680
assertNotNull(client.multiplexedSessionDatabaseClient);
16401681
SessionReference sessionReference =

0 commit comments

Comments
 (0)