Skip to content

Commit 8f56f19

Browse files
Backport to branch(3) : Optimize ParallelExecutor (#2780)
Co-authored-by: Toshihiro Suzuki <[email protected]>
1 parent bf30f06 commit 8f56f19

File tree

2 files changed

+118
-0
lines changed

2 files changed

+118
-0
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,13 @@ private void executeTasks(
155155
String taskName,
156156
String transactionId)
157157
throws ExecutionException, ValidationConflictException, CrudException {
158+
if (tasks.size() == 1 && !noWait) {
159+
// If there is only one task and noWait is false, we can run it directly without parallel
160+
// execution.
161+
executeTasksSerially(tasks, stopOnError, taskName, transactionId);
162+
return;
163+
}
164+
158165
if (parallel) {
159166
executeTasksInParallel(tasks, noWait, stopOnError, taskName, transactionId);
160167
} else {

core/src/test/java/com/scalar/db/transaction/consensuscommit/ParallelExecutorTest.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.scalar.db.exception.transaction.ValidationConflictException;
1818
import com.scalar.db.transaction.consensuscommit.ParallelExecutor.ParallelExecutorTask;
1919
import java.util.Arrays;
20+
import java.util.Collections;
2021
import java.util.List;
2122
import java.util.concurrent.ExecutorService;
2223
import java.util.concurrent.Executors;
@@ -100,6 +101,23 @@ public void prepare_ParallelPreparationEnabled_ShouldExecuteTasksInParallel()
100101
verify(parallelExecutorService, times(tasks.size())).execute(any());
101102
}
102103

104+
@Test
105+
public void prepare_ParallelPreparationEnabled_SingleTaskGiven_ShouldExecuteTasksSerially()
106+
throws ExecutionException, ValidationConflictException, CrudException {
107+
// Arrange
108+
when(config.isParallelPreparationEnabled()).thenReturn(true);
109+
110+
// A single task
111+
tasks = Collections.singletonList(task);
112+
113+
// Act
114+
parallelExecutor.prepare(tasks, TX_ID);
115+
116+
// Assert
117+
verify(task, times(tasks.size())).run();
118+
verify(parallelExecutorService, never()).execute(any());
119+
}
120+
103121
@Test
104122
public void
105123
prepare_ParallelPreparationEnabled_ExecutionExceptionThrownByTask_ShouldNotStopRunningTasks()
@@ -176,6 +194,23 @@ public void validate_ParallelValidationEnabled_ShouldExecuteTasksInParallel()
176194
verify(parallelExecutorService, times(tasks.size())).execute(any());
177195
}
178196

197+
@Test
198+
public void validate_ParallelValidationEnabled_SingleTaskGiven_ShouldExecuteTasksSerially()
199+
throws ExecutionException, ValidationConflictException, CrudException {
200+
// Arrange
201+
when(config.isParallelValidationEnabled()).thenReturn(true);
202+
203+
// A single task
204+
tasks = Collections.singletonList(task);
205+
206+
// Act
207+
parallelExecutor.validate(tasks, TX_ID);
208+
209+
// Assert
210+
verify(task, times(tasks.size())).run();
211+
verify(parallelExecutorService, never()).execute(any());
212+
}
213+
179214
@Test
180215
public void
181216
validate_ParallelValidationEnabled_ExecutionExceptionThrownByTask_ShouldStopRunningTasks()
@@ -254,6 +289,25 @@ public void commitRecords_ParallelCommitNotEnabled_ShouldExecuteTasksSerially()
254289
verify(parallelExecutorService, times(tasks.size())).execute(any());
255290
}
256291

292+
@Test
293+
public void
294+
commitRecords_ParallelCommitEnabledAndAsyncCommitNotEnabled_SingleTaskGiven_ShouldExecuteTasksSerially()
295+
throws ExecutionException, ValidationConflictException, CrudException {
296+
// Arrange
297+
when(config.isParallelCommitEnabled()).thenReturn(true);
298+
when(config.isAsyncCommitEnabled()).thenReturn(false);
299+
300+
// A single task
301+
tasks = Collections.singletonList(task);
302+
303+
// Act
304+
parallelExecutor.commitRecords(tasks, TX_ID);
305+
306+
// Assert
307+
verify(task, times(tasks.size())).run();
308+
verify(parallelExecutorService, never()).execute(any());
309+
}
310+
257311
@Test
258312
public void
259313
commitRecords_ParallelCommitEnabledAndAsyncCommitNotEnabled_ExecutionExceptionThrownByTask_ShouldNotStopRunningTasks()
@@ -287,6 +341,25 @@ public void commitRecords_ParallelCommitNotEnabled_ShouldExecuteTasksSerially()
287341
verify(parallelExecutorService, times(tasks.size())).execute(any());
288342
}
289343

344+
@Test
345+
public void
346+
commitRecords_ParallelCommitEnabledAndAsyncCommitEnabled_SingleTaskGiven_ShouldExecuteTasksInParallelAndAsynchronously()
347+
throws ExecutionException, ValidationConflictException, CrudException {
348+
// Arrange
349+
when(config.isParallelCommitEnabled()).thenReturn(true);
350+
when(config.isAsyncCommitEnabled()).thenReturn(true);
351+
352+
// A single task
353+
tasks = Collections.singletonList(task);
354+
355+
// Act
356+
parallelExecutor.commitRecords(tasks, TX_ID);
357+
358+
// Assert
359+
verify(task, atMost(tasks.size())).run();
360+
verify(parallelExecutorService, times(tasks.size())).execute(any());
361+
}
362+
290363
@Test
291364
public void
292365
commitRecords_ParallelCommitEnabledAndAsyncCommitEnabled_ExecutionExceptionThrownByTask_ShouldNotStopRunningTasks()
@@ -349,6 +422,25 @@ public void rollbackRecords_ParallelRollbackNotEnabled_ShouldExecuteTasksSeriall
349422
verify(parallelExecutorService, times(tasks.size())).execute(any());
350423
}
351424

425+
@Test
426+
public void
427+
rollbackRecords_ParallelRollbackEnabledAndAsyncRollbackNotEnabled_SingleTaskGiven_ShouldExecuteTasksSerially()
428+
throws ExecutionException, ValidationConflictException, CrudException {
429+
// Arrange
430+
when(config.isParallelRollbackEnabled()).thenReturn(true);
431+
when(config.isAsyncRollbackEnabled()).thenReturn(false);
432+
433+
// A single task
434+
tasks = Collections.singletonList(task);
435+
436+
// Act
437+
parallelExecutor.rollbackRecords(tasks, TX_ID);
438+
439+
// Assert
440+
verify(task, times(tasks.size())).run();
441+
verify(parallelExecutorService, never()).execute(any());
442+
}
443+
352444
@Test
353445
public void
354446
rollbackRecords_ParallelRollbackEnabledAndAsyncRollbackNotEnabled_ExecutionExceptionThrownByTask_ShouldNotStopRunningTasks()
@@ -382,6 +474,25 @@ public void rollbackRecords_ParallelRollbackNotEnabled_ShouldExecuteTasksSeriall
382474
verify(parallelExecutorService, times(tasks.size())).execute(any());
383475
}
384476

477+
@Test
478+
public void
479+
rollbackRecords_ParallelRollbackEnabledAndAsyncRollbackEnabled_SingleTaskGiven_ShouldExecuteTasksInParallelAndAsynchronously()
480+
throws ExecutionException, ValidationConflictException, CrudException {
481+
// Arrange
482+
when(config.isParallelRollbackEnabled()).thenReturn(true);
483+
when(config.isAsyncRollbackEnabled()).thenReturn(true);
484+
485+
// A single task
486+
tasks = Collections.singletonList(task);
487+
488+
// Act
489+
parallelExecutor.rollbackRecords(tasks, TX_ID);
490+
491+
// Assert
492+
verify(task, atMost(tasks.size())).run();
493+
verify(parallelExecutorService, times(tasks.size())).execute(any());
494+
}
495+
385496
@Test
386497
public void
387498
rollbackRecords_ParallelRollbackEnabledAndAsyncRollbackEnabled_ExecutionExceptionThrownByTask_ShouldNotStopRunningTasks()

0 commit comments

Comments
 (0)