Skip to content

Commit fd37bca

Browse files
authored
Fix exception handling in the data loader importing to handle the unkown exceptions properly (#3183)
1 parent 72b2c7d commit fd37bca

File tree

2 files changed

+66
-9
lines changed

2 files changed

+66
-9
lines changed

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.Semaphore;
3333
import java.util.concurrent.TimeUnit;
3434
import java.util.concurrent.atomic.AtomicInteger;
35+
import javax.annotation.Nullable;
3536
import lombok.RequiredArgsConstructor;
3637
import org.slf4j.Logger;
3738
import org.slf4j.LoggerFactory;
@@ -317,16 +318,24 @@ private ImportTransactionBatchResult processTransactionBatch(
317318

318319
} catch (TransactionException e) {
319320
isSuccess = false;
320-
logger.error(e.getMessage());
321-
try {
322-
if (transaction != null) {
323-
transaction.abort(); // Ensure transaction is aborted
324-
}
325-
} catch (TransactionException abortException) {
326-
logger.error(
327-
"Failed to abort transaction: {}", abortException.getMessage(), abortException);
328-
}
321+
logger.error(
322+
"Transaction failed for batch {} in data chunk {}: {}",
323+
transactionBatch.getTransactionBatchId(),
324+
dataChunkId,
325+
e.getMessage(),
326+
e);
327+
abortTransactionSafely(transaction);
329328
error = e.getMessage();
329+
} catch (Exception e) {
330+
// Catch unchecked exceptions
331+
isSuccess = false;
332+
logger.error(
333+
"Unexpected exception occurred while processing transaction batch {} in data chunk {}.",
334+
transactionBatch.getTransactionBatchId(),
335+
dataChunkId,
336+
e);
337+
abortTransactionSafely(transaction);
338+
error = "Unexpected error: " + e.getClass().getSimpleName() + " - " + e.getMessage();
330339
}
331340
ImportTransactionBatchResult importTransactionBatchResult =
332341
ImportTransactionBatchResult.builder()
@@ -340,6 +349,22 @@ private ImportTransactionBatchResult processTransactionBatch(
340349
return importTransactionBatchResult;
341350
}
342351

352+
/**
353+
* Safely aborts the provided distributed transaction. If the transaction is null, this method
354+
* takes no action. If an exception occurs during the abort operation, it is logged as an error.
355+
*
356+
* @param transaction the {@link DistributedTransaction} to be aborted, may be null
357+
*/
358+
private void abortTransactionSafely(@Nullable DistributedTransaction transaction) {
359+
try {
360+
if (transaction != null) {
361+
transaction.abort();
362+
}
363+
} catch (Exception e) {
364+
logger.error("Failed to abort transaction: {}", e.getMessage(), e);
365+
}
366+
}
367+
343368
/**
344369
* Processes a single record in storage mode (non-transactional). Each record is processed
345370
* independently without transaction guarantees.

data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.scalar.db.dataloader.core.dataimport.processor;
22

33
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertFalse;
45
import static org.junit.jupiter.api.Assertions.assertThrows;
56
import static org.junit.jupiter.api.Assertions.assertTrue;
67
import static org.mockito.ArgumentMatchers.any;
@@ -42,6 +43,7 @@
4243
import org.junit.jupiter.api.BeforeEach;
4344
import org.junit.jupiter.api.Test;
4445
import org.junit.jupiter.api.extension.ExtendWith;
46+
import org.mockito.ArgumentCaptor;
4547
import org.mockito.Mock;
4648
import org.mockito.junit.jupiter.MockitoExtension;
4749

@@ -250,6 +252,36 @@ void process_withShutdown_shouldShutdownExecutorsGracefully() {
250252
assertEquals(3, processor.getProcessedChunksCount().get(), "All chunks should be processed");
251253
}
252254

255+
@Test
256+
void process_withUnexpectedExceptionInTransaction_shouldHandleGracefully()
257+
throws TransactionException {
258+
// Arrange
259+
BufferedReader reader = new BufferedReader(new StringReader("test data"));
260+
when(params.getScalarDbMode()).thenReturn(ScalarDbMode.TRANSACTION);
261+
when(params.getDistributedTransactionManager()).thenReturn(distributedTransactionManager);
262+
when(distributedTransactionManager.start()).thenThrow(new RuntimeException("Unexpected error"));
263+
264+
TestImportProcessor processor = new TestImportProcessor(params);
265+
processor.addListener(eventListener);
266+
267+
// Act
268+
processor.process(2, 1, reader);
269+
270+
// Assert
271+
verify(eventListener, times(1)).onAllDataChunksCompleted();
272+
273+
// Capture and verify the transaction batch result
274+
ArgumentCaptor<ImportTransactionBatchResult> resultCaptor =
275+
ArgumentCaptor.forClass(ImportTransactionBatchResult.class);
276+
verify(eventListener, times(1)).onTransactionBatchCompleted(resultCaptor.capture());
277+
278+
ImportTransactionBatchResult result = resultCaptor.getValue();
279+
assertFalse(result.isSuccess());
280+
assertEquals(0, result.getTransactionBatchId());
281+
assertEquals(1, result.getDataChunkId());
282+
assertTrue(result.getErrors().get(0).contains("Unexpected error"));
283+
}
284+
253285
/**
254286
* A simple implementation of ImportProcessor for testing purposes. This class is used to test the
255287
* thread executor behavior in ImportProcessor.

0 commit comments

Comments
 (0)