From d7de972be5a4e1413af8f9abce444bd9f202d9e2 Mon Sep 17 00:00:00 2001 From: Pham Ba Thong Date: Tue, 18 Nov 2025 18:22:36 +0900 Subject: [PATCH 1/4] Fix exception handling in the data loader importing to handle the unkown exceptions properly --- .../dataimport/processor/ImportProcessor.java | 26 ++++++++++++++- .../processor/ImportProcessorTest.java | 33 +++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java index 81daf9646e..8f3a220d36 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -317,7 +317,12 @@ private ImportTransactionBatchResult processTransactionBatch( } catch (TransactionException e) { isSuccess = false; - logger.error(e.getMessage()); + logger.error( + "Transaction failed for batch {} in data chunk {}: {}", + transactionBatch.getTransactionBatchId(), + dataChunkId, + e.getMessage(), + e); try { if (transaction != null) { transaction.abort(); // Ensure transaction is aborted @@ -327,6 +332,25 @@ private ImportTransactionBatchResult processTransactionBatch( "Failed to abort transaction: {}", abortException.getMessage(), abortException); } error = e.getMessage(); + } catch (Exception e) { + // Catch unknown exceptions + isSuccess = false; + logger.error( + "Unexpected exception occurred while processing transaction batch {} in data chunk {}.", + transactionBatch.getTransactionBatchId(), + dataChunkId, + e); + try { + if (transaction != null) { + transaction.abort(); // Ensure transaction is aborted + } + } catch (Exception abortException) { + logger.error( + "Failed to abort transaction after unexpected error: {}", + abortException.getMessage(), + abortException); + } + error = "Unexpected error: " + e.getClass().getSimpleName() + " - " + e.getMessage(); } ImportTransactionBatchResult importTransactionBatchResult = ImportTransactionBatchResult.builder() diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java index d60ebecb00..abae392a83 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataimport.processor; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -43,6 +44,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -256,6 +258,37 @@ void process_withShutdown_shouldShutdownExecutorsGracefully() { assertEquals(3, processor.getProcessedChunksCount().get(), "All chunks should be processed"); } + @Test + void process_withUnexpectedExceptionInTransaction_shouldHandleGracefully() + throws TransactionException { + // Arrange + BufferedReader reader = new BufferedReader(new StringReader("test data")); + when(params.getScalarDbMode()).thenReturn(ScalarDbMode.TRANSACTION); + when(params.getDistributedTransactionManager()).thenReturn(distributedTransactionManager); + when(distributedTransactionManager.start()).thenThrow(new RuntimeException("Unexpected error")); + + TestImportProcessor processor = new TestImportProcessor(params); + processor.addListener(eventListener); + + // Act + processor.process(2, 1, reader); + + // Assert + verify(eventListener, times(1)).onAllDataChunksCompleted(); + + // Capture and verify the transaction batch result + ArgumentCaptor resultCaptor = + ArgumentCaptor.forClass(ImportTransactionBatchResult.class); + verify(eventListener, times(1)).onTransactionBatchCompleted(resultCaptor.capture()); + + ImportTransactionBatchResult result = resultCaptor.getValue(); + assertFalse(result.isSuccess()); + assertEquals(0, result.getTransactionBatchId()); + assertEquals(1, result.getDataChunkId()); + assertTrue(result.getErrors().get(0).contains("Unexpected error: RuntimeException")); + assertTrue(result.getErrors().get(0).contains("Unexpected error")); + } + /** * A simple implementation of ImportProcessor for testing purposes. This class is used to test the * thread executor behavior in ImportProcessor. From 6d2584189b6b5238ba3bbae4468cae4f7055a5b1 Mon Sep 17 00:00:00 2001 From: Pham Ba Thong Date: Tue, 18 Nov 2025 18:34:16 +0900 Subject: [PATCH 2/4] apply suggestions --- .../dataloader/core/dataimport/processor/ImportProcessor.java | 2 +- .../core/dataimport/processor/ImportProcessorTest.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java index 8f3a220d36..c456a72bff 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -333,7 +333,7 @@ private ImportTransactionBatchResult processTransactionBatch( } error = e.getMessage(); } catch (Exception e) { - // Catch unknown exceptions + // Catch unchecked exceptions isSuccess = false; logger.error( "Unexpected exception occurred while processing transaction batch {} in data chunk {}.", diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java index abae392a83..4dc393b096 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java @@ -285,7 +285,6 @@ void process_withUnexpectedExceptionInTransaction_shouldHandleGracefully() assertFalse(result.isSuccess()); assertEquals(0, result.getTransactionBatchId()); assertEquals(1, result.getDataChunkId()); - assertTrue(result.getErrors().get(0).contains("Unexpected error: RuntimeException")); assertTrue(result.getErrors().get(0).contains("Unexpected error")); } From 187a8094f182aa08af0d986dbd74f44393959968 Mon Sep 17 00:00:00 2001 From: Pham Ba Thong Date: Tue, 18 Nov 2025 18:43:47 +0900 Subject: [PATCH 3/4] apply suggestions --- .../dataimport/processor/ImportProcessor.java | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java index c456a72bff..8d40e4e5fb 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -323,14 +323,7 @@ private ImportTransactionBatchResult processTransactionBatch( dataChunkId, e.getMessage(), e); - try { - if (transaction != null) { - transaction.abort(); // Ensure transaction is aborted - } - } catch (TransactionException abortException) { - logger.error( - "Failed to abort transaction: {}", abortException.getMessage(), abortException); - } + abortTransactionSafely(transaction); error = e.getMessage(); } catch (Exception e) { // Catch unchecked exceptions @@ -340,16 +333,7 @@ private ImportTransactionBatchResult processTransactionBatch( transactionBatch.getTransactionBatchId(), dataChunkId, e); - try { - if (transaction != null) { - transaction.abort(); // Ensure transaction is aborted - } - } catch (Exception abortException) { - logger.error( - "Failed to abort transaction after unexpected error: {}", - abortException.getMessage(), - abortException); - } + abortTransactionSafely(transaction); error = "Unexpected error: " + e.getClass().getSimpleName() + " - " + e.getMessage(); } ImportTransactionBatchResult importTransactionBatchResult = @@ -364,6 +348,22 @@ private ImportTransactionBatchResult processTransactionBatch( return importTransactionBatchResult; } + /** + * Safely aborts the provided distributed transaction. If the transaction is null, this method + * takes no action. If an exception occurs during the abort operation, it is logged as an error. + * + * @param transaction the {@link DistributedTransaction} to be aborted, may be null + */ + private void abortTransactionSafely(DistributedTransaction transaction) { + try { + if (transaction != null) { + transaction.abort(); + } + } catch (Exception e) { + logger.error("Failed to abort transaction: {}", e.getMessage(), e); + } + } + /** * Processes a single record in storage mode (non-transactional). Each record is processed * independently without transaction guarantees. From 57ce0ea384b1e0db10a1dc35a7fc115da88d1a3e Mon Sep 17 00:00:00 2001 From: Pham Ba Thong Date: Wed, 19 Nov 2025 16:29:53 +0900 Subject: [PATCH 4/4] apply suggestions --- .../dataloader/core/dataimport/processor/ImportProcessor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java index 8d40e4e5fb..8870a81306 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -32,6 +32,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -354,7 +355,7 @@ private ImportTransactionBatchResult processTransactionBatch( * * @param transaction the {@link DistributedTransaction} to be aborted, may be null */ - private void abortTransactionSafely(DistributedTransaction transaction) { + private void abortTransactionSafely(@Nullable DistributedTransaction transaction) { try { if (transaction != null) { transaction.abort();