From f738ce94660ccb8ff1885c217be3a148a0c9626a Mon Sep 17 00:00:00 2001 From: Pham Ba Thong Date: Thu, 20 Nov 2025 10:48:22 +0900 Subject: [PATCH] Fix exception handling in the data loader importing to handle the unkown exceptions properly (#3183) --- .../dataimport/processor/ImportProcessor.java | 43 +++++++++++++++---- .../processor/ImportProcessorTest.java | 32 ++++++++++++++ 2 files changed, 66 insertions(+), 9 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java index 81daf9646e..8870a81306 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -32,6 +32,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -317,16 +318,24 @@ private ImportTransactionBatchResult processTransactionBatch( } catch (TransactionException e) { isSuccess = false; - logger.error(e.getMessage()); - try { - if (transaction != null) { - transaction.abort(); // Ensure transaction is aborted - } - } catch (TransactionException abortException) { - logger.error( - "Failed to abort transaction: {}", abortException.getMessage(), abortException); - } + logger.error( + "Transaction failed for batch {} in data chunk {}: {}", + transactionBatch.getTransactionBatchId(), + dataChunkId, + e.getMessage(), + e); + abortTransactionSafely(transaction); error = e.getMessage(); + } catch (Exception e) { + // Catch unchecked exceptions + isSuccess = false; + logger.error( + "Unexpected exception occurred while processing transaction batch {} in data chunk {}.", + transactionBatch.getTransactionBatchId(), + dataChunkId, + e); + abortTransactionSafely(transaction); + error = "Unexpected error: " + e.getClass().getSimpleName() + " - " + e.getMessage(); } ImportTransactionBatchResult importTransactionBatchResult = ImportTransactionBatchResult.builder() @@ -340,6 +349,22 @@ private ImportTransactionBatchResult processTransactionBatch( return importTransactionBatchResult; } + /** + * Safely aborts the provided distributed transaction. If the transaction is null, this method + * takes no action. If an exception occurs during the abort operation, it is logged as an error. + * + * @param transaction the {@link DistributedTransaction} to be aborted, may be null + */ + private void abortTransactionSafely(@Nullable DistributedTransaction transaction) { + try { + if (transaction != null) { + transaction.abort(); + } + } catch (Exception e) { + logger.error("Failed to abort transaction: {}", e.getMessage(), e); + } + } + /** * Processes a single record in storage mode (non-transactional). Each record is processed * independently without transaction guarantees. diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java index d60ebecb00..4dc393b096 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataimport.processor; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -43,6 +44,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -256,6 +258,36 @@ void process_withShutdown_shouldShutdownExecutorsGracefully() { assertEquals(3, processor.getProcessedChunksCount().get(), "All chunks should be processed"); } + @Test + void process_withUnexpectedExceptionInTransaction_shouldHandleGracefully() + throws TransactionException { + // Arrange + BufferedReader reader = new BufferedReader(new StringReader("test data")); + when(params.getScalarDbMode()).thenReturn(ScalarDbMode.TRANSACTION); + when(params.getDistributedTransactionManager()).thenReturn(distributedTransactionManager); + when(distributedTransactionManager.start()).thenThrow(new RuntimeException("Unexpected error")); + + TestImportProcessor processor = new TestImportProcessor(params); + processor.addListener(eventListener); + + // Act + processor.process(2, 1, reader); + + // Assert + verify(eventListener, times(1)).onAllDataChunksCompleted(); + + // Capture and verify the transaction batch result + ArgumentCaptor resultCaptor = + ArgumentCaptor.forClass(ImportTransactionBatchResult.class); + verify(eventListener, times(1)).onTransactionBatchCompleted(resultCaptor.capture()); + + ImportTransactionBatchResult result = resultCaptor.getValue(); + assertFalse(result.isSuccess()); + assertEquals(0, result.getTransactionBatchId()); + assertEquals(1, result.getDataChunkId()); + assertTrue(result.getErrors().get(0).contains("Unexpected error")); + } + /** * A simple implementation of ImportProcessor for testing purposes. This class is used to test the * thread executor behavior in ImportProcessor.