CommitHandler.java
@@ -2,9 +2,9 @@

 import static com.google.common.base.Preconditions.checkNotNull;

-import com.google.common.collect.ImmutableList;
 import com.google.errorprone.annotations.concurrent.LazyInit;
 import com.scalar.db.api.DistributedStorage;
+import com.scalar.db.api.Mutation;
 import com.scalar.db.api.TransactionState;
 import com.scalar.db.common.error.CoreError;
 import com.scalar.db.exception.storage.ExecutionException;
@@ -36,6 +36,7 @@ public class CommitHandler {
   protected final Coordinator coordinator;
   private final TransactionTableMetadataManager tableMetadataManager;
   private final ParallelExecutor parallelExecutor;
+  private final MutationsGrouper mutationsGrouper;
   protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled;

   @LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook;
@@ -46,11 +47,13 @@ public CommitHandler(
       Coordinator coordinator,
       TransactionTableMetadataManager tableMetadataManager,
       ParallelExecutor parallelExecutor,
+      MutationsGrouper mutationsGrouper,
       boolean coordinatorWriteOmissionOnReadOnlyEnabled) {
     this.storage = checkNotNull(storage);
     this.coordinator = checkNotNull(coordinator);
     this.tableMetadataManager = checkNotNull(tableMetadataManager);
     this.parallelExecutor = checkNotNull(parallelExecutor);
+    this.mutationsGrouper = checkNotNull(mutationsGrouper);
     this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled;
   }

@@ -199,12 +202,11 @@ public void prepareRecords(Snapshot snapshot) throws PreparationException {
       PrepareMutationComposer composer =
           new PrepareMutationComposer(snapshot.getId(), tableMetadataManager);
       snapshot.to(composer);
-      PartitionedMutations mutations = new PartitionedMutations(composer.get());
+      List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get());

Collaborator (author): Split the mutations for prepare-records.

-      ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys();
-      List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size());
-      for (PartitionedMutations.Key key : orderedKeys) {
-        tasks.add(() -> storage.mutate(mutations.get(key)));
+      List<ParallelExecutorTask> tasks = new ArrayList<>(groupedMutations.size());
+      for (List<Mutation> mutations : groupedMutations) {
+        tasks.add(() -> storage.mutate(mutations));
       }
       parallelExecutor.prepareRecords(tasks, snapshot.getId());
     } catch (NoMutationException e) {
@@ -252,12 +254,11 @@ public void commitRecords(Snapshot snapshot) {
       CommitMutationComposer composer =
           new CommitMutationComposer(snapshot.getId(), tableMetadataManager);
       snapshot.to(composer);
-      PartitionedMutations mutations = new PartitionedMutations(composer.get());
+      List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get());

Collaborator (author): Split the mutations for commit-records.

-      ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys();
-      List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size());
-      for (PartitionedMutations.Key key : orderedKeys) {
-        tasks.add(() -> storage.mutate(mutations.get(key)));
+      List<ParallelExecutorTask> tasks = new ArrayList<>(groupedMutations.size());
+      for (List<Mutation> mutations : groupedMutations) {
+        tasks.add(() -> storage.mutate(mutations));
       }
       parallelExecutor.commitRecords(tasks, snapshot.getId());
     } catch (Exception e) {
@@ -300,12 +301,11 @@ public void rollbackRecords(Snapshot snapshot) {
       RollbackMutationComposer composer =
           new RollbackMutationComposer(snapshot.getId(), storage, tableMetadataManager);
       snapshot.to(composer);
-      PartitionedMutations mutations = new PartitionedMutations(composer.get());
+      List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get());

Collaborator (author): Split the mutations for rollback-records.

-      ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys();
-      List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size());
-      for (PartitionedMutations.Key key : orderedKeys) {
-        tasks.add(() -> storage.mutate(mutations.get(key)));
+      List<ParallelExecutorTask> tasks = new ArrayList<>(groupedMutations.size());
+      for (List<Mutation> mutations : groupedMutations) {
+        tasks.add(() -> storage.mutate(mutations));
       }
       parallelExecutor.rollbackRecords(tasks, snapshot.getId());
     } catch (Exception e) {
CommitHandlerWithGroupCommit.java
@@ -29,15 +29,16 @@ public CommitHandlerWithGroupCommit(
       Coordinator coordinator,
       TransactionTableMetadataManager tableMetadataManager,
       ParallelExecutor parallelExecutor,
+      MutationsGrouper mutationsGrouper,
       boolean coordinatorWriteOmissionOnReadOnlyEnabled,
       CoordinatorGroupCommitter groupCommitter) {
     super(
         storage,
         coordinator,
         tableMetadataManager,
         parallelExecutor,
+        mutationsGrouper,
         coordinatorWriteOmissionOnReadOnlyEnabled);

     checkNotNull(groupCommitter);
     // The methods of this emitter will be called via GroupCommitter.ready().
     groupCommitter.setEmitter(new Emitter(coordinator));
ConsensusCommitManager.java
@@ -23,6 +23,7 @@
 import com.scalar.db.common.AbstractDistributedTransactionManager;
 import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner;
 import com.scalar.db.common.ReadOnlyDistributedTransaction;
+import com.scalar.db.common.StorageInfoProvider;
 import com.scalar.db.config.DatabaseConfig;
 import com.scalar.db.exception.transaction.CommitConflictException;
 import com.scalar.db.exception.transaction.CrudConflictException;
@@ -133,12 +134,14 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {

   // `groupCommitter` must be set before calling this method.
   private CommitHandler createCommitHandler(ConsensusCommitConfig config) {
+    MutationsGrouper mutationsGrouper = new MutationsGrouper(new StorageInfoProvider(admin));
     if (isGroupCommitEnabled()) {
       return new CommitHandlerWithGroupCommit(
           storage,
           coordinator,
           tableMetadataManager,
           parallelExecutor,
+          mutationsGrouper,
           config.isCoordinatorWriteOmissionOnReadOnlyEnabled(),
           groupCommitter);
     } else {
@@ -147,6 +150,7 @@ private CommitHandler createCommitHandler(ConsensusCommitConfig config) {
           coordinator,
           tableMetadataManager,
           parallelExecutor,
+          mutationsGrouper,
           config.isCoordinatorWriteOmissionOnReadOnlyEnabled());
     }
   }
MutationsGrouper.java (new file)
@@ -0,0 +1,155 @@
package com.scalar.db.transaction.consensuscommit;

import com.scalar.db.api.Mutation;
import com.scalar.db.api.StorageInfo;
import com.scalar.db.common.StorageInfoProvider;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.io.Key;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public class MutationsGrouper {

Collaborator (author): Added MutationsGrouper, which groups mutations based on the storage's mutation atomicity unit.

  private final StorageInfoProvider storageInfoProvider;

  public MutationsGrouper(StorageInfoProvider storageInfoProvider) {
    this.storageInfoProvider = storageInfoProvider;
  }

  public List<List<Mutation>> groupMutations(Collection<Mutation> mutations)
      throws ExecutionException {
    // Group mutations by their storage info and atomicity unit
    List<MutationGroup> groups = new ArrayList<>();

Contributor: [minor] This can be removed by using groupToBatches.keySet() instead?

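(For illustration, a minimal sketch of what the reviewer's keySet() idea might look like; this code is not part of the diff:)

      // Hypothetical sketch: scan the map's existing keys instead of
      // maintaining a separate `groups` list.
      MutationGroup matchedGroup = null;
      for (MutationGroup group : groupToBatches.keySet()) {
        if (group.isSameGroup(mutation, storageInfo)) {
          matchedGroup = group;
          break;
        }
      }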

    Map<MutationGroup, List<List<Mutation>>> groupToBatches = new LinkedHashMap<>();

    for (Mutation mutation : mutations) {
      assert mutation.forNamespace().isPresent();
      StorageInfo storageInfo = storageInfoProvider.getStorageInfo(mutation.forNamespace().get());

      MutationGroup matchedGroup = null;

Contributor: [minor] If MutationGroup.equals() has logic similar to isSameGroup(), maybe these lines can be simplified like this?

      MutationGroup group = new MutationGroup(mutation, storageInfo);

      List<List<Mutation>> batches =
          groupToBatches.computeIfAbsent(group, g -> new ArrayList<>());
      int maxCount = group.storageInfo.getMaxAtomicMutationsCount();

Collaborator (author): You are right. Fixed in b44e397. Thanks!

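(For illustration, a minimal sketch of the equals()/hashCode() pair such a simplification assumes, comparing the same fields as isSameGroup(); java.util.Objects is an assumed import and this code is not part of the diff:)

    // Hypothetical sketch: equals()/hashCode() over the grouping fields, so
    // MutationGroup can serve directly as a LinkedHashMap key. Fields are
    // already nulled out per atomicity unit by the constructor, so field-wise
    // equality mirrors isSameGroup().
    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof MutationGroup)) {
        return false;
      }
      MutationGroup that = (MutationGroup) o;
      return storageInfo.getStorageName().equals(that.storageInfo.getStorageName())
          && Objects.equals(namespace, that.namespace)
          && Objects.equals(table, that.table)
          && Objects.equals(partitionKey, that.partitionKey)
          && Objects.equals(clusteringKey, that.clusteringKey);
    }

    @Override
    public int hashCode() {
      return Objects.hash(
          storageInfo.getStorageName(), namespace, table, partitionKey, clusteringKey);
    }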

      for (MutationGroup group : groups) {
        if (group.isSameGroup(mutation, storageInfo)) {
          matchedGroup = group;
          break;
        }
      }
      if (matchedGroup == null) {
        // If no matching group is found, create a new one
        matchedGroup = new MutationGroup(mutation, storageInfo);
        groups.add(matchedGroup);
      }

      List<List<Mutation>> batches =
          groupToBatches.computeIfAbsent(matchedGroup, g -> new ArrayList<>());
      int maxCount = matchedGroup.storageInfo.getMaxAtomicMutationsCount();

      if (batches.isEmpty() || batches.get(batches.size() - 1).size() >= maxCount) {
        // If the last batch is full or there are no batches yet, create a new batch
        batches.add(new ArrayList<>());
      }

      batches.get(batches.size() - 1).add(mutation);
    }

    // Flatten the grouped mutations into a single list of batches
    return groupToBatches.values().stream().flatMap(List::stream).collect(Collectors.toList());
  }

  private static class MutationGroup {
    public final StorageInfo storageInfo;
    @Nullable public final String namespace;
    @Nullable public final String table;
    @Nullable public final Key partitionKey;
    @Nullable public final Optional<Key> clusteringKey;

    public MutationGroup(Mutation mutation, StorageInfo storageInfo) {
      assert mutation.forNamespace().isPresent() && mutation.forTable().isPresent();

      switch (storageInfo.getMutationAtomicityUnit()) {
        case RECORD:
          this.clusteringKey = mutation.getClusteringKey();
          this.partitionKey = mutation.getPartitionKey();
          this.table = mutation.forTable().get();
          this.namespace = mutation.forNamespace().get();
          this.storageInfo = storageInfo;
          break;
        case PARTITION:
          this.clusteringKey = null;
          this.partitionKey = mutation.getPartitionKey();
          this.table = mutation.forTable().get();
          this.namespace = mutation.forNamespace().get();
          this.storageInfo = storageInfo;
          break;
        case TABLE:
          this.clusteringKey = null;
          this.partitionKey = null;
          this.table = mutation.forTable().get();
          this.namespace = mutation.forNamespace().get();
          this.storageInfo = storageInfo;
          break;
        case NAMESPACE:
          this.clusteringKey = null;
          this.partitionKey = null;
          this.table = null;
          this.namespace = mutation.forNamespace().get();
          this.storageInfo = storageInfo;
          break;
        case STORAGE:
          this.clusteringKey = null;
          this.partitionKey = null;
          this.table = null;
          this.namespace = null;
          this.storageInfo = storageInfo;
          break;
        default:
          throw new AssertionError(
              "Unknown mutation atomicity unit: " + storageInfo.getMutationAtomicityUnit());
      }
    }

    boolean isSameGroup(Mutation otherMutation, StorageInfo otherStorageInfo) {
      assert otherMutation.forNamespace().isPresent() && otherMutation.forTable().isPresent();

      switch (storageInfo.getMutationAtomicityUnit()) {
        case RECORD:
          if (!otherMutation.getClusteringKey().equals(this.clusteringKey)) {
            return false;
          }
          // Fall through
        case PARTITION:
          if (!otherMutation.getPartitionKey().equals(this.partitionKey)) {
            return false;
          }
          // Fall through
        case TABLE:
          if (!otherMutation.forTable().get().equals(this.table)) {
            return false;
          }
          // Fall through
        case NAMESPACE:
          if (!otherMutation.forNamespace().get().equals(this.namespace)) {
            return false;
          }
          // Fall through
        case STORAGE:
          if (!otherStorageInfo.getStorageName().equals(this.storageInfo.getStorageName())) {
            return false;
          }
          break;
        default:
          throw new AssertionError(
              "Unknown mutation atomicity unit: " + storageInfo.getMutationAtomicityUnit());
      }

      return true;
    }
  }
}
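For context, a rough usage sketch of the new grouper, mirroring what CommitHandler now does. The variables admin, composer, and storage are illustrative assumptions, not part of the diff; each returned batch stays within one atomicity unit and under the storage's max atomic mutation count, so it can be handed to DistributedStorage.mutate() as a single atomic call:

      MutationsGrouper grouper = new MutationsGrouper(new StorageInfoProvider(admin));
      List<List<Mutation>> batches = grouper.groupMutations(composer.get());
      for (List<Mutation> batch : batches) {
        storage.mutate(batch); // each batch respects the storage's atomicity unit
      }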

This file was deleted.
