Skip to content

Commit d7de972

Browse files
committed
Fix exception handling in the data loader importing to handle the unkown exceptions properly
1 parent d269d9d commit d7de972

File tree

2 files changed

+58
-1
lines changed

2 files changed

+58
-1
lines changed

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,12 @@ private ImportTransactionBatchResult processTransactionBatch(
317317

318318
} catch (TransactionException e) {
319319
isSuccess = false;
320-
logger.error(e.getMessage());
320+
logger.error(
321+
"Transaction failed for batch {} in data chunk {}: {}",
322+
transactionBatch.getTransactionBatchId(),
323+
dataChunkId,
324+
e.getMessage(),
325+
e);
321326
try {
322327
if (transaction != null) {
323328
transaction.abort(); // Ensure transaction is aborted
@@ -327,6 +332,25 @@ private ImportTransactionBatchResult processTransactionBatch(
327332
"Failed to abort transaction: {}", abortException.getMessage(), abortException);
328333
}
329334
error = e.getMessage();
335+
} catch (Exception e) {
336+
// Catch unknown exceptions
337+
isSuccess = false;
338+
logger.error(
339+
"Unexpected exception occurred while processing transaction batch {} in data chunk {}.",
340+
transactionBatch.getTransactionBatchId(),
341+
dataChunkId,
342+
e);
343+
try {
344+
if (transaction != null) {
345+
transaction.abort(); // Ensure transaction is aborted
346+
}
347+
} catch (Exception abortException) {
348+
logger.error(
349+
"Failed to abort transaction after unexpected error: {}",
350+
abortException.getMessage(),
351+
abortException);
352+
}
353+
error = "Unexpected error: " + e.getClass().getSimpleName() + " - " + e.getMessage();
330354
}
331355
ImportTransactionBatchResult importTransactionBatchResult =
332356
ImportTransactionBatchResult.builder()

data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.scalar.db.dataloader.core.dataimport.processor;
22

33
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertFalse;
45
import static org.junit.jupiter.api.Assertions.assertThrows;
56
import static org.junit.jupiter.api.Assertions.assertTrue;
67
import static org.mockito.ArgumentMatchers.any;
@@ -43,6 +44,7 @@
4344
import org.junit.jupiter.api.BeforeEach;
4445
import org.junit.jupiter.api.Test;
4546
import org.junit.jupiter.api.extension.ExtendWith;
47+
import org.mockito.ArgumentCaptor;
4648
import org.mockito.Mock;
4749
import org.mockito.junit.jupiter.MockitoExtension;
4850

@@ -256,6 +258,37 @@ void process_withShutdown_shouldShutdownExecutorsGracefully() {
256258
assertEquals(3, processor.getProcessedChunksCount().get(), "All chunks should be processed");
257259
}
258260

261+
@Test
262+
void process_withUnexpectedExceptionInTransaction_shouldHandleGracefully()
263+
throws TransactionException {
264+
// Arrange
265+
BufferedReader reader = new BufferedReader(new StringReader("test data"));
266+
when(params.getScalarDbMode()).thenReturn(ScalarDbMode.TRANSACTION);
267+
when(params.getDistributedTransactionManager()).thenReturn(distributedTransactionManager);
268+
when(distributedTransactionManager.start()).thenThrow(new RuntimeException("Unexpected error"));
269+
270+
TestImportProcessor processor = new TestImportProcessor(params);
271+
processor.addListener(eventListener);
272+
273+
// Act
274+
processor.process(2, 1, reader);
275+
276+
// Assert
277+
verify(eventListener, times(1)).onAllDataChunksCompleted();
278+
279+
// Capture and verify the transaction batch result
280+
ArgumentCaptor<ImportTransactionBatchResult> resultCaptor =
281+
ArgumentCaptor.forClass(ImportTransactionBatchResult.class);
282+
verify(eventListener, times(1)).onTransactionBatchCompleted(resultCaptor.capture());
283+
284+
ImportTransactionBatchResult result = resultCaptor.getValue();
285+
assertFalse(result.isSuccess());
286+
assertEquals(0, result.getTransactionBatchId());
287+
assertEquals(1, result.getDataChunkId());
288+
assertTrue(result.getErrors().get(0).contains("Unexpected error: RuntimeException"));
289+
assertTrue(result.getErrors().get(0).contains("Unexpected error"));
290+
}
291+
259292
/**
260293
* A simple implementation of ImportProcessor for testing purposes. This class is used to test the
261294
* thread executor behavior in ImportProcessor.

0 commit comments

Comments
 (0)