Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void commit(Snapshot snapshot, boolean readOnly)

if (hasWritesOrDeletesInSnapshot) {
try {
prepare(snapshot);
prepareRecords(snapshot);
} catch (PreparationException e) {
safelyCallOnFailureBeforeCommit(snapshot);
abortState(snapshot.getId());
Expand All @@ -134,7 +134,7 @@ public void commit(Snapshot snapshot, boolean readOnly)

if (snapshot.hasReads()) {
try {
validate(snapshot);
validateRecords(snapshot);
} catch (ValidationException e) {
safelyCallOnFailureBeforeCommit(snapshot);

Expand Down Expand Up @@ -194,9 +194,19 @@ protected void handleCommitConflict(Snapshot snapshot, Exception cause)
}
}

public void prepare(Snapshot snapshot) throws PreparationException {
public void prepareRecords(Snapshot snapshot) throws PreparationException {
try {
prepareRecords(snapshot);
PrepareMutationComposer composer =
new PrepareMutationComposer(snapshot.getId(), tableMetadataManager);
snapshot.to(composer);
PartitionedMutations mutations = new PartitionedMutations(composer.get());

ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys();
List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size());
for (PartitionedMutations.Key key : orderedKeys) {
tasks.add(() -> storage.mutate(mutations.get(key)));
}
parallelExecutor.prepareRecords(tasks, snapshot.getId());
} catch (NoMutationException e) {
throw new PreparationConflictException(
CoreError.CONSENSUS_COMMIT_PREPARING_RECORD_EXISTS.buildMessage(), e, snapshot.getId());
Expand All @@ -211,22 +221,7 @@ public void prepare(Snapshot snapshot) throws PreparationException {
}
}

private void prepareRecords(Snapshot snapshot)
throws ExecutionException, PreparationConflictException {
PrepareMutationComposer composer =
new PrepareMutationComposer(snapshot.getId(), tableMetadataManager);
snapshot.to(composer);
PartitionedMutations mutations = new PartitionedMutations(composer.get());

ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys();
List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size());
for (PartitionedMutations.Key key : orderedKeys) {
tasks.add(() -> storage.mutate(mutations.get(key)));
}
parallelExecutor.prepare(tasks, snapshot.getId());
}

public void validate(Snapshot snapshot) throws ValidationException {
public void validateRecords(Snapshot snapshot) throws ValidationException {
try {
// validation is executed when SERIALIZABLE is chosen.
snapshot.toSerializable(storage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public ParallelExecutor(ConsensusCommitConfig config) {
this.parallelExecutorService = parallelExecutorService;
}

public void prepare(List<ParallelExecutorTask> tasks, String transactionId)
public void prepareRecords(List<ParallelExecutorTask> tasks, String transactionId)
throws ExecutionException {
try {
// When parallel preparation is disabled, we stop running the tasks when one of them fails
Expand All @@ -85,7 +85,7 @@ public void prepare(List<ParallelExecutorTask> tasks, String transactionId)
}
}

public void validate(List<ParallelExecutorTask> tasks, String transactionId)
public void validateRecords(List<ParallelExecutorTask> tasks, String transactionId)
throws ExecutionException, ValidationConflictException {
try {
executeTasks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ void toSerializable(DistributedStorage storage)
}
}

parallelExecutor.validate(tasks, getId());
parallelExecutor.validateRecords(tasks, getId());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,15 +269,15 @@ public void prepare() throws PreparationException {
}

try {
commit.prepare(crud.getSnapshot());
commit.prepareRecords(crud.getSnapshot());
} finally {
needRollback = true;
}
}

@Override
public void validate() throws ValidationException {
commit.validate(crud.getSnapshot());
commit.validateRecords(crud.getSnapshot());
validated = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ protected void afterEach() throws Exception {
}

@Test
public void prepare_ParallelPreparationNotEnabled_ShouldExecuteTasksSerially()
public void prepareRecords_ParallelPreparationNotEnabled_ShouldExecuteTasksSerially()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
when(config.isParallelPreparationEnabled()).thenReturn(false);

// Act
parallelExecutor.prepare(tasks, TX_ID);
parallelExecutor.prepareRecords(tasks, TX_ID);

// Assert
verify(task, times(tasks.size())).run();
Expand All @@ -73,36 +73,36 @@ public void prepare_ParallelPreparationNotEnabled_ShouldExecuteTasksSerially()

@Test
public void
prepare_ParallelPreparationNotEnabled_ExecutionExceptionThrownByTask_ShouldStopRunningTasks()
prepareRecords_ParallelPreparationNotEnabled_ExecutionExceptionThrownByTask_ShouldStopRunningTasks()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
when(config.isParallelPreparationEnabled()).thenReturn(false);
doThrow(ExecutionException.class).when(task).run();

// Act Assert
assertThatThrownBy(() -> parallelExecutor.prepare(tasks, TX_ID))
assertThatThrownBy(() -> parallelExecutor.prepareRecords(tasks, TX_ID))
.isInstanceOf(ExecutionException.class);

verify(task, only()).run();
verify(parallelExecutorService, never()).execute(any());
}

@Test
public void prepare_ParallelPreparationEnabled_ShouldExecuteTasksInParallel()
public void prepareRecords_ParallelPreparationEnabled_ShouldExecuteTasksInParallel()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
when(config.isParallelPreparationEnabled()).thenReturn(true);

// Act
parallelExecutor.prepare(tasks, TX_ID);
parallelExecutor.prepareRecords(tasks, TX_ID);

// Assert
verify(task, times(tasks.size())).run();
verify(parallelExecutorService, times(tasks.size())).execute(any());
}

@Test
public void prepare_ParallelPreparationEnabled_SingleTaskGiven_ShouldExecuteTasksSerially()
public void prepareRecords_ParallelPreparationEnabled_SingleTaskGiven_ShouldExecuteTasksSerially()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
when(config.isParallelPreparationEnabled()).thenReturn(true);
Expand All @@ -111,7 +111,7 @@ public void prepare_ParallelPreparationEnabled_SingleTaskGiven_ShouldExecuteTask
tasks = Collections.singletonList(task);

// Act
parallelExecutor.prepare(tasks, TX_ID);
parallelExecutor.prepareRecords(tasks, TX_ID);

// Assert
verify(task, times(tasks.size())).run();
Expand All @@ -120,28 +120,28 @@ public void prepare_ParallelPreparationEnabled_SingleTaskGiven_ShouldExecuteTask

@Test
public void
prepare_ParallelPreparationEnabled_ExecutionExceptionThrownByTask_ShouldNotStopRunningTasks()
prepareRecords_ParallelPreparationEnabled_ExecutionExceptionThrownByTask_ShouldNotStopRunningTasks()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
when(config.isParallelPreparationEnabled()).thenReturn(true);
doThrow(ExecutionException.class).when(task).run();

// Act Assert
assertThatThrownBy(() -> parallelExecutor.prepare(tasks, TX_ID))
assertThatThrownBy(() -> parallelExecutor.prepareRecords(tasks, TX_ID))
.isInstanceOf(ExecutionException.class);

verify(task, times(tasks.size())).run();
verify(parallelExecutorService, times(tasks.size())).execute(any());
}

@Test
public void validate_ParallelValidationNotEnabled_ShouldExecuteTasksSerially()
public void validateRecords_ParallelValidationNotEnabled_ShouldExecuteTasksSerially()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
when(config.isParallelValidationEnabled()).thenReturn(false);

// Act
parallelExecutor.validate(tasks, TX_ID);
parallelExecutor.validateRecords(tasks, TX_ID);

// Assert
verify(task, times(tasks.size())).run();
Expand All @@ -150,14 +150,14 @@ public void validate_ParallelValidationNotEnabled_ShouldExecuteTasksSerially()

@Test
public void
validate_ParallelValidationNotEnabled_ExecutionExceptionThrownByTask_ShouldStopRunningTasks()
validateRecords_ParallelValidationNotEnabled_ExecutionExceptionThrownByTask_ShouldStopRunningTasks()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
when(config.isParallelValidationEnabled()).thenReturn(false);
doThrow(ExecutionException.class).when(task).run();

// Act Assert
assertThatThrownBy(() -> parallelExecutor.validate(tasks, TX_ID))
assertThatThrownBy(() -> parallelExecutor.validateRecords(tasks, TX_ID))
.isInstanceOf(ExecutionException.class);

verify(task, only()).run();
Expand All @@ -166,36 +166,36 @@ public void validate_ParallelValidationNotEnabled_ShouldExecuteTasksSerially()

@Test
public void
validate_ParallelValidationNotEnabled_ValidationConflictExceptionThrownByTask_ShouldStopRunningTasks()
validateRecords_ParallelValidationNotEnabled_ValidationConflictExceptionThrownByTask_ShouldStopRunningTasks()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
when(config.isParallelValidationEnabled()).thenReturn(false);
doThrow(ValidationConflictException.class).when(task).run();

// Act Assert
assertThatThrownBy(() -> parallelExecutor.validate(tasks, TX_ID))
assertThatThrownBy(() -> parallelExecutor.validateRecords(tasks, TX_ID))
.isInstanceOf(ValidationConflictException.class);

verify(task, only()).run();
verify(parallelExecutorService, never()).execute(any());
}

@Test
public void validate_ParallelValidationEnabled_ShouldExecuteTasksInParallel()
public void validateRecords_ParallelValidationEnabled_ShouldExecuteTasksInParallel()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
when(config.isParallelValidationEnabled()).thenReturn(true);

// Act
parallelExecutor.validate(tasks, TX_ID);
parallelExecutor.validateRecords(tasks, TX_ID);

// Assert
verify(task, times(tasks.size())).run();
verify(parallelExecutorService, times(tasks.size())).execute(any());
}

@Test
public void validate_ParallelValidationEnabled_SingleTaskGiven_ShouldExecuteTasksSerially()
public void validateRecords_ParallelValidationEnabled_SingleTaskGiven_ShouldExecuteTasksSerially()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
when(config.isParallelValidationEnabled()).thenReturn(true);
Expand All @@ -204,7 +204,7 @@ public void validate_ParallelValidationEnabled_SingleTaskGiven_ShouldExecuteTask
tasks = Collections.singletonList(task);

// Act
parallelExecutor.validate(tasks, TX_ID);
parallelExecutor.validateRecords(tasks, TX_ID);

// Assert
verify(task, times(tasks.size())).run();
Expand All @@ -213,14 +213,14 @@ public void validate_ParallelValidationEnabled_SingleTaskGiven_ShouldExecuteTask

@Test
public void
validate_ParallelValidationEnabled_ExecutionExceptionThrownByTask_ShouldStopRunningTasks()
validateRecords_ParallelValidationEnabled_ExecutionExceptionThrownByTask_ShouldStopRunningTasks()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
when(config.isParallelValidationEnabled()).thenReturn(true);
doThrow(ExecutionException.class).when(task).run();

// Act Assert
assertThatThrownBy(() -> parallelExecutor.validate(tasks, TX_ID))
assertThatThrownBy(() -> parallelExecutor.validateRecords(tasks, TX_ID))
.isInstanceOf(ExecutionException.class);

verify(task, atMost(tasks.size())).run();
Expand All @@ -229,14 +229,14 @@ public void validate_ParallelValidationEnabled_SingleTaskGiven_ShouldExecuteTask

@Test
public void
validate_ParallelValidationEnabled_ValidationConflictExceptionThrownByTask_ShouldStopRunningTasks()
validateRecords_ParallelValidationEnabled_ValidationConflictExceptionThrownByTask_ShouldStopRunningTasks()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
when(config.isParallelValidationEnabled()).thenReturn(true);
doThrow(ValidationConflictException.class).when(task).run();

// Act Assert
assertThatThrownBy(() -> parallelExecutor.validate(tasks, TX_ID))
assertThatThrownBy(() -> parallelExecutor.validateRecords(tasks, TX_ID))
.isInstanceOf(ValidationConflictException.class);

verify(task, atMost(tasks.size())).run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ public void mutate_PutAndDeleteGiven_ShouldCallCrudHandlerPutAndDelete()
}

@Test
public void prepare_ProcessedCrudGiven_ShouldPrepareWithSnapshot()
public void prepare_ProcessedCrudGiven_ShouldPrepareRecordsWithSnapshot()
throws PreparationException, CrudException {
// Arrange
when(crud.getSnapshot()).thenReturn(snapshot);
Expand All @@ -716,7 +716,7 @@ public void prepare_ProcessedCrudGiven_ShouldPrepareWithSnapshot()

// Assert
verify(crud).readIfImplicitPreReadEnabled();
verify(commit).prepare(snapshot);
verify(commit).prepareRecords(snapshot);
}

@Test
Expand Down Expand Up @@ -775,7 +775,7 @@ public void prepare_ScannerNotClosed_ShouldThrowIllegalStateException() {
}

@Test
public void validate_ProcessedCrudGiven_ShouldPerformValidationWithSnapshot()
public void validate_ProcessedCrudGiven_ShouldValidateRecordsWithSnapshot()
throws ValidationException, PreparationException {
// Arrange
transaction.prepare();
Expand All @@ -785,7 +785,7 @@ public void validate_ProcessedCrudGiven_ShouldPerformValidationWithSnapshot()
transaction.validate();

// Assert
verify(commit).validate(snapshot);
verify(commit).validateRecords(snapshot);
}

@Test
Expand Down Expand Up @@ -855,7 +855,7 @@ public void rollback_CalledAfterPrepareFails_ShouldAbortStateAndRollbackRecords(
throws TransactionException {
// Arrange
when(crud.getSnapshot()).thenReturn(snapshot);
doThrow(PreparationException.class).when(commit).prepare(snapshot);
doThrow(PreparationException.class).when(commit).prepareRecords(snapshot);

// Act
assertThatThrownBy(transaction::prepare).isInstanceOf(PreparationException.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3123,7 +3123,7 @@ void put_WhenTheOtherTransactionsIsDelayed_ShouldBeCommittedWithoutBlocked() thr
@EnabledIf("isGroupCommitEnabled")
void put_WhenTheOtherTransactionsFails_ShouldBeCommittedWithoutBlocked() throws Exception {
// Arrange
doThrow(PreparationConflictException.class).when(commit).prepare(any());
doThrow(PreparationConflictException.class).when(commit).prepareRecords(any());

// Act
DistributedTransaction failingTxn = manager.begin();
Expand Down Expand Up @@ -3203,7 +3203,7 @@ void put_WhenAllTransactionsAbort_ShouldBeAbortedProperly() throws Exception {
DistributedTransaction failingTxn1 = manager.begin(Isolation.SERIALIZABLE);
DistributedTransaction failingTxn2 = manager.begin(Isolation.SERIALIZABLE);

doThrow(PreparationConflictException.class).when(commit).prepare(any());
doThrow(PreparationConflictException.class).when(commit).prepareRecords(any());

failingTxn1.put(preparePut(0, 0, namespace1, TABLE_1));
failingTxn2.put(preparePut(1, 0, namespace1, TABLE_1));
Expand Down