Skip to content

Commit 88ac071

Browse files
committed
Refactor ParallelExecutor
1 parent 895c6c5 commit 88ac071

File tree

2 files changed

+266
-67
lines changed

2 files changed

+266
-67
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/ParallelExecutor.java

Lines changed: 66 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,8 @@ public void executeImplicitPreRead(List<ParallelExecutorTask> tasks, String tran
147147
}
148148
}
149149

150-
private void executeTasks(
150+
@VisibleForTesting
151+
void executeTasks(
151152
List<ParallelExecutorTask> tasks,
152153
boolean parallel,
153154
boolean noWait,
@@ -158,14 +159,14 @@ private void executeTasks(
158159
if (tasks.size() == 1 && !noWait) {
159160
// If there is only one task and noWait is false, we can run it directly without parallel
160161
// execution.
161-
executeTasksSerially(tasks, stopOnError, taskName, transactionId);
162+
tasks.get(0).run();
162163
return;
163164
}
164165

165166
if (parallel) {
166167
executeTasksInParallel(tasks, noWait, stopOnError, taskName, transactionId);
167168
} else {
168-
executeTasksSerially(tasks, stopOnError, taskName, transactionId);
169+
executeTasksSerially(tasks, stopOnError);
169170
}
170171
}
171172

@@ -180,94 +181,92 @@ private void executeTasksInParallel(
180181

181182
CompletionService<Void> completionService =
182183
new ExecutorCompletionService<>(parallelExecutorService);
183-
tasks.forEach(
184-
t ->
185-
completionService.submit(
186-
() -> {
187-
try {
188-
t.run();
189-
} catch (Exception e) {
190-
logger.warn(
191-
"Failed to run a {} task. Transaction ID: {}", taskName, transactionId, e);
192-
throw e;
193-
}
194-
return null;
195-
}));
196-
197-
if (!noWait) {
198-
Exception exception = null;
199-
for (int i = 0; i < tasks.size(); i++) {
200-
Future<Void> future = ScalarDbUtils.takeUninterruptibly(completionService);
201184

202-
try {
203-
Uninterruptibles.getUninterruptibly(future);
204-
} catch (java.util.concurrent.ExecutionException e) {
205-
if (e.getCause() instanceof ExecutionException) {
206-
if (!stopOnError) {
207-
exception = (ExecutionException) e.getCause();
208-
} else {
209-
throw (ExecutionException) e.getCause();
210-
}
211-
} else if (e.getCause() instanceof ValidationConflictException) {
212-
if (!stopOnError) {
213-
exception = (ValidationConflictException) e.getCause();
214-
} else {
215-
throw (ValidationConflictException) e.getCause();
216-
}
217-
} else if (e.getCause() instanceof CrudException) {
218-
if (!stopOnError) {
219-
exception = (CrudException) e.getCause();
220-
} else {
221-
throw (CrudException) e.getCause();
185+
// Submit tasks
186+
for (ParallelExecutorTask task : tasks) {
187+
completionService.submit(
188+
() -> {
189+
try {
190+
task.run();
191+
} catch (Exception e) {
192+
logger.warn(
193+
"Failed to run a {} task. Transaction ID: {}", taskName, transactionId, e);
194+
throw e;
222195
}
223-
} else if (e.getCause() instanceof RuntimeException) {
224-
throw (RuntimeException) e.getCause();
225-
} else if (e.getCause() instanceof Error) {
226-
throw (Error) e.getCause();
196+
return null;
197+
});
198+
}
199+
200+
// Optionally wait for completion
201+
if (noWait) {
202+
return;
203+
}
204+
205+
Throwable throwable = null;
206+
207+
for (int i = 0; i < tasks.size(); i++) {
208+
Future<Void> future = ScalarDbUtils.takeUninterruptibly(completionService);
209+
try {
210+
Uninterruptibles.getUninterruptibly(future);
211+
} catch (java.util.concurrent.ExecutionException e) {
212+
Throwable cause = e.getCause();
213+
214+
if (stopOnError) {
215+
rethrow(cause);
216+
} else {
217+
if (throwable == null) {
218+
throwable = cause;
227219
} else {
228-
throw new AssertionError("Can't reach here. Maybe a bug", e);
220+
throwable.addSuppressed(cause);
229221
}
230222
}
231223
}
224+
}
232225

233-
if (!stopOnError && exception != null) {
234-
if (exception instanceof ExecutionException) {
235-
throw (ExecutionException) exception;
236-
} else if (exception instanceof ValidationConflictException) {
237-
throw (ValidationConflictException) exception;
238-
} else {
239-
throw (CrudException) exception;
240-
}
241-
}
226+
// Rethrow exception if necessary
227+
if (!stopOnError && throwable != null) {
228+
rethrow(throwable);
242229
}
243230
}
244231

245-
private void executeTasksSerially(
246-
List<ParallelExecutorTask> tasks, boolean stopOnError, String taskName, String transactionId)
232+
private void executeTasksSerially(List<ParallelExecutorTask> tasks, boolean stopOnError)
247233
throws ExecutionException, ValidationConflictException, CrudException {
248234
Exception exception = null;
249235
for (ParallelExecutorTask task : tasks) {
250236
try {
251237
task.run();
252238
} catch (ExecutionException | ValidationConflictException | CrudException e) {
253-
logger.warn("Failed to run a {} task. Transaction ID: {}", taskName, transactionId, e);
254-
255239
if (!stopOnError) {
256-
exception = e;
240+
if (exception == null) {
241+
exception = e;
242+
} else {
243+
exception.addSuppressed(e);
244+
}
257245
} else {
258246
throw e;
259247
}
260248
}
261249
}
262250

263251
if (!stopOnError && exception != null) {
264-
if (exception instanceof ExecutionException) {
265-
throw (ExecutionException) exception;
266-
} else if (exception instanceof ValidationConflictException) {
267-
throw (ValidationConflictException) exception;
268-
} else {
269-
throw (CrudException) exception;
270-
}
252+
rethrow(exception);
253+
}
254+
}
255+
256+
private void rethrow(Throwable cause)
257+
throws ExecutionException, ValidationConflictException, CrudException {
258+
if (cause instanceof ExecutionException) {
259+
throw (ExecutionException) cause;
260+
} else if (cause instanceof ValidationConflictException) {
261+
throw (ValidationConflictException) cause;
262+
} else if (cause instanceof CrudException) {
263+
throw (CrudException) cause;
264+
} else if (cause instanceof RuntimeException) {
265+
throw (RuntimeException) cause;
266+
} else if (cause instanceof Error) {
267+
throw (Error) cause;
268+
} else {
269+
throw new AssertionError("Unexpected exception type", cause);
271270
}
272271
}
273272

core/src/test/java/com/scalar/db/transaction/consensuscommit/ParallelExecutorTest.java

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static org.mockito.ArgumentMatchers.any;
66
import static org.mockito.Mockito.atMost;
77
import static org.mockito.Mockito.doThrow;
8+
import static org.mockito.Mockito.mock;
89
import static org.mockito.Mockito.never;
910
import static org.mockito.Mockito.only;
1011
import static org.mockito.Mockito.spy;
@@ -562,4 +563,203 @@ public void executeImplicitPreRead_ParallelImplicitPreReadEnabled_ShouldExecuteT
562563
assertThatThrownBy(() -> parallelExecutor.executeImplicitPreRead(tasks, TX_ID))
563564
.isInstanceOf(CrudException.class);
564565
}
566+
567+
@Test
568+
public void executeTasks_SingleTaskAndNoWaitFalse_ShouldExecuteDirectly()
569+
throws ExecutionException, ValidationConflictException, CrudException {
570+
// Arrange
571+
List<ParallelExecutorTask> tasks = Collections.singletonList(task);
572+
boolean parallel = true; // Should be ignored
573+
boolean noWait = false;
574+
boolean stopOnError = true;
575+
576+
// Act
577+
parallelExecutor.executeTasks(tasks, parallel, noWait, stopOnError, "test", TX_ID);
578+
579+
// Assert
580+
verify(task).run();
581+
verify(parallelExecutorService, never()).execute(any());
582+
}
583+
584+
@Test
585+
public void executeTasks_SingleTaskAndNoWaitTrue_ShouldUseParallelExecution()
586+
throws ExecutionException, ValidationConflictException, CrudException {
587+
// Arrange
588+
when(config.isParallelPreparationEnabled()).thenReturn(true);
589+
590+
List<ParallelExecutorTask> tasks = Collections.singletonList(task);
591+
boolean parallel = true;
592+
boolean noWait = true;
593+
boolean stopOnError = false;
594+
595+
// Act
596+
parallelExecutor.executeTasks(tasks, parallel, noWait, stopOnError, "test", TX_ID);
597+
598+
// Assert
599+
verify(parallelExecutorService).execute(any());
600+
}
601+
602+
@Test
603+
public void executeTasks_ParallelTrue_ShouldExecuteTasksInParallel()
604+
throws ExecutionException, ValidationConflictException, CrudException {
605+
// Arrange
606+
boolean parallel = true;
607+
boolean noWait = false;
608+
boolean stopOnError = false;
609+
610+
// Act
611+
parallelExecutor.executeTasks(tasks, parallel, noWait, stopOnError, "test", TX_ID);
612+
613+
// Assert
614+
verify(parallelExecutorService, times(tasks.size())).execute(any());
615+
}
616+
617+
@Test
618+
public void executeTasks_ParallelFalse_ShouldExecuteTasksSerially()
619+
throws ExecutionException, ValidationConflictException, CrudException {
620+
// Arrange
621+
boolean parallel = false;
622+
boolean noWait = false;
623+
boolean stopOnError = false;
624+
625+
// Act
626+
parallelExecutor.executeTasks(tasks, parallel, noWait, stopOnError, "test", TX_ID);
627+
628+
// Assert
629+
verify(task, times(tasks.size())).run();
630+
verify(parallelExecutorService, never()).execute(any());
631+
}
632+
633+
@Test
634+
public void executeTasks_ParallelTrueAndStopOnErrorTrue_ExceptionThrown_ShouldStopExecution()
635+
throws ExecutionException, ValidationConflictException, CrudException {
636+
// Arrange
637+
boolean parallel = true;
638+
boolean noWait = false;
639+
boolean stopOnError = true;
640+
641+
doThrow(new ExecutionException("Test exception")).when(task).run();
642+
643+
// Act Assert
644+
assertThatThrownBy(
645+
() ->
646+
parallelExecutor.executeTasks(tasks, parallel, noWait, stopOnError, "test", TX_ID))
647+
.isInstanceOf(ExecutionException.class)
648+
.hasMessage("Test exception");
649+
650+
verify(parallelExecutorService, times(tasks.size())).execute(any());
651+
}
652+
653+
@Test
654+
public void
655+
executeTasks_ParallelTrueAndStopOnErrorFalse_ExceptionThrown_ShouldContinueOtherTasks()
656+
throws ExecutionException, ValidationConflictException, CrudException {
657+
// Arrange
658+
boolean parallel = true;
659+
boolean noWait = false;
660+
boolean stopOnError = false;
661+
662+
ParallelExecutorTask failingTask = mock(ParallelExecutorTask.class);
663+
doThrow(new ExecutionException("Test exception")).when(failingTask).run();
664+
665+
List<ParallelExecutorTask> mixedTasks = Arrays.asList(failingTask, task);
666+
667+
// Act Assert
668+
assertThatThrownBy(
669+
() ->
670+
parallelExecutor.executeTasks(
671+
mixedTasks, parallel, noWait, stopOnError, "test", TX_ID))
672+
.isInstanceOf(ExecutionException.class);
673+
674+
verify(parallelExecutorService, times(mixedTasks.size())).execute(any());
675+
}
676+
677+
@Test
678+
public void
679+
executeTasks_ParallelTrueAndStopOnErrorFalse_ExceptionThrownByMultipleTasks_ShouldContinueOtherTasks()
680+
throws ExecutionException, ValidationConflictException, CrudException {
681+
// Arrange
682+
boolean parallel = true;
683+
boolean noWait = false;
684+
boolean stopOnError = false;
685+
686+
ExecutionException executionException1 = new ExecutionException("Test exception1");
687+
ParallelExecutorTask failingTask1 = mock(ParallelExecutorTask.class);
688+
doThrow(executionException1).when(failingTask1).run();
689+
690+
ExecutionException executionException2 = new ExecutionException("Test exception2");
691+
ParallelExecutorTask failingTask2 = mock(ParallelExecutorTask.class);
692+
doThrow(executionException2).when(failingTask2).run();
693+
694+
List<ParallelExecutorTask> mixedTasks = Arrays.asList(failingTask1, failingTask2, task);
695+
696+
// Act Assert
697+
assertThatThrownBy(
698+
() ->
699+
parallelExecutor.executeTasks(
700+
mixedTasks, parallel, noWait, stopOnError, "test", TX_ID))
701+
.isEqualTo(executionException1)
702+
.hasSuppressedException(executionException2);
703+
704+
verify(parallelExecutorService, times(mixedTasks.size())).execute(any());
705+
}
706+
707+
@Test
708+
public void
709+
executeTasks_ParallelFalseAndStopOnErrorFalse_ExceptionThrown_ShouldContinueOtherTasks()
710+
throws ExecutionException, ValidationConflictException, CrudException {
711+
// Arrange
712+
boolean parallel = false;
713+
boolean noWait = false;
714+
boolean stopOnError = false;
715+
716+
ParallelExecutorTask failingTask = mock(ParallelExecutorTask.class);
717+
doThrow(new ExecutionException("Test exception")).when(failingTask).run();
718+
719+
List<ParallelExecutorTask> mixedTasks = Arrays.asList(failingTask, task);
720+
721+
// Act Assert
722+
assertThatThrownBy(
723+
() ->
724+
parallelExecutor.executeTasks(
725+
mixedTasks, parallel, noWait, stopOnError, "test", TX_ID))
726+
.isInstanceOf(ExecutionException.class);
727+
728+
verify(failingTask, only()).run();
729+
verify(task, only()).run();
730+
verify(parallelExecutorService, never()).execute(any());
731+
}
732+
733+
@Test
734+
public void
735+
executeTasks_ParallelFalseAndStopOnErrorFalse_ExceptionThrownByMultipleTasks_ShouldContinueOtherTasks()
736+
throws ExecutionException, ValidationConflictException, CrudException {
737+
// Arrange
738+
boolean parallel = false;
739+
boolean noWait = false;
740+
boolean stopOnError = false;
741+
742+
ExecutionException executionException1 = new ExecutionException("Test exception1");
743+
ParallelExecutorTask failingTask1 = mock(ParallelExecutorTask.class);
744+
doThrow(executionException1).when(failingTask1).run();
745+
746+
ExecutionException executionException2 = new ExecutionException("Test exception2");
747+
ParallelExecutorTask failingTask2 = mock(ParallelExecutorTask.class);
748+
doThrow(executionException2).when(failingTask2).run();
749+
750+
List<ParallelExecutorTask> mixedTasks = Arrays.asList(failingTask1, failingTask2, task);
751+
752+
// Act Assert
753+
assertThatThrownBy(
754+
() ->
755+
parallelExecutor.executeTasks(
756+
mixedTasks, parallel, noWait, stopOnError, "test", TX_ID))
757+
.isEqualTo(executionException1)
758+
.hasSuppressedException(executionException2);
759+
760+
verify(failingTask1, only()).run();
761+
verify(failingTask2, only()).run();
762+
verify(task, only()).run();
763+
verify(parallelExecutorService, never()).execute(any());
764+
}
565765
}

0 commit comments

Comments
 (0)