Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ public void executeImplicitPreRead(List<ParallelExecutorTask> tasks, String tran
}
}

private void executeTasks(
@VisibleForTesting
void executeTasks(
List<ParallelExecutorTask> tasks,
boolean parallel,
boolean noWait,
Expand All @@ -158,14 +159,14 @@ private void executeTasks(
if (tasks.size() == 1 && !noWait) {
// If there is only one task and noWait is false, we can run it directly without parallel
// execution.
executeTasksSerially(tasks, stopOnError, taskName, transactionId);
tasks.get(0).run();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to directly call tasks.get(0).run().

return;
}

if (parallel) {
executeTasksInParallel(tasks, noWait, stopOnError, taskName, transactionId);
} else {
executeTasksSerially(tasks, stopOnError, taskName, transactionId);
executeTasksSerially(tasks, stopOnError);
}
}

Expand All @@ -180,94 +181,92 @@ private void executeTasksInParallel(

CompletionService<Void> completionService =
new ExecutorCompletionService<>(parallelExecutorService);
tasks.forEach(
t ->
completionService.submit(
() -> {
try {
t.run();
} catch (Exception e) {
logger.warn(
"Failed to run a {} task. Transaction ID: {}", taskName, transactionId, e);
throw e;
}
return null;
}));

if (!noWait) {
Exception exception = null;
for (int i = 0; i < tasks.size(); i++) {
Future<Void> future = ScalarDbUtils.takeUninterruptibly(completionService);

try {
Uninterruptibles.getUninterruptibly(future);
} catch (java.util.concurrent.ExecutionException e) {
if (e.getCause() instanceof ExecutionException) {
if (!stopOnError) {
exception = (ExecutionException) e.getCause();
} else {
throw (ExecutionException) e.getCause();
}
} else if (e.getCause() instanceof ValidationConflictException) {
if (!stopOnError) {
exception = (ValidationConflictException) e.getCause();
} else {
throw (ValidationConflictException) e.getCause();
}
} else if (e.getCause() instanceof CrudException) {
if (!stopOnError) {
exception = (CrudException) e.getCause();
} else {
throw (CrudException) e.getCause();
// Submit tasks
for (ParallelExecutorTask task : tasks) {
completionService.submit(
() -> {
try {
task.run();
} catch (Exception e) {
logger.warn(
"Failed to run a {} task. Transaction ID: {}", taskName, transactionId, e);
Comment on lines +192 to +193
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to keep this log because, even after using addSuppressed(), some exceptions may not be included as suppressed when stopOnError is set to true.

throw e;
}
} else if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
return null;
});
}

// Optionally wait for completion
if (noWait) {
return;
}

Throwable throwable = null;

for (int i = 0; i < tasks.size(); i++) {
Future<Void> future = ScalarDbUtils.takeUninterruptibly(completionService);
try {
Uninterruptibles.getUninterruptibly(future);
} catch (java.util.concurrent.ExecutionException e) {
Throwable cause = e.getCause();

if (stopOnError) {
rethrow(cause);
} else {
if (throwable == null) {
throwable = cause;
} else {
throw new AssertionError("Can't reach here. Maybe a bug", e);
throwable.addSuppressed(cause);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to use addSuppressed().

}
}
}
}

if (!stopOnError && exception != null) {
if (exception instanceof ExecutionException) {
throw (ExecutionException) exception;
} else if (exception instanceof ValidationConflictException) {
throw (ValidationConflictException) exception;
} else {
throw (CrudException) exception;
}
}
// Rethrow exception if necessary
if (!stopOnError && throwable != null) {
rethrow(throwable);
}
}

private void executeTasksSerially(
List<ParallelExecutorTask> tasks, boolean stopOnError, String taskName, String transactionId)
private void executeTasksSerially(List<ParallelExecutorTask> tasks, boolean stopOnError)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The refactoring removed the logging of exceptions that occur during serial task execution. The logger.warn call was present in the previous version and provided valuable context, which is crucial for debugging. Consider adding the logging back to provide better context when exceptions occur.

  private void executeTasksSerially(List<ParallelExecutorTask> tasks, boolean stopOnError, String taskName, String transactionId)
      throws ExecutionException, ValidationConflictException, CrudException {
    Exception exception = null;
    for (ParallelExecutorTask task : tasks) {
      try {
        task.run();
      } catch (ExecutionException | ValidationConflictException | CrudException e) {
        logger.warn("Failed to run a {} task. Transaction ID: {}", taskName, transactionId, e);
        if (!stopOnError) {
          if (exception == null) {
            exception = e;
          } else {
            exception.addSuppressed(e);
          }
        } else {
          throw e;
        }
      }
    }

    if (!stopOnError && exception != null) {
      rethrow(exception);
    }
  }

throws ExecutionException, ValidationConflictException, CrudException {
Exception exception = null;
for (ParallelExecutorTask task : tasks) {
try {
task.run();
} catch (ExecutionException | ValidationConflictException | CrudException e) {
Copy link

Copilot AI Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Serial execution no longer logs failures while parallel execution does. Consider re-adding a logger.warn inside this catch so serial runs also record taskName and transactionId on error.

Copilot uses AI. Check for mistakes.
logger.warn("Failed to run a {} task. Transaction ID: {}", taskName, transactionId, e);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After using addSuppressed(), we can remove this log because all exceptions should now be included as suppressed exceptions.


if (!stopOnError) {
exception = e;
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. Updated to use addSuppressed().

}
} else {
throw e;
}
}
}

if (!stopOnError && exception != null) {
if (exception instanceof ExecutionException) {
throw (ExecutionException) exception;
} else if (exception instanceof ValidationConflictException) {
throw (ValidationConflictException) exception;
} else {
throw (CrudException) exception;
}
rethrow(exception);
}
}

private void rethrow(Throwable cause)
throws ExecutionException, ValidationConflictException, CrudException {
if (cause instanceof ExecutionException) {
throw (ExecutionException) cause;
} else if (cause instanceof ValidationConflictException) {
throw (ValidationConflictException) cause;
} else if (cause instanceof CrudException) {
throw (CrudException) cause;
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else if (cause instanceof Error) {
throw (Error) cause;
} else {
throw new AssertionError("Unexpected exception type", cause);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -562,4 +563,203 @@ public void executeImplicitPreRead_ParallelImplicitPreReadEnabled_ShouldExecuteT
assertThatThrownBy(() -> parallelExecutor.executeImplicitPreRead(tasks, TX_ID))
.isInstanceOf(CrudException.class);
}

@Test
public void executeTasks_SingleTaskAndNoWaitFalse_ShouldExecuteDirectly()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
List<ParallelExecutorTask> tasks = Collections.singletonList(task);
boolean parallel = true; // Should be ignored
boolean noWait = false;
boolean stopOnError = true;

// Act
parallelExecutor.executeTasks(tasks, parallel, noWait, stopOnError, "test", TX_ID);

// Assert
verify(task).run();
verify(parallelExecutorService, never()).execute(any());
}

@Test
public void executeTasks_SingleTaskAndNoWaitTrue_ShouldUseParallelExecution()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
when(config.isParallelPreparationEnabled()).thenReturn(true);

List<ParallelExecutorTask> tasks = Collections.singletonList(task);
boolean parallel = true;
boolean noWait = true;
boolean stopOnError = false;

// Act
parallelExecutor.executeTasks(tasks, parallel, noWait, stopOnError, "test", TX_ID);

// Assert
verify(parallelExecutorService).execute(any());
}

@Test
public void executeTasks_ParallelTrue_ShouldExecuteTasksInParallel()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
boolean parallel = true;
boolean noWait = false;
boolean stopOnError = false;

// Act
parallelExecutor.executeTasks(tasks, parallel, noWait, stopOnError, "test", TX_ID);

// Assert
verify(parallelExecutorService, times(tasks.size())).execute(any());
}

@Test
public void executeTasks_ParallelFalse_ShouldExecuteTasksSerially()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
boolean parallel = false;
boolean noWait = false;
boolean stopOnError = false;

// Act
parallelExecutor.executeTasks(tasks, parallel, noWait, stopOnError, "test", TX_ID);

// Assert
verify(task, times(tasks.size())).run();
verify(parallelExecutorService, never()).execute(any());
}

@Test
public void executeTasks_ParallelTrueAndStopOnErrorTrue_ExceptionThrown_ShouldStopExecution()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
boolean parallel = true;
boolean noWait = false;
boolean stopOnError = true;

doThrow(new ExecutionException("Test exception")).when(task).run();

// Act Assert
assertThatThrownBy(
() ->
parallelExecutor.executeTasks(tasks, parallel, noWait, stopOnError, "test", TX_ID))
.isInstanceOf(ExecutionException.class)
.hasMessage("Test exception");

verify(parallelExecutorService, times(tasks.size())).execute(any());
}

@Test
public void
executeTasks_ParallelTrueAndStopOnErrorFalse_ExceptionThrown_ShouldContinueOtherTasks()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
boolean parallel = true;
boolean noWait = false;
boolean stopOnError = false;

ParallelExecutorTask failingTask = mock(ParallelExecutorTask.class);
doThrow(new ExecutionException("Test exception")).when(failingTask).run();

List<ParallelExecutorTask> mixedTasks = Arrays.asList(failingTask, task);

// Act Assert
assertThatThrownBy(
() ->
parallelExecutor.executeTasks(
mixedTasks, parallel, noWait, stopOnError, "test", TX_ID))
.isInstanceOf(ExecutionException.class);

verify(parallelExecutorService, times(mixedTasks.size())).execute(any());
}

@Test
public void
executeTasks_ParallelTrueAndStopOnErrorFalse_ExceptionThrownByMultipleTasks_ShouldContinueOtherTasks()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
boolean parallel = true;
boolean noWait = false;
boolean stopOnError = false;

ExecutionException executionException1 = new ExecutionException("Test exception1");
ParallelExecutorTask failingTask1 = mock(ParallelExecutorTask.class);
doThrow(executionException1).when(failingTask1).run();

ExecutionException executionException2 = new ExecutionException("Test exception2");
ParallelExecutorTask failingTask2 = mock(ParallelExecutorTask.class);
doThrow(executionException2).when(failingTask2).run();

List<ParallelExecutorTask> mixedTasks = Arrays.asList(failingTask1, failingTask2, task);

// Act Assert
assertThatThrownBy(
() ->
parallelExecutor.executeTasks(
mixedTasks, parallel, noWait, stopOnError, "test", TX_ID))
.isEqualTo(executionException1)
.hasSuppressedException(executionException2);

verify(parallelExecutorService, times(mixedTasks.size())).execute(any());
}

@Test
public void
executeTasks_ParallelFalseAndStopOnErrorFalse_ExceptionThrown_ShouldContinueOtherTasks()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
boolean parallel = false;
boolean noWait = false;
boolean stopOnError = false;

ParallelExecutorTask failingTask = mock(ParallelExecutorTask.class);
doThrow(new ExecutionException("Test exception")).when(failingTask).run();

List<ParallelExecutorTask> mixedTasks = Arrays.asList(failingTask, task);

// Act Assert
assertThatThrownBy(
() ->
parallelExecutor.executeTasks(
mixedTasks, parallel, noWait, stopOnError, "test", TX_ID))
.isInstanceOf(ExecutionException.class);

verify(failingTask, only()).run();
verify(task, only()).run();
verify(parallelExecutorService, never()).execute(any());
}

@Test
public void
executeTasks_ParallelFalseAndStopOnErrorFalse_ExceptionThrownByMultipleTasks_ShouldContinueOtherTasks()
throws ExecutionException, ValidationConflictException, CrudException {
// Arrange
boolean parallel = false;
boolean noWait = false;
boolean stopOnError = false;

ExecutionException executionException1 = new ExecutionException("Test exception1");
ParallelExecutorTask failingTask1 = mock(ParallelExecutorTask.class);
doThrow(executionException1).when(failingTask1).run();

ExecutionException executionException2 = new ExecutionException("Test exception2");
ParallelExecutorTask failingTask2 = mock(ParallelExecutorTask.class);
doThrow(executionException2).when(failingTask2).run();

List<ParallelExecutorTask> mixedTasks = Arrays.asList(failingTask1, failingTask2, task);

// Act Assert
assertThatThrownBy(
() ->
parallelExecutor.executeTasks(
mixedTasks, parallel, noWait, stopOnError, "test", TX_ID))
.isEqualTo(executionException1)
.hasSuppressedException(executionException2);

verify(failingTask1, only()).run();
verify(failingTask2, only()).run();
verify(task, only()).run();
verify(parallelExecutorService, never()).execute(any());
}
}