Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ListValue;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -402,20 +404,43 @@ private boolean isFloat32NaN(Value value) {
return value.getType().equals(Type.float32()) && Float.isNaN(value.getFloat32());
}

static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mutation> out) {
/**
* Converts the list of mutations to the corresponding protobuf mutations and returns a random
* mutation from the available list based on the following heuristics:
*
* <p>1. Prefer mutations other than INSERT, as INSERT mutations may contain autogenerated columns
* whose information is unavailable on the client.
*
* <p>2. If the list only contains INSERT mutations, select the one with the highest number of
* values.
*/
static com.google.spanner.v1.Mutation toProtoAndReturnRandomMutation(
Iterable<Mutation> mutations, List<com.google.spanner.v1.Mutation> out) {
Mutation last = null;
// The mutation currently being built.
com.google.spanner.v1.Mutation.Builder proto = null;
// The "write" (!= DELETE) or "keySet" (==DELETE) for the last mutation encoded, for coalescing.
com.google.spanner.v1.Mutation.Write.Builder write = null;
com.google.spanner.v1.KeySet.Builder keySet = null;

// Stores all the mutations excluding INSERT mutations.
List<com.google.spanner.v1.Mutation> allMutationsExcludingInsert = new ArrayList<>();
// Stores the INSERT mutation with largest number of values.
com.google.spanner.v1.Mutation largestInsertMutation =
com.google.spanner.v1.Mutation.getDefaultInstance();

for (Mutation mutation : mutations) {
if (mutation.operation == Op.DELETE) {
if (last != null && last.operation == Op.DELETE && mutation.table.equals(last.table)) {
mutation.keySet.appendToProto(keySet);
} else {
if (proto != null) {
out.add(proto.build());
com.google.spanner.v1.Mutation builtMutation = proto.build();
out.add(builtMutation);
if (checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
largestInsertMutation = builtMutation;
}
addMutationsExcludingInsert(builtMutation, allMutationsExcludingInsert);
}
proto = com.google.spanner.v1.Mutation.newBuilder();
com.google.spanner.v1.Mutation.Delete.Builder delete =
Expand All @@ -437,7 +462,12 @@ static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mut
write.addValues(values);
} else {
if (proto != null) {
out.add(proto.build());
com.google.spanner.v1.Mutation builtMutation = proto.build();
out.add(builtMutation);
if (checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
largestInsertMutation = builtMutation;
}
addMutationsExcludingInsert(builtMutation, allMutationsExcludingInsert);
}
proto = com.google.spanner.v1.Mutation.newBuilder();
switch (mutation.operation) {
Expand All @@ -464,7 +494,44 @@ static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mut
}
// Flush last item.
if (proto != null) {
com.google.spanner.v1.Mutation builtMutation = proto.build();
out.add(proto.build());
if (checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
largestInsertMutation = builtMutation;
}
addMutationsExcludingInsert(builtMutation, allMutationsExcludingInsert);
}

// Select a random mutation based on the heuristic.
if (!allMutationsExcludingInsert.isEmpty()) {
return allMutationsExcludingInsert.get(
ThreadLocalRandom.current().nextInt(allMutationsExcludingInsert.size()));
} else {
return largestInsertMutation;
}
}

// Returns true if the input mutation is of type INSERT and has more values than the current
// largest insert mutation.
private static boolean checkIfInsertMutationWithLargeValue(
com.google.spanner.v1.Mutation mutation,
com.google.spanner.v1.Mutation largestInsertMutation) {
// If largestInsertMutation is a default instance of Mutation, replace it with the current
// INSERT mutation, even if it contains zero values.
if (!largestInsertMutation.hasInsert()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this should be:

Suggested change
if (!largestInsertMutation.hasInsert()) {
if (mutation.hasInsert() && !largestInsertMutation.hasInsert()) {

Please add a test that fails with the current implementation and succeeds with the above change (unless I missed something here, and the current implementation is correct).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that this won't fail, as there are only two possibilities:

  1. There are only insert mutations. Then the largest insert mutation will be used.
  2. There is at least one non-insert mutation. Although it is possible that that mutation is set as 'the largest insert mutation' with the current if statement, it would not really make any difference, as the largest insert mutation won't be used anyways.

We should still fix the above if condition, though, as the current implementation is a bit confusing.

There is however another (small) optimization possible here; Once allMutationsExcludingInserts.isEmpty() returns false, you can skip the tracking of the largest insert mutation all-together, as it won't be used anyways.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Thanks for pointing this out. It does not affect the result, but it is indeed a flaw. Updated the code.
  2. Added the optimization.

return true;
}
return mutation.hasInsert()
&& mutation.getInsert().getValuesCount()
> largestInsertMutation.getInsert().getValuesCount();
}

// Stores all mutations that are not of type INSERT.
private static void addMutationsExcludingInsert(
com.google.spanner.v1.Mutation mutation,
List<com.google.spanner.v1.Mutation> allMutationsExcludingInsert) {
if (!mutation.hasInsert()) {
allMutationsExcludingInsert.add(mutation);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public ImmutableList<Mutation> getMutations() {

static BatchWriteRequest.MutationGroup toProto(final MutationGroup mutationGroup) {
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutationGroup.getMutations(), mutationsProto);
Mutation.toProtoAndReturnRandomMutation(mutationGroup.getMutations(), mutationsProto);
return BatchWriteRequest.MutationGroup.newBuilder().addAllMutations(mutationsProto).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
throws SpannerException {
setActive(null);
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutations, mutationsProto);
Mutation.toProtoAndReturnRandomMutation(mutations, mutationsProto);
Options options = Options.fromTransactionOptions(transactionOptions);
final CommitRequest.Builder requestBuilder =
CommitRequest.newBuilder()
Expand Down Expand Up @@ -431,19 +431,23 @@ public void close() {
}
}

ApiFuture<ByteString> beginTransactionAsync(
ApiFuture<Transaction> beginTransactionAsync(
Options transactionOptions,
boolean routeToLeader,
Map<SpannerRpc.Option, ?> channelHint,
ByteString previousTransactionId) {
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
ByteString previousTransactionId,
com.google.spanner.v1.Mutation mutation) {
final SettableApiFuture<Transaction> res = SettableApiFuture.create();
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
final BeginTransactionRequest request =
BeginTransactionRequest.Builder requestBuilder =
BeginTransactionRequest.newBuilder()
.setSession(getName())
.setOptions(
createReadWriteTransactionOptions(transactionOptions, previousTransactionId))
.build();
createReadWriteTransactionOptions(transactionOptions, previousTransactionId));
if (sessionReference.getIsMultiplexed() && mutation != null) {
requestBuilder.setMutationKey(mutation);
}
final BeginTransactionRequest request = requestBuilder.build();
final ApiFuture<Transaction> requestFuture;
try (IScope ignore = tracer.withSpan(span)) {
requestFuture = spanner.getRpc().beginTransactionAsync(request, channelHint, routeToLeader);
Expand All @@ -457,7 +461,7 @@ ApiFuture<ByteString> beginTransactionAsync(
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
}
span.end();
res.set(txn.getId());
res.set(txn);
} catch (ExecutionException e) {
span.setStatus(e);
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ void ensureTxn() {
ApiFuture<Void> ensureTxnAsync() {
final SettableApiFuture<Void> res = SettableApiFuture.create();
if (transactionId == null || isAborted()) {
createTxnAsync(res);
createTxnAsync(res, null);
} else {
span.addAnnotation("Transaction Initialized", "Id", transactionId.toStringUtf8());
txnLogger.log(
Expand All @@ -301,20 +301,29 @@ ApiFuture<Void> ensureTxnAsync() {
return res;
}

private void createTxnAsync(final SettableApiFuture<Void> res) {
private void createTxnAsync(
final SettableApiFuture<Void> res, com.google.spanner.v1.Mutation mutation) {
span.addAnnotation("Creating Transaction");
final ApiFuture<ByteString> fut =
final ApiFuture<Transaction> fut =
session.beginTransactionAsync(
options, isRouteToLeader(), getTransactionChannelHint(), getPreviousTransactionId());
options,
isRouteToLeader(),
getTransactionChannelHint(),
getPreviousTransactionId(),
mutation);
fut.addListener(
() -> {
try {
transactionId = fut.get();
Transaction txn = fut.get();
transactionId = txn.getId();
span.addAnnotation("Transaction Creation Done", "Id", transactionId.toStringUtf8());
txnLogger.log(
Level.FINER,
"Started transaction {0}",
txnLogger.isLoggable(Level.FINER) ? transactionId.asReadOnlyByteBuffer() : null);
if (txn.hasPrecommitToken()) {
onPrecommitToken(txn.getPrecommitToken());
}
res.set(null);
} catch (ExecutionException e) {
span.addAnnotation(
Expand Down Expand Up @@ -357,13 +366,14 @@ ApiFuture<CommitResponse> commitAsync() {
close();

List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
com.google.spanner.v1.Mutation randomMutation = null;
synchronized (committingLock) {
if (committing) {
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
}
committing = true;
if (!mutations.isEmpty()) {
Mutation.toProto(mutations, mutationsProto);
randomMutation = Mutation.toProtoAndReturnRandomMutation(mutations, mutationsProto);
}
}
final SettableApiFuture<CommitResponse> res = SettableApiFuture.create();
Expand Down Expand Up @@ -392,7 +402,7 @@ ApiFuture<CommitResponse> commitAsync() {
synchronized (lock) {
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
finishOps = SettableApiFuture.create();
createTxnAsync(finishOps);
createTxnAsync(finishOps, randomMutation);
} else {
finishOps = finishedAsyncOperations;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1828,7 +1828,7 @@ private ByteString getTransactionId(Session session, TransactionSelector tx) {
transactionId = null;
break;
case BEGIN:
transactionId = beginTransaction(session, tx.getBegin()).getId();
transactionId = beginTransaction(session, tx.getBegin(), null).getId();
break;
case ID:
Transaction transaction = transactions.get(tx.getId());
Expand Down Expand Up @@ -1883,7 +1883,8 @@ public void beginTransaction(
try {
beginTransactionExecutionTime.simulateExecutionTime(
exceptions, stickyGlobalExceptions, freezeLock);
Transaction transaction = beginTransaction(session, request.getOptions());
Transaction transaction =
beginTransaction(session, request.getOptions(), request.getMutationKey());
responseObserver.onNext(transaction);
responseObserver.onCompleted();
} catch (StatusRuntimeException t) {
Expand All @@ -1893,12 +1894,19 @@ public void beginTransaction(
}
}

private Transaction beginTransaction(Session session, TransactionOptions options) {
Transaction.Builder builder =
Transaction.newBuilder().setId(generateTransactionName(session.getName()));
private Transaction beginTransaction(
Session session, TransactionOptions options, com.google.spanner.v1.Mutation mutationKey) {
ByteString transactionId = generateTransactionName(session.getName());
Transaction.Builder builder = Transaction.newBuilder().setId(transactionId);
if (options != null && options.getModeCase() == ModeCase.READ_ONLY) {
setReadTimestamp(options, builder);
}
if (session.getMultiplexed()
&& options.getModeCase() == ModeCase.READ_WRITE
&& mutationKey != null) {
// Mutation only case in a read-write transaction.
builder.setPrecommitToken(getTransactionPrecommitToken(transactionId));
}
Transaction transaction = builder.build();
transactions.put(transaction.getId(), transaction);
transactionsStarted.add(transaction.getId());
Expand Down Expand Up @@ -2005,7 +2013,8 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
session,
TransactionOptions.newBuilder()
.setReadWrite(ReadWrite.getDefaultInstance())
.build());
.build(),
null);
} else if (request.getTransactionId() != null) {
transaction = transactions.get(request.getTransactionId());
Optional<Boolean> aborted =
Expand Down Expand Up @@ -2490,6 +2499,10 @@ Session getSession(String name) {
return null;
}

static MultiplexedSessionPrecommitToken getTransactionPrecommitToken(ByteString transactionId) {
return getPrecommitToken("TransactionPrecommitToken", transactionId);
}

static MultiplexedSessionPrecommitToken getResultSetPrecommitToken(ByteString transactionId) {
return getPrecommitToken("ResultSetPrecommitToken", transactionId);
}
Expand Down
Loading
Loading