Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,12 @@ private ImportTransactionBatchResult processTransactionBatch(

} catch (TransactionException e) {
isSuccess = false;
logger.error(e.getMessage());
logger.error(
"Transaction failed for batch {} in data chunk {}: {}",
transactionBatch.getTransactionBatchId(),
dataChunkId,
e.getMessage(),
e);
try {
if (transaction != null) {
transaction.abort(); // Ensure transaction is aborted
Expand All @@ -327,6 +332,25 @@ private ImportTransactionBatchResult processTransactionBatch(
"Failed to abort transaction: {}", abortException.getMessage(), abortException);
}
error = e.getMessage();
} catch (Exception e) {
// Catch unknown exceptions
isSuccess = false;
logger.error(
"Unexpected exception occurred while processing transaction batch {} in data chunk {}.",
transactionBatch.getTransactionBatchId(),
dataChunkId,
e);
try {
if (transaction != null) {
transaction.abort(); // Ensure transaction is aborted
}
} catch (Exception abortException) {
logger.error(
"Failed to abort transaction after unexpected error: {}",
abortException.getMessage(),
abortException);
}
error = "Unexpected error: " + e.getClass().getSimpleName() + " - " + e.getMessage();
}
ImportTransactionBatchResult importTransactionBatchResult =
ImportTransactionBatchResult.builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.scalar.db.dataloader.core.dataimport.processor;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -43,6 +44,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

Expand Down Expand Up @@ -256,6 +258,37 @@ void process_withShutdown_shouldShutdownExecutorsGracefully() {
assertEquals(3, processor.getProcessedChunksCount().get(), "All chunks should be processed");
}

@Test
void process_withUnexpectedExceptionInTransaction_shouldHandleGracefully()
throws TransactionException {
// Arrange
BufferedReader reader = new BufferedReader(new StringReader("test data"));
when(params.getScalarDbMode()).thenReturn(ScalarDbMode.TRANSACTION);
when(params.getDistributedTransactionManager()).thenReturn(distributedTransactionManager);
when(distributedTransactionManager.start()).thenThrow(new RuntimeException("Unexpected error"));

TestImportProcessor processor = new TestImportProcessor(params);
processor.addListener(eventListener);

// Act
processor.process(2, 1, reader);

// Assert
verify(eventListener, times(1)).onAllDataChunksCompleted();

// Capture and verify the transaction batch result
ArgumentCaptor<ImportTransactionBatchResult> resultCaptor =
ArgumentCaptor.forClass(ImportTransactionBatchResult.class);
verify(eventListener, times(1)).onTransactionBatchCompleted(resultCaptor.capture());

ImportTransactionBatchResult result = resultCaptor.getValue();
assertFalse(result.isSuccess());
assertEquals(0, result.getTransactionBatchId());
assertEquals(1, result.getDataChunkId());
assertTrue(result.getErrors().get(0).contains("Unexpected error: RuntimeException"));
assertTrue(result.getErrors().get(0).contains("Unexpected error"));
}

/**
* A simple implementation of ImportProcessor for testing purposes. This class is used to test the
* thread executor behavior in ImportProcessor.
Expand Down
Loading