-
Notifications
You must be signed in to change notification settings - Fork 40
Optimize mutations based on the storage’s mutation atomicity unit in Consensus Commit #2807
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,9 +2,9 @@ | |
|
|
||
| import static com.google.common.base.Preconditions.checkNotNull; | ||
|
|
||
| import com.google.common.collect.ImmutableList; | ||
| import com.google.errorprone.annotations.concurrent.LazyInit; | ||
| import com.scalar.db.api.DistributedStorage; | ||
| import com.scalar.db.api.Mutation; | ||
| import com.scalar.db.api.TransactionState; | ||
| import com.scalar.db.common.CoreError; | ||
| import com.scalar.db.exception.storage.ExecutionException; | ||
|
|
@@ -36,6 +36,7 @@ public class CommitHandler { | |
| protected final Coordinator coordinator; | ||
| private final TransactionTableMetadataManager tableMetadataManager; | ||
| private final ParallelExecutor parallelExecutor; | ||
| private final MutationsGrouper mutationsGrouper; | ||
| protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled; | ||
|
|
||
| @LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook; | ||
|
|
@@ -46,11 +47,13 @@ public CommitHandler( | |
| Coordinator coordinator, | ||
| TransactionTableMetadataManager tableMetadataManager, | ||
| ParallelExecutor parallelExecutor, | ||
| MutationsGrouper mutationsGrouper, | ||
| boolean coordinatorWriteOmissionOnReadOnlyEnabled) { | ||
| this.storage = checkNotNull(storage); | ||
| this.coordinator = checkNotNull(coordinator); | ||
| this.tableMetadataManager = checkNotNull(tableMetadataManager); | ||
| this.parallelExecutor = checkNotNull(parallelExecutor); | ||
| this.mutationsGrouper = checkNotNull(mutationsGrouper); | ||
| this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled; | ||
| } | ||
|
|
||
|
|
@@ -199,12 +202,11 @@ public void prepareRecords(Snapshot snapshot) throws PreparationException { | |
| PrepareMutationComposer composer = | ||
| new PrepareMutationComposer(snapshot.getId(), tableMetadataManager); | ||
| snapshot.to(composer); | ||
| PartitionedMutations mutations = new PartitionedMutations(composer.get()); | ||
| List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get()); | ||
|
|
||
| ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys(); | ||
| List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size()); | ||
| for (PartitionedMutations.Key key : orderedKeys) { | ||
| tasks.add(() -> storage.mutate(mutations.get(key))); | ||
| List<ParallelExecutorTask> tasks = new ArrayList<>(groupedMutations.size()); | ||
| for (List<Mutation> mutations : groupedMutations) { | ||
| tasks.add(() -> storage.mutate(mutations)); | ||
| } | ||
| parallelExecutor.prepareRecords(tasks, snapshot.getId()); | ||
| } catch (NoMutationException e) { | ||
|
|
@@ -252,12 +254,11 @@ public void commitRecords(Snapshot snapshot) { | |
| CommitMutationComposer composer = | ||
| new CommitMutationComposer(snapshot.getId(), tableMetadataManager); | ||
| snapshot.to(composer); | ||
| PartitionedMutations mutations = new PartitionedMutations(composer.get()); | ||
| List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get()); | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Split the mutations for commit-records. |
||
|
|
||
| ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys(); | ||
| List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size()); | ||
| for (PartitionedMutations.Key key : orderedKeys) { | ||
| tasks.add(() -> storage.mutate(mutations.get(key))); | ||
| List<ParallelExecutorTask> tasks = new ArrayList<>(groupedMutations.size()); | ||
| for (List<Mutation> mutations : groupedMutations) { | ||
| tasks.add(() -> storage.mutate(mutations)); | ||
| } | ||
| parallelExecutor.commitRecords(tasks, snapshot.getId()); | ||
| } catch (Exception e) { | ||
|
|
@@ -300,12 +301,11 @@ public void rollbackRecords(Snapshot snapshot) { | |
| RollbackMutationComposer composer = | ||
| new RollbackMutationComposer(snapshot.getId(), storage, tableMetadataManager); | ||
| snapshot.to(composer); | ||
| PartitionedMutations mutations = new PartitionedMutations(composer.get()); | ||
| List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get()); | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Split the mutations for rollback-records. |
||
|
|
||
| ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys(); | ||
| List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size()); | ||
| for (PartitionedMutations.Key key : orderedKeys) { | ||
| tasks.add(() -> storage.mutate(mutations.get(key))); | ||
| List<ParallelExecutorTask> tasks = new ArrayList<>(groupedMutations.size()); | ||
| for (List<Mutation> mutations : groupedMutations) { | ||
| tasks.add(() -> storage.mutate(mutations)); | ||
| } | ||
| parallelExecutor.rollbackRecords(tasks, snapshot.getId()); | ||
| } catch (Exception e) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,127 @@ | ||
package com.scalar.db.transaction.consensuscommit;

import com.scalar.db.api.Mutation;
import com.scalar.db.api.StorageInfo;
import com.scalar.db.common.StorageInfoProvider;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.io.Key;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

/**
 * Groups mutations into batches that the underlying storage can apply atomically, based on each
 * storage's mutation atomicity unit (record, partition, table, namespace, or whole storage) and
 * its maximum atomic mutation count.
 */
@ThreadSafe
public class MutationsGrouper {

  private final StorageInfoProvider storageInfoProvider;

  public MutationsGrouper(StorageInfoProvider storageInfoProvider) {
    this.storageInfoProvider = storageInfoProvider;
  }

  /**
   * Groups the given mutations into batches. Mutations that share the same atomicity scope are
   * placed in the same batch, and each batch is capped at the storage's maximum atomic mutation
   * count. Within each group, the original iteration order of the mutations is preserved.
   *
   * @param mutations the mutations to group
   * @return a list of batches, each of which can be applied atomically by its storage
   * @throws ExecutionException if retrieving the storage info for a namespace fails
   */
  public List<List<Mutation>> groupMutations(Collection<Mutation> mutations)
      throws ExecutionException {
    // Group mutations by their storage info and atomicity unit. A LinkedHashMap keeps the groups
    // in first-seen order so the resulting batch order is deterministic.
    Map<MutationGroup, List<List<Mutation>>> groupToBatches = new LinkedHashMap<>();

    for (Mutation mutation : mutations) {
      assert mutation.forNamespace().isPresent();
      StorageInfo storageInfo = storageInfoProvider.getStorageInfo(mutation.forNamespace().get());

      MutationGroup group = new MutationGroup(mutation, storageInfo);
      List<List<Mutation>> batches = groupToBatches.computeIfAbsent(group, g -> new ArrayList<>());
      int maxCount = group.storageInfo.getMaxAtomicMutationsCount();

      if (batches.isEmpty() || batches.get(batches.size() - 1).size() >= maxCount) {
        // If the last batch is full or there are no batches yet, start a new batch
        batches.add(new ArrayList<>());
      }

      batches.get(batches.size() - 1).add(mutation);
    }

    // Flatten the grouped mutations into a single list of batches
    return groupToBatches.values().stream().flatMap(List::stream).collect(Collectors.toList());
  }

  /**
   * A grouping key for mutations. Two mutations fall into the same group when they target the
   * same storage and agree on every component relevant to the storage's mutation atomicity unit.
   * Components coarser than the atomicity unit are left null so they do not participate in
   * equality or hashing.
   */
  private static class MutationGroup {
    public final StorageInfo storageInfo;
    @Nullable public final String namespace;
    @Nullable public final String table;
    @Nullable public final Key partitionKey;
    @Nullable public final Optional<Key> clusteringKey;

    private MutationGroup(Mutation mutation, StorageInfo storageInfo) {
      assert mutation.forNamespace().isPresent() && mutation.forTable().isPresent();

      String namespace = null;
      String table = null;
      Key partitionKey = null;
      Optional<Key> clusteringKey = null;

      // Intentional fall-through: each finer atomicity unit includes every component of the
      // coarser ones (e.g. RECORD needs the clustering key, partition key, table, and namespace;
      // NAMESPACE needs only the namespace).
      switch (storageInfo.getMutationAtomicityUnit()) {
        case RECORD:
          clusteringKey = mutation.getClusteringKey();
          // fall through
        case PARTITION:
          partitionKey = mutation.getPartitionKey();
          // fall through
        case TABLE:
          table = mutation.forTable().get();
          // fall through
        case NAMESPACE:
          namespace = mutation.forNamespace().get();
          // fall through
        case STORAGE:
          break;
        default:
          throw new AssertionError(
              "Unknown mutation atomicity unit: " + storageInfo.getMutationAtomicityUnit());
      }

      this.storageInfo = storageInfo;
      this.namespace = namespace;
      this.table = table;
      this.partitionKey = partitionKey;
      this.clusteringKey = clusteringKey;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof MutationGroup)) {
        return false;
      }
      MutationGroup that = (MutationGroup) o;
      return Objects.equals(storageInfo.getStorageName(), that.storageInfo.getStorageName())
          && Objects.equals(namespace, that.namespace)
          && Objects.equals(table, that.table)
          && Objects.equals(partitionKey, that.partitionKey)
          && Objects.equals(clusteringKey, that.clusteringKey);
    }

    @Override
    public int hashCode() {
      return Objects.hash(
          storageInfo.getStorageName(), namespace, table, partitionKey, clusteringKey);
    }
  }
}
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Split the mutations for prepare-records.