From 60935892d9944930a7d6396acdc6288c632765a6 Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Sun, 15 Jun 2025 01:16:39 +0900 Subject: [PATCH] Optimize ParallelExecutor --- .../consensuscommit/ParallelExecutor.java | 7 ++ .../consensuscommit/ParallelExecutorTest.java | 111 ++++++++++++++++++ 2 files changed, 118 insertions(+) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java index 44e7357270..61489428f5 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java @@ -155,6 +155,13 @@ private void executeTasks( String taskName, String transactionId) throws ExecutionException, ValidationConflictException, CrudException { + if (tasks.size() == 1 && !noWait) { + // If there is only one task and noWait is false, we can run it directly without parallel + // execution. + executeTasksSerially(tasks, stopOnError, taskName, transactionId); + return; + } + if (parallel) { executeTasksInParallel(tasks, noWait, stopOnError, taskName, transactionId); } else { diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ParallelExecutorTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ParallelExecutorTest.java index 8c98af1ccb..63a16bdee4 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ParallelExecutorTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ParallelExecutorTest.java @@ -17,6 +17,7 @@ import com.scalar.db.exception.transaction.ValidationConflictException; import com.scalar.db.transaction.consensuscommit.ParallelExecutor.ParallelExecutorTask; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -100,6 +101,23 @@ public void prepare_ParallelPreparationEnabled_ShouldExecuteTasksInParallel() verify(parallelExecutorService, times(tasks.size())).execute(any()); } + @Test + public void prepare_ParallelPreparationEnabled_SingleTaskGiven_ShouldExecuteTasksSerially() + throws ExecutionException, ValidationConflictException, CrudException { + // Arrange + when(config.isParallelPreparationEnabled()).thenReturn(true); + + // A single task + tasks = Collections.singletonList(task); + + // Act + parallelExecutor.prepare(tasks, TX_ID); + + // Assert + verify(task, times(tasks.size())).run(); + verify(parallelExecutorService, never()).execute(any()); + } + @Test public void prepare_ParallelPreparationEnabled_ExecutionExceptionThrownByTask_ShouldNotStopRunningTasks() @@ -176,6 +194,23 @@ public void validate_ParallelValidationEnabled_ShouldExecuteTasksInParallel() verify(parallelExecutorService, times(tasks.size())).execute(any()); } + @Test + public void validate_ParallelValidationEnabled_SingleTaskGiven_ShouldExecuteTasksSerially() + throws ExecutionException, ValidationConflictException, CrudException { + // Arrange + when(config.isParallelValidationEnabled()).thenReturn(true); + + // A single task + tasks = Collections.singletonList(task); + + // Act + parallelExecutor.validate(tasks, TX_ID); + + // Assert + verify(task, times(tasks.size())).run(); + verify(parallelExecutorService, never()).execute(any()); + } + @Test public void validate_ParallelValidationEnabled_ExecutionExceptionThrownByTask_ShouldStopRunningTasks() @@ -254,6 +289,25 @@ public void commitRecords_ParallelCommitNotEnabled_ShouldExecuteTasksSerially() verify(parallelExecutorService, times(tasks.size())).execute(any()); } + @Test + public void + commitRecords_ParallelCommitEnabledAndAsyncCommitNotEnabled_SingleTaskGiven_ShouldExecuteTasksSerially() + throws ExecutionException, ValidationConflictException, CrudException { + // Arrange + when(config.isParallelCommitEnabled()).thenReturn(true); + when(config.isAsyncCommitEnabled()).thenReturn(false); + + // A single task + tasks = Collections.singletonList(task); + + // Act + parallelExecutor.commitRecords(tasks, TX_ID); + + // Assert + verify(task, times(tasks.size())).run(); + verify(parallelExecutorService, never()).execute(any()); + } + @Test public void commitRecords_ParallelCommitEnabledAndAsyncCommitNotEnabled_ExecutionExceptionThrownByTask_ShouldNotStopRunningTasks() @@ -287,6 +341,25 @@ public void commitRecords_ParallelCommitNotEnabled_ShouldExecuteTasksSerially() verify(parallelExecutorService, times(tasks.size())).execute(any()); } + @Test + public void + commitRecords_ParallelCommitEnabledAndAsyncCommitEnabled_SingleTaskGiven_ShouldExecuteTasksInParallelAndAsynchronously() + throws ExecutionException, ValidationConflictException, CrudException { + // Arrange + when(config.isParallelCommitEnabled()).thenReturn(true); + when(config.isAsyncCommitEnabled()).thenReturn(true); + + // A single task + tasks = Collections.singletonList(task); + + // Act + parallelExecutor.commitRecords(tasks, TX_ID); + + // Assert + verify(task, atMost(tasks.size())).run(); + verify(parallelExecutorService, times(tasks.size())).execute(any()); + } + @Test public void commitRecords_ParallelCommitEnabledAndAsyncCommitEnabled_ExecutionExceptionThrownByTask_ShouldNotStopRunningTasks() @@ -349,6 +422,25 @@ public void rollbackRecords_ParallelRollbackNotEnabled_ShouldExecuteTasksSeriall verify(parallelExecutorService, times(tasks.size())).execute(any()); } + @Test + public void + rollbackRecords_ParallelRollbackEnabledAndAsyncRollbackNotEnabled_SingleTaskGiven_ShouldExecuteTasksSerially() + throws ExecutionException, ValidationConflictException, CrudException { + // Arrange + when(config.isParallelRollbackEnabled()).thenReturn(true); + when(config.isAsyncRollbackEnabled()).thenReturn(false); + + // A single task + tasks = Collections.singletonList(task); + + // Act + parallelExecutor.rollbackRecords(tasks, TX_ID); + + // Assert + verify(task, times(tasks.size())).run(); + verify(parallelExecutorService, never()).execute(any()); + } + @Test public void rollbackRecords_ParallelRollbackEnabledAndAsyncRollbackNotEnabled_ExecutionExceptionThrownByTask_ShouldNotStopRunningTasks() @@ -382,6 +474,25 @@ public void rollbackRecords_ParallelRollbackNotEnabled_ShouldExecuteTasksSeriall verify(parallelExecutorService, times(tasks.size())).execute(any()); } + @Test + public void + rollbackRecords_ParallelRollbackEnabledAndAsyncRollbackEnabled_SingleTaskGiven_ShouldExecuteTasksInParallelAndAsynchronously() + throws ExecutionException, ValidationConflictException, CrudException { + // Arrange + when(config.isParallelRollbackEnabled()).thenReturn(true); + when(config.isAsyncRollbackEnabled()).thenReturn(true); + + // A single task + tasks = Collections.singletonList(task); + + // Act + parallelExecutor.rollbackRecords(tasks, TX_ID); + + // Assert + verify(task, atMost(tasks.size())).run(); + verify(parallelExecutorService, times(tasks.size())).execute(any()); + } + @Test public void rollbackRecords_ParallelRollbackEnabledAndAsyncRollbackEnabled_ExecutionExceptionThrownByTask_ShouldNotStopRunningTasks()