diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java index a8c3def492..426df29a19 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -32,6 +32,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -317,16 +318,24 @@ private ImportTransactionBatchResult processTransactionBatch( } catch (TransactionException e) { isSuccess = false; - logger.error(e.getMessage()); - try { - if (transaction != null) { - transaction.abort(); // Ensure transaction is aborted - } - } catch (TransactionException abortException) { - logger.error( - "Failed to abort transaction: {}", abortException.getMessage(), abortException); - } + logger.error( + "Transaction failed for batch {} in data chunk {}: {}", + transactionBatch.getTransactionBatchId(), + dataChunkId, + e.getMessage(), + e); + abortTransactionSafely(transaction); error = e.getMessage(); + } catch (Exception e) { + // Catch unchecked exceptions + isSuccess = false; + logger.error( + "Unexpected exception occurred while processing transaction batch {} in data chunk {}.", + transactionBatch.getTransactionBatchId(), + dataChunkId, + e); + abortTransactionSafely(transaction); + error = "Unexpected error: " + e.getClass().getSimpleName() + " - " + e.getMessage(); } ImportTransactionBatchResult importTransactionBatchResult = ImportTransactionBatchResult.builder() @@ -340,6 +349,22 @@ private ImportTransactionBatchResult processTransactionBatch( return importTransactionBatchResult; } + /** + * Safely aborts the provided distributed transaction. If the transaction is null, this method + * takes no action. If an exception occurs during the abort operation, it is logged as an error. + * + * @param transaction the {@link DistributedTransaction} to be aborted, may be null + */ + private void abortTransactionSafely(@Nullable DistributedTransaction transaction) { + try { + if (transaction != null) { + transaction.abort(); + } + } catch (Exception e) { + logger.error("Failed to abort transaction: {}", e.getMessage(), e); + } + } + /** * Processes a single record in storage mode (non-transactional). Each record is processed * independently without transaction guarantees. diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java index 24c272a45d..19b713d6f1 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataimport.processor; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -42,6 +43,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -250,6 +252,36 @@ void process_withShutdown_shouldShutdownExecutorsGracefully() { assertEquals(3, processor.getProcessedChunksCount().get(), "All chunks should be processed"); } + @Test + void process_withUnexpectedExceptionInTransaction_shouldHandleGracefully() + throws TransactionException { + // Arrange + BufferedReader reader = new BufferedReader(new StringReader("test data")); + when(params.getScalarDbMode()).thenReturn(ScalarDbMode.TRANSACTION); + when(params.getDistributedTransactionManager()).thenReturn(distributedTransactionManager); + when(distributedTransactionManager.start()).thenThrow(new RuntimeException("Unexpected error")); + + TestImportProcessor processor = new TestImportProcessor(params); + processor.addListener(eventListener); + + // Act + processor.process(2, 1, reader); + + // Assert + verify(eventListener, times(1)).onAllDataChunksCompleted(); + + // Capture and verify the transaction batch result + ArgumentCaptor resultCaptor = + ArgumentCaptor.forClass(ImportTransactionBatchResult.class); + verify(eventListener, times(1)).onTransactionBatchCompleted(resultCaptor.capture()); + + ImportTransactionBatchResult result = resultCaptor.getValue(); + assertFalse(result.isSuccess()); + assertEquals(0, result.getTransactionBatchId()); + assertEquals(1, result.getDataChunkId()); + assertTrue(result.getErrors().get(0).contains("Unexpected error")); + } + /** * A simple implementation of ImportProcessor for testing purposes. This class is used to test the * thread executor behavior in ImportProcessor.