Skip to content

Commit 4fd6625

Browse files
committed
Optimize mutations in Consensus Commit based on the storage’s mutation atomicity unit
1 parent daf4d88 commit 4fd6625

File tree

13 files changed

+673
-353
lines changed

13 files changed

+673
-353
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
import static com.google.common.base.Preconditions.checkNotNull;
44

5-
import com.google.common.collect.ImmutableList;
65
import com.google.errorprone.annotations.concurrent.LazyInit;
76
import com.scalar.db.api.DistributedStorage;
7+
import com.scalar.db.api.Mutation;
88
import com.scalar.db.api.TransactionState;
99
import com.scalar.db.common.error.CoreError;
1010
import com.scalar.db.exception.storage.ExecutionException;
@@ -36,6 +36,7 @@ public class CommitHandler {
3636
protected final Coordinator coordinator;
3737
private final TransactionTableMetadataManager tableMetadataManager;
3838
private final ParallelExecutor parallelExecutor;
39+
private final MutationsGrouper mutationsGrouper;
3940
protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
4041

4142
@LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook;
@@ -46,11 +47,13 @@ public CommitHandler(
4647
Coordinator coordinator,
4748
TransactionTableMetadataManager tableMetadataManager,
4849
ParallelExecutor parallelExecutor,
50+
MutationsGrouper mutationsGrouper,
4951
boolean coordinatorWriteOmissionOnReadOnlyEnabled) {
5052
this.storage = checkNotNull(storage);
5153
this.coordinator = checkNotNull(coordinator);
5254
this.tableMetadataManager = checkNotNull(tableMetadataManager);
5355
this.parallelExecutor = checkNotNull(parallelExecutor);
56+
this.mutationsGrouper = checkNotNull(mutationsGrouper);
5457
this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled;
5558
}
5659

@@ -199,12 +202,11 @@ public void prepareRecords(Snapshot snapshot) throws PreparationException {
199202
PrepareMutationComposer composer =
200203
new PrepareMutationComposer(snapshot.getId(), tableMetadataManager);
201204
snapshot.to(composer);
202-
PartitionedMutations mutations = new PartitionedMutations(composer.get());
205+
List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get());
203206

204-
ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys();
205-
List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size());
206-
for (PartitionedMutations.Key key : orderedKeys) {
207-
tasks.add(() -> storage.mutate(mutations.get(key)));
207+
List<ParallelExecutorTask> tasks = new ArrayList<>(groupedMutations.size());
208+
for (List<Mutation> mutations : groupedMutations) {
209+
tasks.add(() -> storage.mutate(mutations));
208210
}
209211
parallelExecutor.prepareRecords(tasks, snapshot.getId());
210212
} catch (NoMutationException e) {
@@ -252,12 +254,11 @@ public void commitRecords(Snapshot snapshot) {
252254
CommitMutationComposer composer =
253255
new CommitMutationComposer(snapshot.getId(), tableMetadataManager);
254256
snapshot.to(composer);
255-
PartitionedMutations mutations = new PartitionedMutations(composer.get());
257+
List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get());
256258

257-
ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys();
258-
List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size());
259-
for (PartitionedMutations.Key key : orderedKeys) {
260-
tasks.add(() -> storage.mutate(mutations.get(key)));
259+
List<ParallelExecutorTask> tasks = new ArrayList<>(groupedMutations.size());
260+
for (List<Mutation> mutations : groupedMutations) {
261+
tasks.add(() -> storage.mutate(mutations));
261262
}
262263
parallelExecutor.commitRecords(tasks, snapshot.getId());
263264
} catch (Exception e) {
@@ -300,12 +301,11 @@ public void rollbackRecords(Snapshot snapshot) {
300301
RollbackMutationComposer composer =
301302
new RollbackMutationComposer(snapshot.getId(), storage, tableMetadataManager);
302303
snapshot.to(composer);
303-
PartitionedMutations mutations = new PartitionedMutations(composer.get());
304+
List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get());
304305

305-
ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys();
306-
List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size());
307-
for (PartitionedMutations.Key key : orderedKeys) {
308-
tasks.add(() -> storage.mutate(mutations.get(key)));
306+
List<ParallelExecutorTask> tasks = new ArrayList<>(groupedMutations.size());
307+
for (List<Mutation> mutations : groupedMutations) {
308+
tasks.add(() -> storage.mutate(mutations));
309309
}
310310
parallelExecutor.rollbackRecords(tasks, snapshot.getId());
311311
} catch (Exception e) {

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,16 @@ public CommitHandlerWithGroupCommit(
2929
Coordinator coordinator,
3030
TransactionTableMetadataManager tableMetadataManager,
3131
ParallelExecutor parallelExecutor,
32+
MutationsGrouper mutationsGrouper,
3233
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
3334
CoordinatorGroupCommitter groupCommitter) {
3435
super(
3536
storage,
3637
coordinator,
3738
tableMetadataManager,
3839
parallelExecutor,
40+
mutationsGrouper,
3941
coordinatorWriteOmissionOnReadOnlyEnabled);
40-
4142
checkNotNull(groupCommitter);
4243
// The methods of this emitter will be called via GroupCommitter.ready().
4344
groupCommitter.setEmitter(new Emitter(coordinator));

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.scalar.db.common.AbstractDistributedTransactionManager;
2424
import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner;
2525
import com.scalar.db.common.ReadOnlyDistributedTransaction;
26+
import com.scalar.db.common.StorageInfoProvider;
2627
import com.scalar.db.config.DatabaseConfig;
2728
import com.scalar.db.exception.transaction.CommitConflictException;
2829
import com.scalar.db.exception.transaction.CrudConflictException;
@@ -133,12 +134,14 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
133134

134135
// `groupCommitter` must be set before calling this method.
135136
private CommitHandler createCommitHandler(ConsensusCommitConfig config) {
137+
MutationsGrouper mutationsGrouper = new MutationsGrouper(new StorageInfoProvider(admin));
136138
if (isGroupCommitEnabled()) {
137139
return new CommitHandlerWithGroupCommit(
138140
storage,
139141
coordinator,
140142
tableMetadataManager,
141143
parallelExecutor,
144+
mutationsGrouper,
142145
config.isCoordinatorWriteOmissionOnReadOnlyEnabled(),
143146
groupCommitter);
144147
} else {
@@ -147,6 +150,7 @@ private CommitHandler createCommitHandler(ConsensusCommitConfig config) {
147150
coordinator,
148151
tableMetadataManager,
149152
parallelExecutor,
153+
mutationsGrouper,
150154
config.isCoordinatorWriteOmissionOnReadOnlyEnabled());
151155
}
152156
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package com.scalar.db.transaction.consensuscommit;
2+
3+
import com.scalar.db.api.Mutation;
4+
import com.scalar.db.api.StorageInfo;
5+
import com.scalar.db.common.StorageInfoProvider;
6+
import com.scalar.db.exception.storage.ExecutionException;
7+
import com.scalar.db.io.Key;
8+
import java.util.ArrayList;
9+
import java.util.Collection;
10+
import java.util.LinkedHashMap;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Optional;
14+
import java.util.stream.Collectors;
15+
import javax.annotation.Nullable;
16+
import javax.annotation.concurrent.ThreadSafe;
17+
18+
@ThreadSafe
19+
public class MutationsGrouper {
20+
21+
private final StorageInfoProvider storageInfoProvider;
22+
23+
public MutationsGrouper(StorageInfoProvider storageInfoProvider) {
24+
this.storageInfoProvider = storageInfoProvider;
25+
}
26+
27+
public List<List<Mutation>> groupMutations(Collection<Mutation> mutations)
28+
throws ExecutionException {
29+
// MutationGroup mutations by their storage info and atomicity unit
30+
List<MutationGroup> groups = new ArrayList<>();
31+
Map<MutationGroup, List<List<Mutation>>> groupToBatches = new LinkedHashMap<>();
32+
33+
for (Mutation mutation : mutations) {
34+
assert mutation.forNamespace().isPresent();
35+
StorageInfo storageInfo = storageInfoProvider.getStorageInfo(mutation.forNamespace().get());
36+
37+
MutationGroup matchedGroup = null;
38+
for (MutationGroup group : groups) {
39+
if (group.isSameGroup(mutation, storageInfo)) {
40+
matchedGroup = group;
41+
break;
42+
}
43+
}
44+
if (matchedGroup == null) {
45+
// If no matching group is found, create a new one
46+
matchedGroup = new MutationGroup(mutation, storageInfo);
47+
groups.add(matchedGroup);
48+
}
49+
50+
List<List<Mutation>> batches =
51+
groupToBatches.computeIfAbsent(matchedGroup, g -> new ArrayList<>());
52+
int maxCount = matchedGroup.storageInfo.getMaxAtomicMutationsCount();
53+
54+
if (batches.isEmpty() || batches.get(batches.size() - 1).size() >= maxCount) {
55+
// If the last batch is full or there are no batches yet, create a new batch
56+
batches.add(new ArrayList<>());
57+
}
58+
59+
batches.get(batches.size() - 1).add(mutation);
60+
}
61+
62+
// Flatten the grouped mutations into a single list of batches
63+
return groupToBatches.values().stream().flatMap(List::stream).collect(Collectors.toList());
64+
}
65+
66+
private static class MutationGroup {
67+
public final StorageInfo storageInfo;
68+
@Nullable public final String namespace;
69+
@Nullable public final String table;
70+
@Nullable public final Key partitionKey;
71+
@Nullable public final Optional<Key> clusteringKey;
72+
73+
public MutationGroup(Mutation mutation, StorageInfo storageInfo) {
74+
assert mutation.forNamespace().isPresent() && mutation.forTable().isPresent();
75+
76+
switch (storageInfo.getMutationAtomicityUnit()) {
77+
case RECORD:
78+
this.clusteringKey = mutation.getClusteringKey();
79+
this.partitionKey = mutation.getPartitionKey();
80+
this.table = mutation.forTable().get();
81+
this.namespace = mutation.forNamespace().get();
82+
this.storageInfo = storageInfo;
83+
break;
84+
case PARTITION:
85+
this.clusteringKey = null;
86+
this.partitionKey = mutation.getPartitionKey();
87+
this.table = mutation.forTable().get();
88+
this.namespace = mutation.forNamespace().get();
89+
this.storageInfo = storageInfo;
90+
break;
91+
case TABLE:
92+
this.clusteringKey = null;
93+
this.partitionKey = null;
94+
this.table = mutation.forTable().get();
95+
this.namespace = mutation.forNamespace().get();
96+
this.storageInfo = storageInfo;
97+
break;
98+
case NAMESPACE:
99+
this.clusteringKey = null;
100+
this.partitionKey = null;
101+
this.table = null;
102+
this.namespace = mutation.forNamespace().get();
103+
this.storageInfo = storageInfo;
104+
break;
105+
case STORAGE:
106+
this.clusteringKey = null;
107+
this.partitionKey = null;
108+
this.table = null;
109+
this.namespace = null;
110+
this.storageInfo = storageInfo;
111+
break;
112+
default:
113+
throw new AssertionError(
114+
"Unknown mutation atomicity unit: " + storageInfo.getMutationAtomicityUnit());
115+
}
116+
}
117+
118+
boolean isSameGroup(Mutation otherMutation, StorageInfo otherStorageInfo) {
119+
assert otherMutation.forNamespace().isPresent() && otherMutation.forTable().isPresent();
120+
121+
switch (storageInfo.getMutationAtomicityUnit()) {
122+
case RECORD:
123+
if (!otherMutation.getClusteringKey().equals(this.clusteringKey)) {
124+
return false;
125+
}
126+
// Fall through
127+
case PARTITION:
128+
if (!otherMutation.getPartitionKey().equals(this.partitionKey)) {
129+
return false;
130+
}
131+
// Fall through
132+
case TABLE:
133+
if (!otherMutation.forTable().get().equals(this.table)) {
134+
return false;
135+
}
136+
// Fall through
137+
case NAMESPACE:
138+
if (!otherMutation.forNamespace().get().equals(this.namespace)) {
139+
return false;
140+
}
141+
// Fall through
142+
case STORAGE:
143+
if (!otherStorageInfo.getStorageName().equals(this.storageInfo.getStorageName())) {
144+
return false;
145+
}
146+
break;
147+
default:
148+
throw new AssertionError(
149+
"Unknown mutation atomicity unit: " + storageInfo.getMutationAtomicityUnit());
150+
}
151+
152+
return true;
153+
}
154+
}
155+
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/PartitionedMutations.java

Lines changed: 0 additions & 97 deletions
This file was deleted.

0 commit comments

Comments
 (0)