Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ListValue;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -402,20 +404,41 @@ private boolean isFloat32NaN(Value value) {
return value.getType().equals(Type.float32()) && Float.isNaN(value.getFloat32());
}

static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mutation> out) {
// Converts Mutation and returns a random Mutation from the available mutation list based on the
// following heuristics:
// 1. Prefer mutations other than INSERT, since INSERT mutations may contain autogenerated columns
// whose information is unavailable on the client.
// 2. If the list only contains INSERT mutations, select the one with the highest number of
// values.
static com.google.spanner.v1.Mutation toProtoGetRandomMutation(
Iterable<Mutation> mutations, List<com.google.spanner.v1.Mutation> out) {
Mutation last = null;
// The mutation currently being built.
com.google.spanner.v1.Mutation.Builder proto = null;
// The "write" (!= DELETE) or "keySet" (==DELETE) for the last mutation encoded, for coalescing.
com.google.spanner.v1.Mutation.Write.Builder write = null;
com.google.spanner.v1.KeySet.Builder keySet = null;

// Store all the mutations excluding INSERT.
List<com.google.spanner.v1.Mutation> allMutationsExcludingInsert = new ArrayList<>();
// Stores INSERT mutation with large number of values.
com.google.spanner.v1.Mutation largeInsertMutation =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
com.google.spanner.v1.Mutation largeInsertMutation =
com.google.spanner.v1.Mutation largestInsertMutation =

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One very peculiar, but not impossible, case is that the list of mutations only contains empty insert mutations. This could for example be possible for a table that has a default value for the primary key (or allows null for the primary key), and also either has a default value for all other columns, or allows them to be null. Such a case would lead to this method returning Mutation.getDefaultInstance() as the largest insert mutation.

The heuristics should therefore also be changed to prefer an insert mutation with zero columns and a table name, over an insert mutation with zero columns and no table name. (Any other non-empty property of a real mutation can also be used instead of the table name for this check).

Can we also add a test for this specific scenario? It is typically something that could also backslide in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Knut for pointing out to this edge case.

I have updated the code to handle this case by adding below lines, and added a test for empty insert mutation scenario.

    // If largestInsertMutation is a default instance of Mutation, replace it with the current
    // INSERT mutation, even if it contains zero values.
    if (!largestInsertMutation.hasInsert()) {
      return true;
    }

Talking out loud of all the edge cases,

  1. If mutations are empty, then this logic never gets called.
  2. Table name in a mutation can not be empty.
  3. If there are mutations other than INSERT operation, then once of them is chosen at random.
  4. If only INSERT mutations, then one with largest value is chosen
  5. If only INSERT mutation, with no columns/values set, then this mutation will be chosen as random mutation

Please let me know if there is anything missing.

com.google.spanner.v1.Mutation.getDefaultInstance();

for (Mutation mutation : mutations) {
if (mutation.operation == Op.DELETE) {
if (last != null && last.operation == Op.DELETE && mutation.table.equals(last.table)) {
mutation.keySet.appendToProto(keySet);
} else {
if (proto != null) {
out.add(proto.build());
com.google.spanner.v1.Mutation builtMutation = proto.build();
out.add(builtMutation);
if (checkIfInsertMutationWithLargeValue(builtMutation, largeInsertMutation)) {
largeInsertMutation = builtMutation;
}
if (!builtMutation.hasInsert()) {
allMutationsExcludingInsert.add(builtMutation);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can skip most of this, as the if statement on line 429 already guarantees that this is a DELETE mutation. So you can just directly add it to allMutationsExcludingInsert.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a coalescing mechanism to group similar consecutive mutations together.
As a result of this logic, the if statement ensures that the current mutation selected from the list is a DELETE mutation. However, the proto being built is based on prior iterations and will not necessarily be a DELETE mutation.

For example, consider the following mutation list:

Mutation.newInsertBuilder("T1").set("C").to("V1").build(),
Mutation.delete("T1", Key.of("k1")),
Mutation.newInsertBuilder("T2").set("C").to("V1").build()

In this case, the if condition will match the DELETE operation on line 2, but the proto will be an INSERT mutation from line 1.
We have a test case toProtoCoalescingDeleteChanges to verify this behavior.

}
proto = com.google.spanner.v1.Mutation.newBuilder();
com.google.spanner.v1.Mutation.Delete.Builder delete =
Expand All @@ -437,7 +460,14 @@ static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mut
write.addValues(values);
} else {
if (proto != null) {
out.add(proto.build());
com.google.spanner.v1.Mutation builtMutation = proto.build();
out.add(builtMutation);
if (checkIfInsertMutationWithLargeValue(builtMutation, largeInsertMutation)) {
largeInsertMutation = builtMutation;
}
if (!builtMutation.hasInsert()) {
allMutationsExcludingInsert.add(builtMutation);
}
}
proto = com.google.spanner.v1.Mutation.newBuilder();
switch (mutation.operation) {
Expand All @@ -464,7 +494,32 @@ static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mut
}
// Flush last item.
if (proto != null) {
com.google.spanner.v1.Mutation builtMutation = proto.build();
out.add(proto.build());
if (checkIfInsertMutationWithLargeValue(builtMutation, largeInsertMutation)) {
largeInsertMutation = builtMutation;
}
if (!builtMutation.hasInsert()) {
allMutationsExcludingInsert.add(builtMutation);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code block is repeated a couple of times. Could we extract that to a separate method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have extracted them to seperate methods. Please let me know if there are any other better ways.

}

// Select a random mutation based on the heuristic.
if (allMutationsExcludingInsert.size() > 0) {
int randomIndex = ThreadLocalRandom.current().nextInt(allMutationsExcludingInsert.size());
return allMutationsExcludingInsert.get(randomIndex);
} else {
return largeInsertMutation;
}
}

// Returns true if the input mutation is of type INSERT and has more values than the current
// largest insert mutation.
private static boolean checkIfInsertMutationWithLargeValue(
com.google.spanner.v1.Mutation mutation,
com.google.spanner.v1.Mutation largestInsertMutation) {
return mutation.hasInsert()
&& mutation.getInsert().getValuesCount()
> largestInsertMutation.getInsert().getValuesCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public ImmutableList<Mutation> getMutations() {

static BatchWriteRequest.MutationGroup toProto(final MutationGroup mutationGroup) {
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutationGroup.getMutations(), mutationsProto);
Mutation.toProtoGetRandomMutation(mutationGroup.getMutations(), mutationsProto);
return BatchWriteRequest.MutationGroup.newBuilder().addAllMutations(mutationsProto).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
throws SpannerException {
setActive(null);
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutations, mutationsProto);
Mutation.toProtoGetRandomMutation(mutations, mutationsProto);
Options options = Options.fromTransactionOptions(transactionOptions);
final CommitRequest.Builder requestBuilder =
CommitRequest.newBuilder()
Expand Down Expand Up @@ -431,19 +431,23 @@ public void close() {
}
}

ApiFuture<ByteString> beginTransactionAsync(
ApiFuture<Transaction> beginTransactionAsync(
Options transactionOptions,
boolean routeToLeader,
Map<SpannerRpc.Option, ?> channelHint,
ByteString previousTransactionId) {
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
ByteString previousTransactionId,
com.google.spanner.v1.Mutation mutation) {
final SettableApiFuture<Transaction> res = SettableApiFuture.create();
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
final BeginTransactionRequest request =
BeginTransactionRequest.Builder requestBuilder =
BeginTransactionRequest.newBuilder()
.setSession(getName())
.setOptions(
createReadWriteTransactionOptions(transactionOptions, previousTransactionId))
.build();
createReadWriteTransactionOptions(transactionOptions, previousTransactionId));
if (sessionReference.getIsMultiplexed() && mutation != null) {
requestBuilder.setMutationKey(mutation);
}
final BeginTransactionRequest request = requestBuilder.build();
final ApiFuture<Transaction> requestFuture;
try (IScope ignore = tracer.withSpan(span)) {
requestFuture = spanner.getRpc().beginTransactionAsync(request, channelHint, routeToLeader);
Expand All @@ -457,7 +461,7 @@ ApiFuture<ByteString> beginTransactionAsync(
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
}
span.end();
res.set(txn.getId());
res.set(txn);
} catch (ExecutionException e) {
span.setStatus(e);
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ void ensureTxn() {
ApiFuture<Void> ensureTxnAsync() {
final SettableApiFuture<Void> res = SettableApiFuture.create();
if (transactionId == null || isAborted()) {
createTxnAsync(res);
createTxnAsync(res, null);
} else {
span.addAnnotation("Transaction Initialized", "Id", transactionId.toStringUtf8());
txnLogger.log(
Expand All @@ -301,20 +301,29 @@ ApiFuture<Void> ensureTxnAsync() {
return res;
}

private void createTxnAsync(final SettableApiFuture<Void> res) {
private void createTxnAsync(
final SettableApiFuture<Void> res, com.google.spanner.v1.Mutation mutation) {
span.addAnnotation("Creating Transaction");
final ApiFuture<ByteString> fut =
final ApiFuture<Transaction> fut =
session.beginTransactionAsync(
options, isRouteToLeader(), getTransactionChannelHint(), getPreviousTransactionId());
options,
isRouteToLeader(),
getTransactionChannelHint(),
getPreviousTransactionId(),
mutation);
fut.addListener(
() -> {
try {
transactionId = fut.get();
Transaction txn = fut.get();
transactionId = txn.getId();
span.addAnnotation("Transaction Creation Done", "Id", transactionId.toStringUtf8());
txnLogger.log(
Level.FINER,
"Started transaction {0}",
txnLogger.isLoggable(Level.FINER) ? transactionId.asReadOnlyByteBuffer() : null);
if (txn.hasPrecommitToken()) {
onPrecommitToken(txn.getPrecommitToken());
}
res.set(null);
} catch (ExecutionException e) {
span.addAnnotation(
Expand Down Expand Up @@ -357,13 +366,14 @@ ApiFuture<CommitResponse> commitAsync() {
close();

List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
com.google.spanner.v1.Mutation randomMutation = null;
synchronized (committingLock) {
if (committing) {
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
}
committing = true;
if (!mutations.isEmpty()) {
Mutation.toProto(mutations, mutationsProto);
randomMutation = Mutation.toProtoGetRandomMutation(mutations, mutationsProto);
}
}
final SettableApiFuture<CommitResponse> res = SettableApiFuture.create();
Expand Down Expand Up @@ -392,7 +402,7 @@ ApiFuture<CommitResponse> commitAsync() {
synchronized (lock) {
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
finishOps = SettableApiFuture.create();
createTxnAsync(finishOps);
createTxnAsync(finishOps, randomMutation);
} else {
finishOps = finishedAsyncOperations;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1828,7 +1828,7 @@ private ByteString getTransactionId(Session session, TransactionSelector tx) {
transactionId = null;
break;
case BEGIN:
transactionId = beginTransaction(session, tx.getBegin()).getId();
transactionId = beginTransaction(session, tx.getBegin(), null).getId();
break;
case ID:
Transaction transaction = transactions.get(tx.getId());
Expand Down Expand Up @@ -1883,7 +1883,8 @@ public void beginTransaction(
try {
beginTransactionExecutionTime.simulateExecutionTime(
exceptions, stickyGlobalExceptions, freezeLock);
Transaction transaction = beginTransaction(session, request.getOptions());
Transaction transaction =
beginTransaction(session, request.getOptions(), request.getMutationKey());
responseObserver.onNext(transaction);
responseObserver.onCompleted();
} catch (StatusRuntimeException t) {
Expand All @@ -1893,12 +1894,19 @@ public void beginTransaction(
}
}

private Transaction beginTransaction(Session session, TransactionOptions options) {
Transaction.Builder builder =
Transaction.newBuilder().setId(generateTransactionName(session.getName()));
private Transaction beginTransaction(
Session session, TransactionOptions options, com.google.spanner.v1.Mutation mutationKey) {
ByteString transactionId = generateTransactionName(session.getName());
Transaction.Builder builder = Transaction.newBuilder().setId(transactionId);
if (options != null && options.getModeCase() == ModeCase.READ_ONLY) {
setReadTimestamp(options, builder);
}
if (session.getMultiplexed()
&& options.getModeCase() == ModeCase.READ_WRITE
&& mutationKey != null) {
// Mutation only case in a read-write transaction.
builder.setPrecommitToken(getTransactionPrecommitToken(transactionId));
}
Transaction transaction = builder.build();
transactions.put(transaction.getId(), transaction);
transactionsStarted.add(transaction.getId());
Expand Down Expand Up @@ -2005,7 +2013,8 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
session,
TransactionOptions.newBuilder()
.setReadWrite(ReadWrite.getDefaultInstance())
.build());
.build(),
null);
} else if (request.getTransactionId() != null) {
transaction = transactions.get(request.getTransactionId());
Optional<Boolean> aborted =
Expand Down Expand Up @@ -2490,6 +2499,10 @@ Session getSession(String name) {
return null;
}

static MultiplexedSessionPrecommitToken getTransactionPrecommitToken(ByteString transactionId) {
return getPrecommitToken("TransactionPrecommitToken", transactionId);
}

static MultiplexedSessionPrecommitToken getResultSetPrecommitToken(ByteString transactionId) {
return getPrecommitToken("ResultSetPrecommitToken", transactionId);
}
Expand Down
Loading
Loading