diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java index 9268905d53..f220889e3c 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java @@ -117,7 +117,7 @@ public void commit(Snapshot snapshot, boolean readOnly) if (hasWritesOrDeletesInSnapshot) { try { - prepare(snapshot); + prepareRecords(snapshot); } catch (PreparationException e) { safelyCallOnFailureBeforeCommit(snapshot); abortState(snapshot.getId()); @@ -134,7 +134,7 @@ public void commit(Snapshot snapshot, boolean readOnly) if (snapshot.hasReads()) { try { - validate(snapshot); + validateRecords(snapshot); } catch (ValidationException e) { safelyCallOnFailureBeforeCommit(snapshot); @@ -194,9 +194,19 @@ protected void handleCommitConflict(Snapshot snapshot, Exception cause) } } - public void prepare(Snapshot snapshot) throws PreparationException { + public void prepareRecords(Snapshot snapshot) throws PreparationException { try { - prepareRecords(snapshot); + PrepareMutationComposer composer = + new PrepareMutationComposer(snapshot.getId(), tableMetadataManager); + snapshot.to(composer); + PartitionedMutations mutations = new PartitionedMutations(composer.get()); + + ImmutableList orderedKeys = mutations.getOrderedKeys(); + List tasks = new ArrayList<>(orderedKeys.size()); + for (PartitionedMutations.Key key : orderedKeys) { + tasks.add(() -> storage.mutate(mutations.get(key))); + } + parallelExecutor.prepareRecords(tasks, snapshot.getId()); } catch (NoMutationException e) { throw new PreparationConflictException( CoreError.CONSENSUS_COMMIT_PREPARING_RECORD_EXISTS.buildMessage(), e, snapshot.getId()); @@ -211,22 +221,7 @@ public void prepare(Snapshot snapshot) throws PreparationException { } } - private void prepareRecords(Snapshot snapshot) - throws ExecutionException, PreparationConflictException { - PrepareMutationComposer composer = - new PrepareMutationComposer(snapshot.getId(), tableMetadataManager); - snapshot.to(composer); - PartitionedMutations mutations = new PartitionedMutations(composer.get()); - - ImmutableList orderedKeys = mutations.getOrderedKeys(); - List tasks = new ArrayList<>(orderedKeys.size()); - for (PartitionedMutations.Key key : orderedKeys) { - tasks.add(() -> storage.mutate(mutations.get(key))); - } - parallelExecutor.prepare(tasks, snapshot.getId()); - } - - public void validate(Snapshot snapshot) throws ValidationException { + public void validateRecords(Snapshot snapshot) throws ValidationException { try { // validation is executed when SERIALIZABLE is chosen. snapshot.toSerializable(storage); diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java index 61489428f5..c657c6b306 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java @@ -60,7 +60,7 @@ public ParallelExecutor(ConsensusCommitConfig config) { this.parallelExecutorService = parallelExecutorService; } - public void prepare(List tasks, String transactionId) + public void prepareRecords(List tasks, String transactionId) throws ExecutionException { try { // When parallel preparation is disabled, we stop running the tasks when one of them fails @@ -85,7 +85,7 @@ public void prepare(List tasks, String transactionId) } } - public void validate(List tasks, String transactionId) + public void validateRecords(List tasks, String transactionId) throws ExecutionException, ValidationConflictException { try { executeTasks( diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index 195cb1bbc4..64c3e43b5c 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -548,7 +548,7 @@ void toSerializable(DistributedStorage storage) } } - parallelExecutor.validate(tasks, getId()); + parallelExecutor.validateRecords(tasks, getId()); } /** diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java index dd72f03b4d..5c0126d59a 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java @@ -269,7 +269,7 @@ public void prepare() throws PreparationException { } try { - commit.prepare(crud.getSnapshot()); + commit.prepareRecords(crud.getSnapshot()); } finally { needRollback = true; } @@ -277,7 +277,7 @@ public void prepare() throws PreparationException { @Override public void validate() throws ValidationException { - commit.validate(crud.getSnapshot()); + commit.validateRecords(crud.getSnapshot()); validated = true; } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ParallelExecutorTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ParallelExecutorTest.java index 63a16bdee4..c9a0785321 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ParallelExecutorTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ParallelExecutorTest.java @@ -58,13 +58,13 @@ protected void afterEach() throws Exception { } @Test - public void prepare_ParallelPreparationNotEnabled_ShouldExecuteTasksSerially() + public void prepareRecords_ParallelPreparationNotEnabled_ShouldExecuteTasksSerially() throws ExecutionException, ValidationConflictException, CrudException { // Arrange when(config.isParallelPreparationEnabled()).thenReturn(false); // Act - parallelExecutor.prepare(tasks, TX_ID); + parallelExecutor.prepareRecords(tasks, TX_ID); // Assert verify(task, times(tasks.size())).run(); @@ -73,14 +73,14 @@ public void prepare_ParallelPreparationNotEnabled_ShouldExecuteTasksSerially() @Test public void - prepare_ParallelPreparationNotEnabled_ExecutionExceptionThrownByTask_ShouldStopRunningTasks() + prepareRecords_ParallelPreparationNotEnabled_ExecutionExceptionThrownByTask_ShouldStopRunningTasks() throws ExecutionException, ValidationConflictException, CrudException { // Arrange when(config.isParallelPreparationEnabled()).thenReturn(false); doThrow(ExecutionException.class).when(task).run(); // Act Assert - assertThatThrownBy(() -> parallelExecutor.prepare(tasks, TX_ID)) + assertThatThrownBy(() -> parallelExecutor.prepareRecords(tasks, TX_ID)) .isInstanceOf(ExecutionException.class); verify(task, only()).run(); @@ -88,13 +88,13 @@ public void prepare_ParallelPreparationNotEnabled_ShouldExecuteTasksSerially() } @Test - public void prepare_ParallelPreparationEnabled_ShouldExecuteTasksInParallel() + public void prepareRecords_ParallelPreparationEnabled_ShouldExecuteTasksInParallel() throws ExecutionException, ValidationConflictException, CrudException { // Arrange when(config.isParallelPreparationEnabled()).thenReturn(true); // Act - parallelExecutor.prepare(tasks, TX_ID); + parallelExecutor.prepareRecords(tasks, TX_ID); // Assert verify(task, times(tasks.size())).run(); @@ -102,7 +102,7 @@ public void prepare_ParallelPreparationEnabled_ShouldExecuteTasksInParallel() } @Test - public void prepare_ParallelPreparationEnabled_SingleTaskGiven_ShouldExecuteTasksSerially() + public void prepareRecords_ParallelPreparationEnabled_SingleTaskGiven_ShouldExecuteTasksSerially() throws ExecutionException, ValidationConflictException, CrudException { // Arrange when(config.isParallelPreparationEnabled()).thenReturn(true); @@ -111,7 +111,7 @@ public void prepare_ParallelPreparationEnabled_SingleTaskGiven_ShouldExecuteTask tasks = Collections.singletonList(task); // Act - parallelExecutor.prepare(tasks, TX_ID); + parallelExecutor.prepareRecords(tasks, TX_ID); // Assert verify(task, times(tasks.size())).run(); @@ -120,14 +120,14 @@ public void prepare_ParallelPreparationEnabled_SingleTaskGiven_ShouldExecuteTask @Test public void - prepare_ParallelPreparationEnabled_ExecutionExceptionThrownByTask_ShouldNotStopRunningTasks() + prepareRecords_ParallelPreparationEnabled_ExecutionExceptionThrownByTask_ShouldNotStopRunningTasks() throws ExecutionException, ValidationConflictException, CrudException { // Arrange when(config.isParallelPreparationEnabled()).thenReturn(true); doThrow(ExecutionException.class).when(task).run(); // Act Assert - assertThatThrownBy(() -> parallelExecutor.prepare(tasks, TX_ID)) + assertThatThrownBy(() -> parallelExecutor.prepareRecords(tasks, TX_ID)) .isInstanceOf(ExecutionException.class); verify(task, times(tasks.size())).run(); @@ -135,13 +135,13 @@ public void prepare_ParallelPreparationEnabled_SingleTaskGiven_ShouldExecuteTask } @Test - public void validate_ParallelValidationNotEnabled_ShouldExecuteTasksSerially() + public void validateRecords_ParallelValidationNotEnabled_ShouldExecuteTasksSerially() throws ExecutionException, ValidationConflictException, CrudException { // Arrange when(config.isParallelValidationEnabled()).thenReturn(false); // Act - parallelExecutor.validate(tasks, TX_ID); + parallelExecutor.validateRecords(tasks, TX_ID); // Assert verify(task, times(tasks.size())).run(); @@ -150,14 +150,14 @@ public void validate_ParallelValidationNotEnabled_ShouldExecuteTasksSerially() @Test public void - validate_ParallelValidationNotEnabled_ExecutionExceptionThrownByTask_ShouldStopRunningTasks() + validateRecords_ParallelValidationNotEnabled_ExecutionExceptionThrownByTask_ShouldStopRunningTasks() throws ExecutionException, ValidationConflictException, CrudException { // Arrange when(config.isParallelValidationEnabled()).thenReturn(false); doThrow(ExecutionException.class).when(task).run(); // Act Assert - assertThatThrownBy(() -> parallelExecutor.validate(tasks, TX_ID)) + assertThatThrownBy(() -> parallelExecutor.validateRecords(tasks, TX_ID)) .isInstanceOf(ExecutionException.class); verify(task, only()).run(); @@ -166,14 +166,14 @@ public void validate_ParallelValidationNotEnabled_ShouldExecuteTasksSerially() @Test public void - validate_ParallelValidationNotEnabled_ValidationConflictExceptionThrownByTask_ShouldStopRunningTasks() + validateRecords_ParallelValidationNotEnabled_ValidationConflictExceptionThrownByTask_ShouldStopRunningTasks() throws ExecutionException, ValidationConflictException, CrudException { // Arrange when(config.isParallelValidationEnabled()).thenReturn(false); doThrow(ValidationConflictException.class).when(task).run(); // Act Assert - assertThatThrownBy(() -> parallelExecutor.validate(tasks, TX_ID)) + assertThatThrownBy(() -> parallelExecutor.validateRecords(tasks, TX_ID)) .isInstanceOf(ValidationConflictException.class); verify(task, only()).run(); @@ -181,13 +181,13 @@ public void validate_ParallelValidationNotEnabled_ShouldExecuteTasksSerially() } @Test - public void validate_ParallelValidationEnabled_ShouldExecuteTasksInParallel() + public void validateRecords_ParallelValidationEnabled_ShouldExecuteTasksInParallel() throws ExecutionException, ValidationConflictException, CrudException { // Arrange when(config.isParallelValidationEnabled()).thenReturn(true); // Act - parallelExecutor.validate(tasks, TX_ID); + parallelExecutor.validateRecords(tasks, TX_ID); // Assert verify(task, times(tasks.size())).run(); @@ -195,7 +195,7 @@ public void validate_ParallelValidationEnabled_ShouldExecuteTasksInParallel() } @Test - public void validate_ParallelValidationEnabled_SingleTaskGiven_ShouldExecuteTasksSerially() + public void validateRecords_ParallelValidationEnabled_SingleTaskGiven_ShouldExecuteTasksSerially() throws ExecutionException, ValidationConflictException, CrudException { // Arrange when(config.isParallelValidationEnabled()).thenReturn(true); @@ -204,7 +204,7 @@ public void validate_ParallelValidationEnabled_SingleTaskGiven_ShouldExecuteTask tasks = Collections.singletonList(task); // Act - parallelExecutor.validate(tasks, TX_ID); + parallelExecutor.validateRecords(tasks, TX_ID); // Assert verify(task, times(tasks.size())).run(); @@ -213,14 +213,14 @@ public void validate_ParallelValidationEnabled_SingleTaskGiven_ShouldExecuteTask @Test public void - validate_ParallelValidationEnabled_ExecutionExceptionThrownByTask_ShouldStopRunningTasks() + validateRecords_ParallelValidationEnabled_ExecutionExceptionThrownByTask_ShouldStopRunningTasks() throws ExecutionException, ValidationConflictException, CrudException { // Arrange when(config.isParallelValidationEnabled()).thenReturn(true); doThrow(ExecutionException.class).when(task).run(); // Act Assert - assertThatThrownBy(() -> parallelExecutor.validate(tasks, TX_ID)) + assertThatThrownBy(() -> parallelExecutor.validateRecords(tasks, TX_ID)) .isInstanceOf(ExecutionException.class); verify(task, atMost(tasks.size())).run(); @@ -229,14 +229,14 @@ public void validate_ParallelValidationEnabled_SingleTaskGiven_ShouldExecuteTask @Test public void - validate_ParallelValidationEnabled_ValidationConflictExceptionThrownByTask_ShouldStopRunningTasks() + validateRecords_ParallelValidationEnabled_ValidationConflictExceptionThrownByTask_ShouldStopRunningTasks() throws ExecutionException, ValidationConflictException, CrudException { // Arrange when(config.isParallelValidationEnabled()).thenReturn(true); doThrow(ValidationConflictException.class).when(task).run(); // Act Assert - assertThatThrownBy(() -> parallelExecutor.validate(tasks, TX_ID)) + assertThatThrownBy(() -> parallelExecutor.validateRecords(tasks, TX_ID)) .isInstanceOf(ValidationConflictException.class); verify(task, atMost(tasks.size())).run(); diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java index 0b80bf8b22..a2fd4d4867 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitTest.java @@ -706,7 +706,7 @@ public void mutate_PutAndDeleteGiven_ShouldCallCrudHandlerPutAndDelete() } @Test - public void prepare_ProcessedCrudGiven_ShouldPrepareWithSnapshot() + public void prepare_ProcessedCrudGiven_ShouldPrepareRecordsWithSnapshot() throws PreparationException, CrudException { // Arrange when(crud.getSnapshot()).thenReturn(snapshot); @@ -716,7 +716,7 @@ public void prepare_ProcessedCrudGiven_ShouldPrepareWithSnapshot() // Assert verify(crud).readIfImplicitPreReadEnabled(); - verify(commit).prepare(snapshot); + verify(commit).prepareRecords(snapshot); } @Test @@ -775,7 +775,7 @@ public void prepare_ScannerNotClosed_ShouldThrowIllegalStateException() { } @Test - public void validate_ProcessedCrudGiven_ShouldPerformValidationWithSnapshot() + public void validate_ProcessedCrudGiven_ShouldValidateRecordsWithSnapshot() throws ValidationException, PreparationException { // Arrange transaction.prepare(); @@ -785,7 +785,7 @@ public void validate_ProcessedCrudGiven_ShouldPerformValidationWithSnapshot() transaction.validate(); // Assert - verify(commit).validate(snapshot); + verify(commit).validateRecords(snapshot); } @Test @@ -855,7 +855,7 @@ public void rollback_CalledAfterPrepareFails_ShouldAbortStateAndRollbackRecords( throws TransactionException { // Arrange when(crud.getSnapshot()).thenReturn(snapshot); - doThrow(PreparationException.class).when(commit).prepare(snapshot); + doThrow(PreparationException.class).when(commit).prepareRecords(snapshot); // Act assertThatThrownBy(transaction::prepare).isInstanceOf(PreparationException.class); diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index c046a044c3..3566204ef7 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -3123,7 +3123,7 @@ void put_WhenTheOtherTransactionsIsDelayed_ShouldBeCommittedWithoutBlocked() thr @EnabledIf("isGroupCommitEnabled") void put_WhenTheOtherTransactionsFails_ShouldBeCommittedWithoutBlocked() throws Exception { // Arrange - doThrow(PreparationConflictException.class).when(commit).prepare(any()); + doThrow(PreparationConflictException.class).when(commit).prepareRecords(any()); // Act DistributedTransaction failingTxn = manager.begin(); @@ -3203,7 +3203,7 @@ void put_WhenAllTransactionsAbort_ShouldBeAbortedProperly() throws Exception { DistributedTransaction failingTxn1 = manager.begin(Isolation.SERIALIZABLE); DistributedTransaction failingTxn2 = manager.begin(Isolation.SERIALIZABLE); - doThrow(PreparationConflictException.class).when(commit).prepare(any()); + doThrow(PreparationConflictException.class).when(commit).prepareRecords(any()); failingTxn1.put(preparePut(0, 0, namespace1, TABLE_1)); failingTxn2.put(preparePut(1, 0, namespace1, TABLE_1));