Skip to content

Commit 3f87f79

Browse files
committed
chore(spanner): support Mutation only case FOR R/W mux
1 parent aeeea3c commit 3f87f79

File tree

8 files changed

+90
-30
lines changed

8 files changed

+90
-30
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Mutation.java

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import com.google.common.collect.ImmutableList;
2323
import com.google.protobuf.ListValue;
2424
import java.io.Serializable;
25+
import java.util.ArrayList;
2526
import java.util.Collections;
2627
import java.util.HashSet;
2728
import java.util.LinkedHashMap;
2829
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Objects;
3132
import java.util.Set;
33+
import java.util.concurrent.ThreadLocalRandom;
3234
import javax.annotation.Nullable;
3335

3436
/**
@@ -402,20 +404,36 @@ private boolean isFloat32NaN(Value value) {
402404
return value.getType().equals(Type.float32()) && Float.isNaN(value.getFloat32());
403405
}
404406

405-
static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mutation> out) {
407+
// Converts Mutation and returns a random Mutation from the available mutation list based on the
408+
// following heuristics:
409+
// 1. Prefer mutations other than INSERT, since INSERT mutations may contain autogenerated columns
410+
// whose information is unavailable on the client.
411+
// 2. If the list only contains INSERT mutations, select the one with the highest number of
412+
// values.
413+
static com.google.spanner.v1.Mutation toProtoGetRandomMutation(
414+
Iterable<Mutation> mutations, List<com.google.spanner.v1.Mutation> out) {
406415
Mutation last = null;
407416
// The mutation currently being built.
408417
com.google.spanner.v1.Mutation.Builder proto = null;
409418
// The "write" (!= DELETE) or "keySet" (==DELETE) for the last mutation encoded, for coalescing.
410419
com.google.spanner.v1.Mutation.Write.Builder write = null;
411420
com.google.spanner.v1.KeySet.Builder keySet = null;
421+
422+
// Store all the mutations exclusing INSERT.
423+
List<com.google.spanner.v1.Mutation> allMutationsExcludingInsert = new ArrayList<>();
424+
// Stores INSERT mutation with large number of values.
425+
com.google.spanner.v1.Mutation largeInsertMutation =
426+
com.google.spanner.v1.Mutation.getDefaultInstance();
427+
412428
for (Mutation mutation : mutations) {
413429
if (mutation.operation == Op.DELETE) {
414430
if (last != null && last.operation == Op.DELETE && mutation.table.equals(last.table)) {
415431
mutation.keySet.appendToProto(keySet);
416432
} else {
417433
if (proto != null) {
418-
out.add(proto.build());
434+
com.google.spanner.v1.Mutation builtMutation = proto.build();
435+
out.add(builtMutation);
436+
allMutationsExcludingInsert.add(builtMutation);
419437
}
420438
proto = com.google.spanner.v1.Mutation.newBuilder();
421439
com.google.spanner.v1.Mutation.Delete.Builder delete =
@@ -437,7 +455,16 @@ static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mut
437455
write.addValues(values);
438456
} else {
439457
if (proto != null) {
440-
out.add(proto.build());
458+
com.google.spanner.v1.Mutation builtMutation = proto.build();
459+
out.add(builtMutation);
460+
if (builtMutation.hasInsert()) {
461+
if (builtMutation.getInsert().getValuesCount()
462+
> largeInsertMutation.getInsert().getValuesCount()) {
463+
largeInsertMutation = builtMutation;
464+
}
465+
} else {
466+
allMutationsExcludingInsert.add(builtMutation);
467+
}
441468
}
442469
proto = com.google.spanner.v1.Mutation.newBuilder();
443470
switch (mutation.operation) {
@@ -464,7 +491,24 @@ static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mut
464491
}
465492
// Flush last item.
466493
if (proto != null) {
494+
com.google.spanner.v1.Mutation builtMutation = proto.build();
467495
out.add(proto.build());
496+
if (builtMutation.hasInsert()) {
497+
if (builtMutation.getInsert().getValuesCount()
498+
> largeInsertMutation.getInsert().getValuesCount()) {
499+
largeInsertMutation = builtMutation;
500+
}
501+
} else {
502+
allMutationsExcludingInsert.add(builtMutation);
503+
}
504+
}
505+
506+
// Select a random mutation based on the heuristic.
507+
if (allMutationsExcludingInsert.size() > 0) {
508+
int randomIndex = ThreadLocalRandom.current().nextInt(allMutationsExcludingInsert.size());
509+
return allMutationsExcludingInsert.get(randomIndex);
510+
} else {
511+
return largeInsertMutation;
468512
}
469513
}
470514
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MutationGroup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public ImmutableList<Mutation> getMutations() {
4848

4949
static BatchWriteRequest.MutationGroup toProto(final MutationGroup mutationGroup) {
5050
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
51-
Mutation.toProto(mutationGroup.getMutations(), mutationsProto);
51+
Mutation.toProtoGetRandomMutation(mutationGroup.getMutations(), mutationsProto);
5252
return BatchWriteRequest.MutationGroup.newBuilder().addAllMutations(mutationsProto).build();
5353
}
5454

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
238238
throws SpannerException {
239239
setActive(null);
240240
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
241-
Mutation.toProto(mutations, mutationsProto);
241+
Mutation.toProtoGetRandomMutation(mutations, mutationsProto);
242242
Options options = Options.fromTransactionOptions(transactionOptions);
243243
final CommitRequest.Builder requestBuilder =
244244
CommitRequest.newBuilder()
@@ -431,19 +431,23 @@ public void close() {
431431
}
432432
}
433433

434-
ApiFuture<ByteString> beginTransactionAsync(
434+
ApiFuture<Transaction> beginTransactionAsync(
435435
Options transactionOptions,
436436
boolean routeToLeader,
437437
Map<SpannerRpc.Option, ?> channelHint,
438-
ByteString previousTransactionId) {
439-
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
438+
ByteString previousTransactionId,
439+
com.google.spanner.v1.Mutation mutation) {
440+
final SettableApiFuture<Transaction> res = SettableApiFuture.create();
440441
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
441-
final BeginTransactionRequest request =
442+
BeginTransactionRequest.Builder requestBuilder =
442443
BeginTransactionRequest.newBuilder()
443444
.setSession(getName())
444445
.setOptions(
445-
createReadWriteTransactionOptions(transactionOptions, previousTransactionId))
446-
.build();
446+
createReadWriteTransactionOptions(transactionOptions, previousTransactionId));
447+
if (sessionReference.getIsMultiplexed() && mutation != null) {
448+
requestBuilder.setMutationKey(mutation);
449+
}
450+
final BeginTransactionRequest request = requestBuilder.build();
447451
final ApiFuture<Transaction> requestFuture;
448452
try (IScope ignore = tracer.withSpan(span)) {
449453
requestFuture = spanner.getRpc().beginTransactionAsync(request, channelHint, routeToLeader);
@@ -457,7 +461,7 @@ ApiFuture<ByteString> beginTransactionAsync(
457461
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
458462
}
459463
span.end();
460-
res.set(txn.getId());
464+
res.set(txn);
461465
} catch (ExecutionException e) {
462466
span.setStatus(e);
463467
span.end();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ void ensureTxn() {
289289
ApiFuture<Void> ensureTxnAsync() {
290290
final SettableApiFuture<Void> res = SettableApiFuture.create();
291291
if (transactionId == null || isAborted()) {
292-
createTxnAsync(res);
292+
createTxnAsync(res, null);
293293
} else {
294294
span.addAnnotation("Transaction Initialized", "Id", transactionId.toStringUtf8());
295295
txnLogger.log(
@@ -301,20 +301,29 @@ ApiFuture<Void> ensureTxnAsync() {
301301
return res;
302302
}
303303

304-
private void createTxnAsync(final SettableApiFuture<Void> res) {
304+
private void createTxnAsync(
305+
final SettableApiFuture<Void> res, com.google.spanner.v1.Mutation mutation) {
305306
span.addAnnotation("Creating Transaction");
306-
final ApiFuture<ByteString> fut =
307+
final ApiFuture<Transaction> fut =
307308
session.beginTransactionAsync(
308-
options, isRouteToLeader(), getTransactionChannelHint(), getPreviousTransactionId());
309+
options,
310+
isRouteToLeader(),
311+
getTransactionChannelHint(),
312+
getPreviousTransactionId(),
313+
mutation);
309314
fut.addListener(
310315
() -> {
311316
try {
312-
transactionId = fut.get();
317+
Transaction txn = fut.get();
318+
transactionId = txn.getId();
313319
span.addAnnotation("Transaction Creation Done", "Id", transactionId.toStringUtf8());
314320
txnLogger.log(
315321
Level.FINER,
316322
"Started transaction {0}",
317323
txnLogger.isLoggable(Level.FINER) ? transactionId.asReadOnlyByteBuffer() : null);
324+
if (txn.hasPrecommitToken()) {
325+
onPrecommitToken(txn.getPrecommitToken());
326+
}
318327
res.set(null);
319328
} catch (ExecutionException e) {
320329
span.addAnnotation(
@@ -357,13 +366,14 @@ ApiFuture<CommitResponse> commitAsync() {
357366
close();
358367

359368
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
369+
com.google.spanner.v1.Mutation randomMutation = null;
360370
synchronized (committingLock) {
361371
if (committing) {
362372
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
363373
}
364374
committing = true;
365375
if (!mutations.isEmpty()) {
366-
Mutation.toProto(mutations, mutationsProto);
376+
randomMutation = Mutation.toProtoGetRandomMutation(mutations, mutationsProto);
367377
}
368378
}
369379
final SettableApiFuture<CommitResponse> res = SettableApiFuture.create();
@@ -392,7 +402,7 @@ ApiFuture<CommitResponse> commitAsync() {
392402
synchronized (lock) {
393403
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
394404
finishOps = SettableApiFuture.create();
395-
createTxnAsync(finishOps);
405+
createTxnAsync(finishOps, randomMutation);
396406
} else {
397407
finishOps = finishedAsyncOperations;
398408
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MutationGroupTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ private Mutation getRandomMutation() {
4444

4545
private BatchWriteRequest.MutationGroup getMutationGroupProto(ImmutableList<Mutation> mutations) {
4646
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
47-
Mutation.toProto(mutations, mutationsProto);
47+
Mutation.toProtoGetRandomMutation(mutations, mutationsProto);
4848
return BatchWriteRequest.MutationGroup.newBuilder().addAllMutations(mutationsProto).build();
4949
}
5050

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MutationTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ public void serializationBasic() {
321321
com.google.spanner.v1.Mutation.getDefaultInstance();
322322
proto.add(existingProto);
323323

324-
Mutation.toProto(mutations, proto);
324+
Mutation.toProtoGetRandomMutation(mutations, proto);
325325

326326
assertThat(proto.size()).isAtLeast(1);
327327
assertThat(proto.get(0)).isSameInstanceAs(existingProto);
@@ -359,7 +359,7 @@ public void toProtoCoalescingChangeOfTable() {
359359
Mutation.newInsertBuilder("T2").set("C").to("V5").build());
360360

361361
List<com.google.spanner.v1.Mutation> proto = new ArrayList<>();
362-
Mutation.toProto(mutations, proto);
362+
Mutation.toProtoGetRandomMutation(mutations, proto);
363363

364364
assertThat(proto.size()).isEqualTo(2);
365365
MatcherAssert.assertThat(
@@ -386,7 +386,7 @@ public void toProtoCoalescingChangeOfOperation() {
386386
Mutation.newUpdateBuilder("T").set("C").to("V5").build());
387387

388388
List<com.google.spanner.v1.Mutation> proto = new ArrayList<>();
389-
Mutation.toProto(mutations, proto);
389+
Mutation.toProtoGetRandomMutation(mutations, proto);
390390

391391
assertThat(proto.size()).isEqualTo(2);
392392
MatcherAssert.assertThat(
@@ -413,7 +413,7 @@ public void toProtoCoalescingChangeOfColumn() {
413413
Mutation.newInsertBuilder("T").set("C2").to("V5").build());
414414

415415
List<com.google.spanner.v1.Mutation> proto = new ArrayList<>();
416-
Mutation.toProto(mutations, proto);
416+
Mutation.toProtoGetRandomMutation(mutations, proto);
417417

418418
assertThat(proto.size()).isEqualTo(2);
419419
MatcherAssert.assertThat(
@@ -439,7 +439,7 @@ public void toProtoCoalescingDelete() {
439439
Mutation.delete("T", KeySet.range(KeyRange.closedClosed(Key.of("kc"), Key.of("kd")))));
440440

441441
List<com.google.spanner.v1.Mutation> proto = new ArrayList<>();
442-
Mutation.toProto(mutations, proto);
442+
Mutation.toProtoGetRandomMutation(mutations, proto);
443443

444444
assertThat(proto.size()).isEqualTo(1);
445445
MatcherAssert.assertThat(
@@ -470,7 +470,7 @@ public void toProtoCoalescingDeleteChanges() {
470470
Mutation.newInsertBuilder("T2").set("C").to("V1").build());
471471

472472
List<com.google.spanner.v1.Mutation> proto = new ArrayList<>();
473-
Mutation.toProto(mutations, proto);
473+
Mutation.toProtoGetRandomMutation(mutations, proto);
474474

475475
assertThat(proto.size()).isEqualTo(4);
476476
MatcherAssert.assertThat(

google-cloud-spanner/src/test/java/com/google/cloud/spanner/PgNumericTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ public void testMutation() {
319319
.toStringArray(null)
320320
.build());
321321
final List<com.google.spanner.v1.Mutation> expectedMutations = new ArrayList<>();
322-
Mutation.toProto(mutations, expectedMutations);
322+
Mutation.toProtoGetRandomMutation(mutations, expectedMutations);
323323

324324
databaseClient
325325
.readWriteTransaction()

google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import com.google.spanner.v1.ExecuteSqlRequest;
8888
import com.google.spanner.v1.ResultSetStats;
8989
import com.google.spanner.v1.RollbackRequest;
90+
import com.google.spanner.v1.Transaction;
9091
import io.opencensus.metrics.LabelValue;
9192
import io.opencensus.metrics.MetricRegistry;
9293
import io.opencensus.metrics.Metrics;
@@ -1497,7 +1498,7 @@ public void testSessionNotFoundReadWriteTransaction() {
14971498
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
14981499
when(closedSession.newTransaction(eq(Options.fromTransactionOptions()), any()))
14991500
.thenReturn(closedTransactionContext);
1500-
when(closedSession.beginTransactionAsync(any(), eq(true), any(), any()))
1501+
when(closedSession.beginTransactionAsync(any(), eq(true), any(), any(), any()))
15011502
.thenThrow(sessionNotFound);
15021503
when(closedSession.getTracer()).thenReturn(tracer);
15031504
TransactionRunnerImpl closedTransactionRunner = new TransactionRunnerImpl(closedSession);
@@ -1513,8 +1514,9 @@ public void testSessionNotFoundReadWriteTransaction() {
15131514
final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class);
15141515
when(openSession.newTransaction(eq(Options.fromTransactionOptions()), any()))
15151516
.thenReturn(openTransactionContext);
1516-
when(openSession.beginTransactionAsync(any(), eq(true), any(), any()))
1517-
.thenReturn(ApiFutures.immediateFuture(ByteString.copyFromUtf8("open-txn")));
1517+
Transaction txn = Transaction.newBuilder().setId(ByteString.copyFromUtf8("open-txn")).build();
1518+
when(openSession.beginTransactionAsync(any(), eq(true), any(), any(), any()))
1519+
.thenReturn(ApiFutures.immediateFuture(txn));
15181520
when(openSession.getTracer()).thenReturn(tracer);
15191521
TransactionRunnerImpl openTransactionRunner = new TransactionRunnerImpl(openSession);
15201522
openTransactionRunner.setSpan(span);

0 commit comments

Comments
 (0)