Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -317,16 +318,24 @@ private ImportTransactionBatchResult processTransactionBatch(

} catch (TransactionException e) {
isSuccess = false;
logger.error(e.getMessage());
try {
if (transaction != null) {
transaction.abort(); // Ensure transaction is aborted
}
} catch (TransactionException abortException) {
logger.error(
"Failed to abort transaction: {}", abortException.getMessage(), abortException);
}
logger.error(
"Transaction failed for batch {} in data chunk {}: {}",
transactionBatch.getTransactionBatchId(),
dataChunkId,
e.getMessage(),
e);
abortTransactionSafely(transaction);
error = e.getMessage();
} catch (Exception e) {
// Catch unchecked exceptions
isSuccess = false;
logger.error(
"Unexpected exception occurred while processing transaction batch {} in data chunk {}.",
transactionBatch.getTransactionBatchId(),
dataChunkId,
e);
abortTransactionSafely(transaction);
error = "Unexpected error: " + e.getClass().getSimpleName() + " - " + e.getMessage();
}
ImportTransactionBatchResult importTransactionBatchResult =
ImportTransactionBatchResult.builder()
Expand All @@ -340,6 +349,22 @@ private ImportTransactionBatchResult processTransactionBatch(
return importTransactionBatchResult;
}

/**
* Safely aborts the provided distributed transaction. If the transaction is null, this method
* takes no action. If an exception occurs during the abort operation, it is logged as an error.
*
* @param transaction the {@link DistributedTransaction} to be aborted, may be null
*/
private void abortTransactionSafely(@Nullable DistributedTransaction transaction) {
try {
if (transaction != null) {
transaction.abort();
}
} catch (Exception e) {
logger.error("Failed to abort transaction: {}", e.getMessage(), e);
}
}

/**
* Processes a single record in storage mode (non-transactional). Each record is processed
* independently without transaction guarantees.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.scalar.db.dataloader.core.dataimport.processor;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -42,6 +43,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

Expand Down Expand Up @@ -250,6 +252,36 @@ void process_withShutdown_shouldShutdownExecutorsGracefully() {
assertEquals(3, processor.getProcessedChunksCount().get(), "All chunks should be processed");
}

@Test
void process_withUnexpectedExceptionInTransaction_shouldHandleGracefully()
throws TransactionException {
// Arrange
BufferedReader reader = new BufferedReader(new StringReader("test data"));
when(params.getScalarDbMode()).thenReturn(ScalarDbMode.TRANSACTION);
when(params.getDistributedTransactionManager()).thenReturn(distributedTransactionManager);
when(distributedTransactionManager.start()).thenThrow(new RuntimeException("Unexpected error"));

TestImportProcessor processor = new TestImportProcessor(params);
processor.addListener(eventListener);

// Act
processor.process(2, 1, reader);

// Assert
verify(eventListener, times(1)).onAllDataChunksCompleted();

// Capture and verify the transaction batch result
ArgumentCaptor<ImportTransactionBatchResult> resultCaptor =
ArgumentCaptor.forClass(ImportTransactionBatchResult.class);
verify(eventListener, times(1)).onTransactionBatchCompleted(resultCaptor.capture());

ImportTransactionBatchResult result = resultCaptor.getValue();
assertFalse(result.isSuccess());
assertEquals(0, result.getTransactionBatchId());
assertEquals(1, result.getDataChunkId());
assertTrue(result.getErrors().get(0).contains("Unexpected error"));
}

/**
* A simple implementation of ImportProcessor for testing purposes. This class is used to test the
* thread executor behavior in ImportProcessor.
Expand Down