diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index b8a35e9157..dff010e863 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -2,9 +2,9 @@ import static com.google.common.base.Preconditions.checkNotNull; -import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.concurrent.LazyInit; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.Mutation; import com.scalar.db.api.TransactionState; import com.scalar.db.common.CoreError; import com.scalar.db.exception.storage.ExecutionException; @@ -36,6 +36,7 @@ public class CommitHandler { protected final Coordinator coordinator; private final TransactionTableMetadataManager tableMetadataManager; private final ParallelExecutor parallelExecutor; + private final MutationsGrouper mutationsGrouper; protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled; @LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook; @@ -46,11 +47,13 @@ public CommitHandler( Coordinator coordinator, TransactionTableMetadataManager tableMetadataManager, ParallelExecutor parallelExecutor, + MutationsGrouper mutationsGrouper, boolean coordinatorWriteOmissionOnReadOnlyEnabled) { this.storage = checkNotNull(storage); this.coordinator = checkNotNull(coordinator); this.tableMetadataManager = checkNotNull(tableMetadataManager); this.parallelExecutor = checkNotNull(parallelExecutor); + this.mutationsGrouper = checkNotNull(mutationsGrouper); this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled; } @@ -199,12 +202,11 @@ public void prepareRecords(Snapshot snapshot) throws PreparationException { PrepareMutationComposer composer = new PrepareMutationComposer(snapshot.getId(), tableMetadataManager); snapshot.to(composer); - PartitionedMutations mutations = new PartitionedMutations(composer.get()); + List> groupedMutations = mutationsGrouper.groupMutations(composer.get()); - ImmutableList orderedKeys = mutations.getOrderedKeys(); - List tasks = new ArrayList<>(orderedKeys.size()); - for (PartitionedMutations.Key key : orderedKeys) { - tasks.add(() -> storage.mutate(mutations.get(key))); + List tasks = new ArrayList<>(groupedMutations.size()); + for (List mutations : groupedMutations) { + tasks.add(() -> storage.mutate(mutations)); } parallelExecutor.prepareRecords(tasks, snapshot.getId()); } catch (NoMutationException e) { @@ -252,12 +254,11 @@ public void commitRecords(Snapshot snapshot) { CommitMutationComposer composer = new CommitMutationComposer(snapshot.getId(), tableMetadataManager); snapshot.to(composer); - PartitionedMutations mutations = new PartitionedMutations(composer.get()); + List> groupedMutations = mutationsGrouper.groupMutations(composer.get()); - ImmutableList orderedKeys = mutations.getOrderedKeys(); - List tasks = new ArrayList<>(orderedKeys.size()); - for (PartitionedMutations.Key key : orderedKeys) { - tasks.add(() -> storage.mutate(mutations.get(key))); + List tasks = new ArrayList<>(groupedMutations.size()); + for (List mutations : groupedMutations) { + tasks.add(() -> storage.mutate(mutations)); } parallelExecutor.commitRecords(tasks, snapshot.getId()); } catch (Exception e) { @@ -300,12 +301,11 @@ public void rollbackRecords(Snapshot snapshot) { RollbackMutationComposer composer = new RollbackMutationComposer(snapshot.getId(), storage, tableMetadataManager); snapshot.to(composer); - PartitionedMutations mutations = new PartitionedMutations(composer.get()); + List> groupedMutations = mutationsGrouper.groupMutations(composer.get()); - ImmutableList orderedKeys = mutations.getOrderedKeys(); - List tasks = new ArrayList<>(orderedKeys.size()); - for (PartitionedMutations.Key key : orderedKeys) { - tasks.add(() -> storage.mutate(mutations.get(key))); + List tasks = new ArrayList<>(groupedMutations.size()); + for (List mutations : groupedMutations) { + tasks.add(() -> storage.mutate(mutations)); } parallelExecutor.rollbackRecords(tasks, snapshot.getId()); } catch (Exception e) { diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java index ca7fcdb936..09712bf2b1 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java @@ -29,6 +29,7 @@ public CommitHandlerWithGroupCommit( Coordinator coordinator, TransactionTableMetadataManager tableMetadataManager, ParallelExecutor parallelExecutor, + MutationsGrouper mutationsGrouper, boolean coordinatorWriteOmissionOnReadOnlyEnabled, CoordinatorGroupCommitter groupCommitter) { super( @@ -36,8 +37,8 @@ public CommitHandlerWithGroupCommit( coordinator, tableMetadataManager, parallelExecutor, + mutationsGrouper, coordinatorWriteOmissionOnReadOnlyEnabled); - checkNotNull(groupCommitter); // The methods of this emitter will be called via GroupCommitter.ready(). groupCommitter.setEmitter(new Emitter(coordinator)); diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java index fe08f2322b..d3020dab4a 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java @@ -23,6 +23,7 @@ import com.scalar.db.common.AbstractDistributedTransactionManager; import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner; import com.scalar.db.common.ReadOnlyDistributedTransaction; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.transaction.CommitConflictException; import com.scalar.db.exception.transaction.CrudConflictException; @@ -133,12 +134,14 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) { // `groupCommitter` must be set before calling this method. private CommitHandler createCommitHandler(ConsensusCommitConfig config) { + MutationsGrouper mutationsGrouper = new MutationsGrouper(new StorageInfoProvider(admin)); if (isGroupCommitEnabled()) { return new CommitHandlerWithGroupCommit( storage, coordinator, tableMetadataManager, parallelExecutor, + mutationsGrouper, config.isCoordinatorWriteOmissionOnReadOnlyEnabled(), groupCommitter); } else { @@ -147,6 +150,7 @@ private CommitHandler createCommitHandler(ConsensusCommitConfig config) { coordinator, tableMetadataManager, parallelExecutor, + mutationsGrouper, config.isCoordinatorWriteOmissionOnReadOnlyEnabled()); } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationsGrouper.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationsGrouper.java new file mode 100644 index 0000000000..1412cefbc2 --- /dev/null +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationsGrouper.java @@ -0,0 +1,127 @@ +package com.scalar.db.transaction.consensuscommit; + +import com.scalar.db.api.Mutation; +import com.scalar.db.api.StorageInfo; +import com.scalar.db.common.StorageInfoProvider; +import com.scalar.db.exception.storage.ExecutionException; +import com.scalar.db.io.Key; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +@ThreadSafe +public class MutationsGrouper { + + private final StorageInfoProvider storageInfoProvider; + + public MutationsGrouper(StorageInfoProvider storageInfoProvider) { + this.storageInfoProvider = storageInfoProvider; + } + + public List> groupMutations(Collection mutations) + throws ExecutionException { + // MutationGroup mutations by their storage info and atomicity unit + Map>> groupToBatches = new LinkedHashMap<>(); + + for (Mutation mutation : mutations) { + assert mutation.forNamespace().isPresent(); + StorageInfo storageInfo = storageInfoProvider.getStorageInfo(mutation.forNamespace().get()); + + MutationGroup group = new MutationGroup(mutation, storageInfo); + List> batches = groupToBatches.computeIfAbsent(group, g -> new ArrayList<>()); + int maxCount = group.storageInfo.getMaxAtomicMutationsCount(); + + if (batches.isEmpty() || batches.get(batches.size() - 1).size() >= maxCount) { + // If the last batch is full or there are no batches yet, create a new batch + batches.add(new ArrayList<>()); + } + + batches.get(batches.size() - 1).add(mutation); + } + + // Flatten the grouped mutations into a single list of batches + return groupToBatches.values().stream().flatMap(List::stream).collect(Collectors.toList()); + } + + private static class MutationGroup { + public final StorageInfo storageInfo; + @Nullable public final String namespace; + @Nullable public final String table; + @Nullable public final Key partitionKey; + @Nullable public final Optional clusteringKey; + + private MutationGroup(Mutation mutation, StorageInfo storageInfo) { + assert mutation.forNamespace().isPresent() && mutation.forTable().isPresent(); + + switch (storageInfo.getMutationAtomicityUnit()) { + case RECORD: + this.clusteringKey = mutation.getClusteringKey(); + this.partitionKey = mutation.getPartitionKey(); + this.table = mutation.forTable().get(); + this.namespace = mutation.forNamespace().get(); + this.storageInfo = storageInfo; + break; + case PARTITION: + this.clusteringKey = null; + this.partitionKey = mutation.getPartitionKey(); + this.table = mutation.forTable().get(); + this.namespace = mutation.forNamespace().get(); + this.storageInfo = storageInfo; + break; + case TABLE: + this.clusteringKey = null; + this.partitionKey = null; + this.table = mutation.forTable().get(); + this.namespace = mutation.forNamespace().get(); + this.storageInfo = storageInfo; + break; + case NAMESPACE: + this.clusteringKey = null; + this.partitionKey = null; + this.table = null; + this.namespace = mutation.forNamespace().get(); + this.storageInfo = storageInfo; + break; + case STORAGE: + this.clusteringKey = null; + this.partitionKey = null; + this.table = null; + this.namespace = null; + this.storageInfo = storageInfo; + break; + default: + throw new AssertionError( + "Unknown mutation atomicity unit: " + storageInfo.getMutationAtomicityUnit()); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof MutationGroup)) { + return false; + } + MutationGroup that = (MutationGroup) o; + return Objects.equals(storageInfo.getStorageName(), that.storageInfo.getStorageName()) + && Objects.equals(namespace, that.namespace) + && Objects.equals(table, that.table) + && Objects.equals(partitionKey, that.partitionKey) + && Objects.equals(clusteringKey, that.clusteringKey); + } + + @Override + public int hashCode() { + return Objects.hash( + storageInfo.getStorageName(), namespace, table, partitionKey, clusteringKey); + } + } +} diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/PartitionedMutations.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/PartitionedMutations.java deleted file mode 100644 index 6a2622c248..0000000000 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/PartitionedMutations.java +++ /dev/null @@ -1,97 +0,0 @@ -package com.scalar.db.transaction.consensuscommit; - -import com.google.common.collect.ComparisonChain; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableListMultimap; -import com.scalar.db.api.Mutation; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import javax.annotation.Nonnull; -import javax.annotation.concurrent.Immutable; - -@Immutable -public class PartitionedMutations { - private final ImmutableListMultimap partitions; - - @SafeVarargs - public PartitionedMutations(Collection... collections) { - ImmutableListMultimap.Builder builder = ImmutableListMultimap.builder(); - for (Collection collection : collections) { - collection.forEach(m -> builder.put(new Key(m), m)); - } - partitions = builder.build(); - } - - @Nonnull - public ImmutableList getOrderedKeys() { - List keys = new ArrayList<>(partitions.keySet()); - Collections.sort(keys); - return ImmutableList.copyOf(keys); - } - - @Nonnull - public ImmutableList get(Key key) { - return partitions.get(key); - } - - @Immutable - public static final class Key implements Comparable { - private final String namespace; - private final String table; - private final com.scalar.db.io.Key partitionKey; - - public Key(Mutation mutation) { - namespace = mutation.forNamespace().get(); - table = mutation.forTable().get(); - partitionKey = mutation.getPartitionKey(); - } - - @Override - public int hashCode() { - return Objects.hash(namespace, table, partitionKey); - } - - @Override - public boolean equals(Object o) { - if (o == this) { - return true; - } - if (!(o instanceof Key)) { - return false; - } - Key another = (Key) o; - return this.namespace.equals(another.namespace) - && this.table.equals(another.table) - && this.partitionKey.equals(another.partitionKey); - } - - @Override - public int compareTo(Key o) { - return ComparisonChain.start() - .compare(this.namespace, o.namespace) - .compare(this.table, o.table) - .compare(this.partitionKey, o.partitionKey) - .result(); - } - } - - @Override - public boolean equals(Object o) { - if (o == this) { - return true; - } - if (!(o instanceof PartitionedMutations)) { - return false; - } - PartitionedMutations other = (PartitionedMutations) o; - return partitions.equals(other.partitions); - } - - @Override - public int hashCode() { - return Objects.hash(partitions); - } -} diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java index 4a0ae7cddf..3b779385d7 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java @@ -21,6 +21,7 @@ import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner; import com.scalar.db.common.AbstractTwoPhaseCommitTransactionManager; import com.scalar.db.common.CoreError; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.transaction.CommitConflictException; import com.scalar.db.exception.transaction.CrudConflictException; @@ -81,6 +82,7 @@ public TwoPhaseConsensusCommitManager( coordinator, tableMetadataManager, parallelExecutor, + new MutationsGrouper(new StorageInfoProvider(admin)), config.isCoordinatorWriteOmissionOnReadOnlyEnabled()); isIncludeMetadataEnabled = config.isIncludeMetadataEnabled(); mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager); @@ -105,6 +107,7 @@ public TwoPhaseConsensusCommitManager(DatabaseConfig databaseConfig) { coordinator, tableMetadataManager, parallelExecutor, + new MutationsGrouper(new StorageInfoProvider(admin)), config.isCoordinatorWriteOmissionOnReadOnlyEnabled()); isIncludeMetadataEnabled = config.isIncludeMetadataEnabled(); mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager); diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java index 51df00f89e..cf5e32260e 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java @@ -12,12 +12,16 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.common.util.concurrent.Uninterruptibles; import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.Get; import com.scalar.db.api.Put; +import com.scalar.db.api.StorageInfo; import com.scalar.db.api.TransactionState; +import com.scalar.db.common.StorageInfoImpl; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.storage.NoMutationException; import com.scalar.db.exception.storage.RetriableExecutionException; @@ -34,13 +38,11 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.MockitoAnnotations; -@ExtendWith(MockitoExtension.class) public class CommitHandlerTest { private static final String ANY_NAMESPACE_NAME = "namespace"; private static final String ANY_TABLE_NAME = "table"; @@ -58,6 +60,7 @@ public class CommitHandlerTest { @Mock protected DistributedStorage storage; @Mock protected Coordinator coordinator; @Mock protected TransactionTableMetadataManager tableMetadataManager; + @Mock protected StorageInfoProvider storageInfoProvider; @Mock protected ConsensusCommitConfig config; @Mock protected BeforePreparationSnapshotHook beforePreparationSnapshotHook; @Mock protected Future beforePreparationSnapshotHookFuture; @@ -79,15 +82,24 @@ protected CommitHandler createCommitHandler(boolean coordinatorWriteOmissionOnRe coordinator, tableMetadataManager, parallelExecutor, + new MutationsGrouper(storageInfoProvider), coordinatorWriteOmissionOnReadOnlyEnabled); } @BeforeEach void setUp() throws Exception { + MockitoAnnotations.openMocks(this).close(); + parallelExecutor = new ParallelExecutor(config); handler = spy(createCommitHandler(true)); extraInitialize(); + + // Arrange + when(storageInfoProvider.getStorageInfo(ANY_NAMESPACE_NAME)) + .thenReturn( + new StorageInfoImpl( + "storage1", StorageInfo.MutationAtomicityUnit.PARTITION, Integer.MAX_VALUE)); } @AfterEach diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java index fe79a9db69..6163b0bf9a 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java @@ -67,6 +67,7 @@ protected CommitHandler createCommitHandler(boolean coordinatorWriteOmissionOnRe coordinator, tableMetadataManager, parallelExecutor, + new MutationsGrouper(storageInfoProvider), coordinatorWriteOmissionOnReadOnlyEnabled, groupCommitter); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/MutationsGrouperTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/MutationsGrouperTest.java new file mode 100644 index 0000000000..a71db38600 --- /dev/null +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/MutationsGrouperTest.java @@ -0,0 +1,348 @@ +package com.scalar.db.transaction.consensuscommit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.scalar.db.api.Mutation; +import com.scalar.db.api.Put; +import com.scalar.db.api.StorageInfo; +import com.scalar.db.common.StorageInfoProvider; +import com.scalar.db.exception.storage.ExecutionException; +import com.scalar.db.io.Key; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class MutationsGrouperTest { + + @Mock private StorageInfoProvider storageInfoProvider; + + private MutationsGrouper grouper; + + @BeforeEach + public void setUp() throws Exception { + MockitoAnnotations.openMocks(this).close(); + + grouper = new MutationsGrouper(storageInfoProvider); + } + + @Test + public void groupMutations_WithEmptyCollection_ShouldReturnEmptyList() throws ExecutionException { + // Act + List> result = grouper.groupMutations(Collections.emptyList()); + + // Assert + assertThat(result).isEmpty(); + } + + @Test + public void groupMutations_WithRecordAtomicity_ShouldGroupCorrectly() throws ExecutionException { + // Arrange + String namespace = "ns"; + String table = "table"; + StorageInfo storageInfo = mock(StorageInfo.class); + when(storageInfo.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.RECORD); + when(storageInfo.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo.getStorageName()).thenReturn("storage1"); + when(storageInfoProvider.getStorageInfo(namespace)).thenReturn(storageInfo); + + // Three mutations with different records + Key partitionKey1 = mock(Key.class); + Key clusteringKey1 = mock(Key.class); + Mutation mutation1 = + createMutation(namespace, table, partitionKey1, Optional.of(clusteringKey1)); + + Key partitionKey2 = mock(Key.class); + Key clusteringKey2 = mock(Key.class); + Mutation mutation2 = + createMutation(namespace, table, partitionKey2, Optional.of(clusteringKey2)); + + Key clusteringKey3 = mock(Key.class); + Mutation mutation3 = + createMutation(namespace, table, partitionKey1, Optional.of(clusteringKey3)); + + Mutation mutation4 = + createMutation(namespace, table, partitionKey1, Optional.of(clusteringKey1)); + + Mutation mutation5 = + createMutation(namespace, table, partitionKey1, Optional.of(clusteringKey3)); + + // Act + List> result = + grouper.groupMutations( + Arrays.asList(mutation1, mutation2, mutation3, mutation4, mutation5)); + + // Assert + assertThat(result).hasSize(3); + assertThat(result.get(0)).containsExactly(mutation1, mutation4); + assertThat(result.get(1)).containsExactly(mutation2); + assertThat(result.get(2)).containsExactly(mutation3, mutation5); + } + + @Test + public void groupMutations_WithPartitionAtomicity_ShouldGroupCorrectly() + throws ExecutionException { + // Arrange + String namespace = "ns"; + String table = "table"; + StorageInfo storageInfo = mock(StorageInfo.class); + when(storageInfo.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.PARTITION); + when(storageInfo.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo.getStorageName()).thenReturn("storage1"); + when(storageInfoProvider.getStorageInfo(namespace)).thenReturn(storageInfo); + + // Three mutations with two different partitions + Key partitionKey1 = mock(Key.class); + Key clusteringKey1 = mock(Key.class); + Mutation mutation1 = + createMutation(namespace, table, partitionKey1, Optional.of(clusteringKey1)); + + Key partitionKey2 = mock(Key.class); + Key clusteringKey2 = mock(Key.class); + Mutation mutation2 = + createMutation(namespace, table, partitionKey2, Optional.of(clusteringKey2)); + + Key clusteringKey3 = mock(Key.class); + Mutation mutation3 = + createMutation(namespace, table, partitionKey1, Optional.of(clusteringKey3)); + + // Act + List> result = + grouper.groupMutations(Arrays.asList(mutation1, mutation2, mutation3)); + + // Assert + assertThat(result).hasSize(2); + assertThat(result.get(0)).containsExactly(mutation1, mutation3); + assertThat(result.get(1)).containsExactly(mutation2); + } + + @Test + public void groupMutations_WithTableAtomicity_ShouldGroupCorrectly() throws ExecutionException { + // Arrange + String namespace1 = "ns1"; + String namespace2 = "ns2"; + String table1 = "table1"; + String table2 = "table2"; + StorageInfo storageInfo = mock(StorageInfo.class); + when(storageInfo.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.TABLE); + when(storageInfo.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo.getStorageName()).thenReturn("storage1"); + when(storageInfoProvider.getStorageInfo(namespace1)).thenReturn(storageInfo); + when(storageInfoProvider.getStorageInfo(namespace2)).thenReturn(storageInfo); + + // Three mutations with two different tables + Key partitionKey1 = mock(Key.class); + Mutation mutation1 = createMutation(namespace1, table1, partitionKey1, Optional.empty()); + + Key partitionKey2 = mock(Key.class); + Mutation mutation2 = createMutation(namespace1, table2, partitionKey2, Optional.empty()); + + Key partitionKey3 = mock(Key.class); + Mutation mutation3 = createMutation(namespace1, table1, partitionKey3, Optional.empty()); + + Mutation mutation4 = createMutation(namespace2, table1, partitionKey1, Optional.empty()); + + // Act + List> result = + grouper.groupMutations(Arrays.asList(mutation1, mutation2, mutation3, mutation4)); + + // Assert + assertThat(result).hasSize(3); + assertThat(result.get(0)).containsExactly(mutation1, mutation3); + assertThat(result.get(1)).containsExactly(mutation2); + assertThat(result.get(2)).containsExactly(mutation4); + } + + @Test + public void groupMutations_WithBatchSizeLimit_ShouldCreateMultipleBatches() + throws ExecutionException { + // Arrange + String namespace = "ns"; + String table = "table"; + StorageInfo storageInfo = mock(StorageInfo.class); + when(storageInfo.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.TABLE); + when(storageInfo.getMaxAtomicMutationsCount()).thenReturn(2); // Max 2 mutations per batch + when(storageInfo.getStorageName()).thenReturn("storage1"); + when(storageInfoProvider.getStorageInfo(namespace)).thenReturn(storageInfo); + + // Create 5 mutations for the same table + List mutations = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + Key partitionKey = mock(Key.class); + mutations.add(createMutation(namespace, table, partitionKey, Optional.empty())); + } + + // Act + List> result = grouper.groupMutations(mutations); + + // Assert + assertThat(result).hasSize(3); // Should create 3 batches: 2+2+1 + assertThat(result.get(0).size()).isEqualTo(2); + assertThat(result.get(1).size()).isEqualTo(2); + assertThat(result.get(2).size()).isEqualTo(1); + } + + @Test + public void groupMutations_WithNamespaceAtomicity_ShouldGroupCorrectly() + throws ExecutionException { + // Arrange + String namespace1 = "ns1"; + String namespace2 = "ns2"; + String table1 = "table1"; + String table2 = "table2"; + + StorageInfo storageInfo = mock(StorageInfo.class); + when(storageInfo.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.NAMESPACE); + when(storageInfo.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo.getStorageName()).thenReturn("storage1"); + when(storageInfoProvider.getStorageInfo(namespace1)).thenReturn(storageInfo); + when(storageInfoProvider.getStorageInfo(namespace2)).thenReturn(storageInfo); + + // Create mutations for different namespaces + Key partitionKey1 = mock(Key.class); + Mutation mutation1 = createMutation(namespace1, table1, partitionKey1, Optional.empty()); + + Key partitionKey2 = mock(Key.class); + Mutation mutation2 = createMutation(namespace2, table2, partitionKey2, Optional.empty()); + + Key partitionKey3 = mock(Key.class); + Mutation mutation3 = createMutation(namespace1, table2, partitionKey3, Optional.empty()); + + // Act + List> result = + grouper.groupMutations(Arrays.asList(mutation1, mutation2, mutation3)); + + // Assert + assertThat(result).hasSize(2); + assertThat(result.get(0)).containsExactly(mutation1, mutation3); + assertThat(result.get(1)).containsExactly(mutation2); + } + + @Test + public void groupMutations_WithStorageAtomicity_ShouldGroupCorrectly() throws ExecutionException { + // Arrange + String namespace1 = "ns1"; + String namespace2 = "ns2"; + String namespace3 = "ns3"; + String namespace4 = "ns4"; + String table1 = "table1"; + String table2 = "table2"; + + StorageInfo storageInfo1 = mock(StorageInfo.class); + when(storageInfo1.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.STORAGE); + when(storageInfo1.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo1.getStorageName()).thenReturn("storage1"); + + StorageInfo storageInfo2 = mock(StorageInfo.class); + when(storageInfo2.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.STORAGE); + when(storageInfo2.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo2.getStorageName()).thenReturn("storage2"); + + when(storageInfoProvider.getStorageInfo(namespace1)).thenReturn(storageInfo1); + when(storageInfoProvider.getStorageInfo(namespace2)).thenReturn(storageInfo2); + when(storageInfoProvider.getStorageInfo(namespace3)).thenReturn(storageInfo2); + when(storageInfoProvider.getStorageInfo(namespace4)).thenReturn(storageInfo1); + + // Create mutations for different storages + Key partitionKey1 = mock(Key.class); + Mutation mutation1 = createMutation(namespace1, table1, partitionKey1, Optional.empty()); + + Key partitionKey2 = mock(Key.class); + Mutation mutation2 = createMutation(namespace2, table2, partitionKey2, Optional.empty()); + + Key partitionKey3 = mock(Key.class); + Mutation mutation3 = createMutation(namespace1, table2, partitionKey3, Optional.empty()); + + Mutation mutation4 = createMutation(namespace3, table1, partitionKey1, Optional.empty()); + + Mutation mutation5 = createMutation(namespace4, table2, partitionKey2, Optional.empty()); + + // Act + List> result = + grouper.groupMutations( + Arrays.asList(mutation1, mutation2, mutation3, mutation4, mutation5)); + + // Assert + assertThat(result).hasSize(2); + assertThat(result.get(0)).containsExactly(mutation1, mutation3, mutation5); + assertThat(result.get(1)).containsExactly(mutation2, mutation4); + } + + @Test + public void groupMutations_WithStorageAtomicityAndBatchSizeLimit_ShouldGroupCorrectly() + throws ExecutionException { + // Arrange + String namespace1 = "ns1"; + String namespace2 = "ns2"; + String namespace3 = "ns3"; + String namespace4 = "ns4"; + String table1 = "table1"; + String table2 = "table2"; + + StorageInfo storageInfo1 = mock(StorageInfo.class); + when(storageInfo1.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.STORAGE); + when(storageInfo1.getMaxAtomicMutationsCount()).thenReturn(2); // Max 2 mutations per batch + when(storageInfo1.getStorageName()).thenReturn("storage1"); + + StorageInfo storageInfo2 = mock(StorageInfo.class); + when(storageInfo2.getMutationAtomicityUnit()) + .thenReturn(StorageInfo.MutationAtomicityUnit.STORAGE); + when(storageInfo2.getMaxAtomicMutationsCount()).thenReturn(100); + when(storageInfo2.getStorageName()).thenReturn("storage2"); + + when(storageInfoProvider.getStorageInfo(namespace1)).thenReturn(storageInfo1); + when(storageInfoProvider.getStorageInfo(namespace2)).thenReturn(storageInfo2); + when(storageInfoProvider.getStorageInfo(namespace3)).thenReturn(storageInfo2); + when(storageInfoProvider.getStorageInfo(namespace4)).thenReturn(storageInfo1); + + // Create mutations for different storages + Key partitionKey1 = mock(Key.class); + Mutation mutation1 = createMutation(namespace1, table1, partitionKey1, Optional.empty()); + + Key partitionKey2 = mock(Key.class); + Mutation mutation2 = createMutation(namespace2, table2, partitionKey2, Optional.empty()); + + Key partitionKey3 = mock(Key.class); + Mutation mutation3 = createMutation(namespace1, table2, partitionKey3, Optional.empty()); + + Mutation mutation4 = createMutation(namespace3, table1, partitionKey1, Optional.empty()); + + Mutation mutation5 = createMutation(namespace4, table2, partitionKey2, Optional.empty()); + + // Act + List> result = + grouper.groupMutations( + Arrays.asList(mutation1, mutation2, mutation3, mutation4, mutation5)); + + // Assert + assertThat(result).hasSize(3); + assertThat(result.get(0)).containsExactly(mutation1, mutation3); + assertThat(result.get(1)).containsExactly(mutation5); + assertThat(result.get(2)).containsExactly(mutation2, mutation4); + } + + private Mutation createMutation( + String namespace, String table, Key partitionKey, Optional clusteringKey) { + Mutation mutation = mock(Put.class); + when(mutation.forNamespace()).thenReturn(Optional.of(namespace)); + when(mutation.forTable()).thenReturn(Optional.of(table)); + when(mutation.getPartitionKey()).thenReturn(partitionKey); + when(mutation.getClusteringKey()).thenReturn(clusteringKey); + return mutation; + } +} diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/PartitionedMutationsKeyTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/PartitionedMutationsKeyTest.java deleted file mode 100644 index 679ba00616..0000000000 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/PartitionedMutationsKeyTest.java +++ /dev/null @@ -1,121 +0,0 @@ -package com.scalar.db.transaction.consensuscommit; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.scalar.db.api.Put; -import com.scalar.db.io.Key; -import org.junit.jupiter.api.Test; - -public class PartitionedMutationsKeyTest { - private static final String ANY_NAMESPACE_NAME = "namespace"; - private static final String ANY_TABLE_NAME = "table"; - private static final String ANY_NAME_1 = "name1"; - private static final String ANY_TEXT_1 = "text1"; - private static final String ANY_TEXT_3 = "text3"; - - private Put preparePut() { - Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); - return new Put(partitionKey, null).forNamespace(ANY_NAMESPACE_NAME).forTable(ANY_TABLE_NAME); - } - - private Put prepareAnotherPut() { - Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_3); - return new Put(partitionKey, null).forNamespace(ANY_NAMESPACE_NAME).forTable(ANY_TABLE_NAME); - } - - @Test - public void equals_SameOperationGivenInConstructor_ShouldReturnTrue() { - // Arrange - Put put = preparePut(); - PartitionedMutations.Key key = new PartitionedMutations.Key(put); - - // Act - boolean res = key.equals(new PartitionedMutations.Key(put)); - - // Assert - assertThat(res).isTrue(); - } - - @Test - public void equals_EquivalentOperationGivenInConstructor_ShouldReturnTrue() { - // Arrange - Put one = preparePut(); - PartitionedMutations.Key key = new PartitionedMutations.Key(one); - Put another = preparePut(); - - // Act - boolean res = key.equals(new PartitionedMutations.Key(another)); - - // Assert - assertThat(res).isTrue(); - } - - @Test - public void equals_NonEquivalentOperationGivenInConstructor_ShouldReturnFalse() { - // Arrange - Put one = preparePut(); - PartitionedMutations.Key key = new PartitionedMutations.Key(one); - Put another = prepareAnotherPut(); - - // Act - boolean res = key.equals(new PartitionedMutations.Key(another)); - - // Assert - assertThat(res).isFalse(); - } - - @Test - public void compareTo_SameOperationGivenInConstructor_ShouldReturnZero() { - // Arrange - Put put = preparePut(); - PartitionedMutations.Key key = new PartitionedMutations.Key(put); - - // Act - int res = key.compareTo(new PartitionedMutations.Key(put)); - - // Assert - assertThat(res).isEqualTo(0); - } - - @Test - public void compareTo_EquivalentOperationGivenInConstructor_ShouldReturnZero() { - // Arrange - Put one = preparePut(); - PartitionedMutations.Key key = new PartitionedMutations.Key(one); - Put another = preparePut(); - - // Act - int res = key.compareTo(new PartitionedMutations.Key(another)); - - // Assert - assertThat(res).isEqualTo(0); - } - - @Test - public void compareTo_BiggerOperationGivenInConstructor_ShouldReturnNegative() { - // Arrange - Put one = preparePut(); - PartitionedMutations.Key key = new PartitionedMutations.Key(one); - Put another = prepareAnotherPut(); - - // Act - int res = key.compareTo(new PartitionedMutations.Key(another)); - - // Assert - assertThat(res).isLessThan(0); - } - - @Test - public void compareTo_LesserOperationGivenInConstructor_ShouldReturnZero() { - // Arrange - Put one = prepareAnotherPut(); - PartitionedMutations.Key key = new PartitionedMutations.Key(one); - Put another = preparePut(); - - // Act - int res = key.compareTo(new PartitionedMutations.Key(another)); - - // Assert - assertThat(res).isGreaterThan(0); - } -} diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/PartitionedMutationsTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/PartitionedMutationsTest.java deleted file mode 100644 index a451ca5aca..0000000000 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/PartitionedMutationsTest.java +++ /dev/null @@ -1,103 +0,0 @@ -package com.scalar.db.transaction.consensuscommit; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.scalar.db.api.Mutation; -import com.scalar.db.api.Put; -import com.scalar.db.io.Key; -import java.util.Arrays; -import java.util.List; -import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.Test; - -public class PartitionedMutationsTest { - private static final String ANY_NAMESPACE_NAME = "namespace"; - private static final String ANY_TABLE_NAME = "table"; - private static final String ANY_NAME_1 = "name1"; - private static final String ANY_NAME_2 = "name2"; - private static final String ANY_NAME_3 = "name3"; - private static final String ANY_NAME_4 = "name4"; - private static final String ANY_NAME_5 = "name5"; - private static final String ANY_NAME_6 = "name6"; - private static final String ANY_TEXT_1 = "text1"; - private static final String ANY_TEXT_2 = "text2"; - private static final String ANY_TEXT_3 = "text3"; - private static final String ANY_TEXT_4 = "text4"; - private static final String ANY_TEXT_5 = "text5"; - private static final String ANY_TEXT_6 = "text6"; - - private Put preparePut(Key partitionKey, Key clusteringKey) { - return new Put(partitionKey, clusteringKey) - .forNamespace(ANY_NAMESPACE_NAME) - .forTable(ANY_TABLE_NAME); - } - - private List preparePuts1() { - Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); - Key clusteringKey1 = new Key(ANY_NAME_2, ANY_TEXT_2); - Key clusteringKey2 = new Key(ANY_NAME_3, ANY_TEXT_3); - Put put1 = preparePut(partitionKey, clusteringKey1); - Put put2 = preparePut(partitionKey, clusteringKey2); - return Arrays.asList(put1, put2); - } - - private List preparePuts2() { - Key partitionKey = new Key(ANY_NAME_4, ANY_TEXT_4); - Key clusteringKey1 = new Key(ANY_NAME_5, ANY_TEXT_5); - Key clusteringKey2 = new Key(ANY_NAME_6, ANY_TEXT_6); - Put put1 = preparePut(partitionKey, clusteringKey1); - Put put2 = preparePut(partitionKey, clusteringKey2); - return Arrays.asList(put1, put2); - } - - @Test - public void - get_MutationCollectionsAndPartitionGiven_ShouldReturnMutationsForSpecifiedPartition() { - // Arrange - List puts = preparePuts1(); - PartitionedMutations partitionedMutations = new PartitionedMutations(puts); - - // Act - PartitionedMutations.Key key = new PartitionedMutations.Key(puts.get(0)); - List mutations = partitionedMutations.get(key); - - // Assert - assertThat(mutations.size()).isEqualTo(puts.size()); - assertThat(mutations.contains(puts.get(0))).isTrue(); - assertThat(mutations.contains(puts.get(1))).isTrue(); - } - - @Test - public void getOrderedKeys_MutationCollectionsGiven_ShouldReturnOrderedKeys() { - // Arrange - List puts1 = preparePuts1(); - List puts2 = preparePuts2(); - PartitionedMutations partitionedMutations = new PartitionedMutations(puts1, puts2); - - // Act - List keys = partitionedMutations.getOrderedKeys(); - - // Assert - assertThat(keys.size()).isEqualTo(2); - assertThat(keys.get(0)).isEqualTo(new PartitionedMutations.Key(puts1.get(0))); - assertThat(keys.get(1)).isEqualTo(new PartitionedMutations.Key(puts2.get(0))); - } - - @Test - public void - getOrderedKeys_MutationCollectionsGivenInDifferentOrder_ShouldReturnEquallyOrderedKeys() { - // Arrange - List puts1 = preparePuts1(); - List puts2 = preparePuts2(); - PartitionedMutations partitionedMutations1 = new PartitionedMutations(puts1, puts2); - PartitionedMutations partitionedMutations2 = new PartitionedMutations(puts2, puts1); - - // Act - List keys1 = partitionedMutations1.getOrderedKeys(); - List keys2 = partitionedMutations2.getOrderedKeys(); - - // Assert - Assertions.assertThat(keys1).isEqualTo(keys2); - assertThat(partitionedMutations1).isEqualTo(partitionedMutations2); - } -} diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java index a62c047733..f4396d45e8 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java @@ -20,6 +20,7 @@ import com.scalar.db.api.Selection; import com.scalar.db.api.TableMetadata; import com.scalar.db.api.TransactionState; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.TransactionException; @@ -158,11 +159,19 @@ public void setUp() throws Exception { private CommitHandler createCommitHandler( TransactionTableMetadataManager tableMetadataManager, @Nullable CoordinatorGroupCommitter groupCommitter) { + MutationsGrouper mutationsGrouper = new MutationsGrouper(new StorageInfoProvider(admin)); if (groupCommitter != null) { return new CommitHandlerWithGroupCommit( - storage, coordinator, tableMetadataManager, parallelExecutor, true, groupCommitter); + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + mutationsGrouper, + true, + groupCommitter); } else { - return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor, true); + return new CommitHandler( + storage, coordinator, tableMetadataManager, parallelExecutor, mutationsGrouper, true); } } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index de122da131..6feec004c8 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -32,6 +32,7 @@ import com.scalar.db.api.Scan; import com.scalar.db.api.ScanAll; import com.scalar.db.api.Selection; +import com.scalar.db.api.StorageInfo; import com.scalar.db.api.TableMetadata; import com.scalar.db.api.TransactionCrudOperable; import com.scalar.db.api.TransactionManagerCrudOperable; @@ -39,6 +40,7 @@ import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; import com.scalar.db.common.DecoratedDistributedTransaction; +import com.scalar.db.common.StorageInfoProvider; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.CommitConflictException; @@ -7183,8 +7185,10 @@ void manager_mutate_ShouldMutateRecords(Isolation isolation) throws TransactionE @ParameterizedTest @EnumSource(Isolation.class) - public void putAndCommit_SinglePartitionMutationsGiven_ShouldAccessStorageOnceForPrepareAndCommit( - Isolation isolation) throws TransactionException, ExecutionException, CoordinatorException { + public void + putAndCommit_SinglePartitionMutationsGiven_ShouldBehaveCorrectlyBasedOnStorageMutationAtomicityUnit( + Isolation isolation) + throws TransactionException, ExecutionException, CoordinatorException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(isolation); IntValue balance = new IntValue(BALANCE, INITIAL_BALANCE); @@ -7199,8 +7203,23 @@ public void putAndCommit_SinglePartitionMutationsGiven_ShouldAccessStorageOnceFo transaction.commit(); // Assert - // one for prepare, one for commit - verify(storage, times(2)).mutate(anyList()); + StorageInfo storageInfo = admin.getStorageInfo(namespace1); + switch (storageInfo.getMutationAtomicityUnit()) { + case RECORD: + // twice for prepare, twice for commit + verify(storage, times(4)).mutate(anyList()); + break; + case PARTITION: + case TABLE: + case NAMESPACE: + case STORAGE: + // one for prepare, one for commit + verify(storage, times(2)).mutate(anyList()); + break; + default: + throw new AssertionError(); + } + if (isGroupCommitEnabled()) { verify(coordinator) .putStateForGroupCommit(anyString(), anyList(), any(TransactionState.class), anyLong()); @@ -7211,8 +7230,10 @@ public void putAndCommit_SinglePartitionMutationsGiven_ShouldAccessStorageOnceFo @ParameterizedTest @EnumSource(Isolation.class) - public void putAndCommit_TwoPartitionsMutationsGiven_ShouldAccessStorageTwiceForPrepareAndCommit( - Isolation isolation) throws TransactionException, ExecutionException, CoordinatorException { + public void + putAndCommit_TwoPartitionsMutationsGiven_ShouldBehaveCorrectlyBasedOnStorageMutationAtomicityUnit( + Isolation isolation) + throws TransactionException, ExecutionException, CoordinatorException { // Arrange ConsensusCommitManager manager = createConsensusCommitManager(isolation); IntValue balance = new IntValue(BALANCE, INITIAL_BALANCE); @@ -7227,8 +7248,87 @@ public void putAndCommit_TwoPartitionsMutationsGiven_ShouldAccessStorageTwiceFor transaction.commit(); // Assert - // twice for prepare, twice for commit - verify(storage, times(4)).mutate(anyList()); + StorageInfo storageInfo = admin.getStorageInfo(namespace1); + switch (storageInfo.getMutationAtomicityUnit()) { + case RECORD: + case PARTITION: + // twice for prepare, twice for commit + verify(storage, times(4)).mutate(anyList()); + break; + case TABLE: + case NAMESPACE: + case STORAGE: + // one for prepare, one for commit + verify(storage, times(2)).mutate(anyList()); + break; + default: + throw new AssertionError(); + } + + if (isGroupCommitEnabled()) { + verify(coordinator) + .putStateForGroupCommit(anyString(), anyList(), any(TransactionState.class), anyLong()); + return; + } + verify(coordinator).putState(any(Coordinator.State.class)); + } + + @ParameterizedTest + @EnumSource(Isolation.class) + public void + insertAndCommit_TwoNamespacesMutationsGiven_ShouldBehaveCorrectlyBasedOnStorageMutationAtomicityUnit( + Isolation isolation) + throws TransactionException, ExecutionException, CoordinatorException { + // Arrange + ConsensusCommitManager manager = createConsensusCommitManager(isolation); + DistributedTransaction transaction = manager.begin(); + + // Act + transaction.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.insert( + Insert.newBuilder() + .namespace(namespace2) + .table(TABLE_2) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.commit(); + + // Assert + StorageInfo storageInfo1 = admin.getStorageInfo(namespace1); + StorageInfo storageInfo2 = admin.getStorageInfo(namespace2); + if (!storageInfo1.getStorageName().equals(storageInfo2.getStorageName())) { + // different storages + + // twice for prepare, twice for commit + verify(storage, times(4)).mutate(anyList()); + } else { + // same storage + switch (storageInfo1.getMutationAtomicityUnit()) { + case RECORD: + case PARTITION: + case TABLE: + case NAMESPACE: + // twice for prepare, twice for commit + verify(storage, times(4)).mutate(anyList()); + break; + case STORAGE: + // one for prepare, one for commit + verify(storage, times(2)).mutate(anyList()); + break; + default: + throw new AssertionError(); + } + } + if (isGroupCommitEnabled()) { verify(coordinator) .putStateForGroupCommit(anyString(), anyList(), any(TransactionState.class), anyLong()); @@ -7655,11 +7755,19 @@ private ConsensusCommitManager createConsensusCommitManager(Isolation isolation) private CommitHandler createCommitHandler( TransactionTableMetadataManager tableMetadataManager, @Nullable CoordinatorGroupCommitter groupCommitter) { + MutationsGrouper mutationsGrouper = new MutationsGrouper(new StorageInfoProvider(admin)); if (groupCommitter != null) { return new CommitHandlerWithGroupCommit( - storage, coordinator, tableMetadataManager, parallelExecutor, true, groupCommitter); + storage, + coordinator, + tableMetadataManager, + parallelExecutor, + mutationsGrouper, + true, + groupCommitter); } else { - return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor, true); + return new CommitHandler( + storage, coordinator, tableMetadataManager, parallelExecutor, mutationsGrouper, true); } }