Skip to content

Commit 1db06fb

Browse files
authored
Merge branch 'main' into mutation-only-inline-begin
2 parents 7d19004 + 9a5d86b commit 1db06fb

File tree

6 files changed

+73
-10
lines changed

6 files changed

+73
-10
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616

1717
package com.google.cloud.spanner;
1818

19-
import com.google.api.gax.rpc.ServerStream;
2019
import com.google.cloud.Timestamp;
21-
import com.google.cloud.spanner.Options.TransactionOption;
22-
import com.google.spanner.v1.BatchWriteResponse;
2320

2421
/**
2522
* Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link
@@ -43,11 +40,4 @@ public String getDatabaseRole() {
4340
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
4441
return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
4542
}
46-
47-
@Override
48-
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
49-
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
50-
throws SpannerException {
51-
throw new UnsupportedOperationException();
52-
}
5343
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
189189
throws SpannerException {
190190
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
191191
try (IScope s = tracer.withSpan(span)) {
192+
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
193+
return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options);
194+
}
192195
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
193196
} catch (RuntimeException e) {
194197
span.setStatus(e);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020

2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.core.ApiFutures;
23+
import com.google.api.gax.rpc.ServerStream;
2324
import com.google.cloud.Timestamp;
2425
import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction;
2526
import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction;
2627
import com.google.cloud.spanner.Options.TransactionOption;
2728
import com.google.cloud.spanner.Options.UpdateOption;
2829
import com.google.common.util.concurrent.MoreExecutors;
30+
import com.google.spanner.v1.BatchWriteResponse;
2931
import java.util.concurrent.ExecutionException;
3032

3133
/**
@@ -164,6 +166,22 @@ public CommitResponse writeWithOptions(Iterable<Mutation> mutations, Transaction
164166
}
165167
}
166168

169+
/**
170+
* This is a blocking method, as the interface that it implements is also defined as a blocking
171+
* method.
172+
*/
173+
@Override
174+
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
175+
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
176+
throws SpannerException {
177+
SessionReference sessionReference = getSessionReference();
178+
try (MultiplexedSessionTransaction transaction =
179+
new MultiplexedSessionTransaction(
180+
client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse = */ true)) {
181+
return transaction.batchWriteAtLeastOnce(mutationGroups, options);
182+
}
183+
}
184+
167185
@Override
168186
public TransactionRunner readWriteTransaction(TransactionOption... options) {
169187
return new DelayedTransactionRunner(

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.api.core.ApiFuture;
2323
import com.google.api.core.ApiFutures;
2424
import com.google.api.core.SettableApiFuture;
25+
import com.google.api.gax.rpc.ServerStream;
2526
import com.google.cloud.Timestamp;
2627
import com.google.cloud.spanner.Options.TransactionOption;
2728
import com.google.cloud.spanner.Options.UpdateOption;
@@ -30,6 +31,7 @@
3031
import com.google.common.annotations.VisibleForTesting;
3132
import com.google.common.base.Preconditions;
3233
import com.google.common.util.concurrent.MoreExecutors;
34+
import com.google.spanner.v1.BatchWriteResponse;
3335
import com.google.spanner.v1.BeginTransactionRequest;
3436
import com.google.spanner.v1.RequestOptions;
3537
import com.google.spanner.v1.Transaction;
@@ -505,6 +507,14 @@ public CommitResponse writeAtLeastOnceWithOptions(
505507
.writeAtLeastOnceWithOptions(mutations, options);
506508
}
507509

510+
@Override
511+
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
512+
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
513+
throws SpannerException {
514+
return createMultiplexedSessionTransaction(/* singleUse = */ true)
515+
.batchWriteAtLeastOnce(mutationGroups, options);
516+
}
517+
508518
@Override
509519
public ReadContext singleUse() {
510520
return createMultiplexedSessionTransaction(/* singleUse = */ true).singleUse();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
321321
throw SpannerExceptionFactory.newSpannerException(e);
322322
} finally {
323323
span.end();
324+
onTransactionDone();
324325
}
325326
}
326327

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import com.google.api.core.ApiFuture;
3232
import com.google.api.core.ApiFutures;
33+
import com.google.api.gax.rpc.ServerStream;
3334
import com.google.cloud.NoCredentials;
3435
import com.google.cloud.Timestamp;
3536
import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionStep;
@@ -45,6 +46,8 @@
4546
import com.google.common.collect.Lists;
4647
import com.google.common.util.concurrent.MoreExecutors;
4748
import com.google.protobuf.ByteString;
49+
import com.google.spanner.v1.BatchWriteRequest;
50+
import com.google.spanner.v1.BatchWriteResponse;
4851
import com.google.spanner.v1.BeginTransactionRequest;
4952
import com.google.spanner.v1.CommitRequest;
5053
import com.google.spanner.v1.ExecuteSqlRequest;
@@ -1833,6 +1836,44 @@ public void testReadWriteTransactionWithCommitRetryProtocolExtensionSet() {
18331836
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
18341837
}
18351838

1839+
@Test
1840+
public void testBatchWriteAtLeastOnce() {
1841+
DatabaseClientImpl client =
1842+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1843+
1844+
Iterable<MutationGroup> MUTATION_GROUPS =
1845+
ImmutableList.of(
1846+
MutationGroup.of(
1847+
Mutation.newInsertBuilder("FOO1").set("ID").to(1L).set("NAME").to("Bar1").build(),
1848+
Mutation.newInsertBuilder("FOO2").set("ID").to(2L).set("NAME").to("Bar2").build()),
1849+
MutationGroup.of(
1850+
Mutation.newInsertBuilder("FOO3").set("ID").to(3L).set("NAME").to("Bar3").build(),
1851+
Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build()));
1852+
1853+
ServerStream<BatchWriteResponse> responseStream = client.batchWriteAtLeastOnce(MUTATION_GROUPS);
1854+
int idx = 0;
1855+
for (BatchWriteResponse response : responseStream) {
1856+
assertEquals(
1857+
response.getStatus(),
1858+
com.google.rpc.Status.newBuilder().setCode(com.google.rpc.Code.OK_VALUE).build());
1859+
assertEquals(response.getIndexesList(), ImmutableList.of(idx, idx + 1));
1860+
idx += 2;
1861+
}
1862+
1863+
assertNotNull(responseStream);
1864+
List<BatchWriteRequest> requests = mockSpanner.getRequestsOfType(BatchWriteRequest.class);
1865+
assertEquals(requests.size(), 1);
1866+
BatchWriteRequest request = requests.get(0);
1867+
assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
1868+
assertEquals(request.getMutationGroupsCount(), 2);
1869+
assertEquals(request.getRequestOptions().getPriority(), Priority.PRIORITY_UNSPECIFIED);
1870+
assertFalse(request.getExcludeTxnFromChangeStreams());
1871+
1872+
assertNotNull(client.multiplexedSessionDatabaseClient);
1873+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
1874+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
1875+
}
1876+
18361877
private void waitForSessionToBeReplaced(DatabaseClientImpl client) {
18371878
assertNotNull(client.multiplexedSessionDatabaseClient);
18381879
SessionReference sessionReference =

0 commit comments

Comments
 (0)