Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
import static com.scalar.db.dataloader.cli.util.CommandLineInputUtils.validatePositiveValue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.DistributedTransactionAdmin;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.CoreError;
import com.scalar.db.dataloader.core.DataLoaderError;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.dataimport.ImportManager;
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
Expand Down Expand Up @@ -75,12 +78,24 @@ public Integer call() throws Exception {
.prettyPrint(prettyPrint)
.build();
LogWriterFactory logWriterFactory = createLogWriterFactory(config);
File configFile = new File(configFilePath);
TransactionFactory transactionFactory = TransactionFactory.create(configFile);

// Validate transaction mode configuration before proceeding
validateTransactionMode(transactionFactory);

Map<String, TableMetadata> tableMetadataMap =
createTableMetadataMap(controlFile, namespace, tableName);
createTableMetadataMap(controlFile, namespace, tableName, transactionFactory);
try (BufferedReader reader =
Files.newBufferedReader(Paths.get(sourceFilePath), Charset.defaultCharset())) {
ImportManager importManager =
createImportManager(importOptions, tableMetadataMap, reader, logWriterFactory, config);
createImportManager(
importOptions,
tableMetadataMap,
reader,
logWriterFactory,
config,
transactionFactory);
importManager.startImport();
}
return 0;
Expand All @@ -101,14 +116,16 @@ private LogWriterFactory createLogWriterFactory(ImportLoggerConfig config) {
* @param controlFile control file
* @param namespace Namespace
* @param tableName Single table name
* @param transactionFactory transaction factory to use
* @return {@code Map<String, TableMetadata>} a table metadata map
* @throws ParameterException if one of the argument values is wrong
*/
private Map<String, TableMetadata> createTableMetadataMap(
ControlFile controlFile, String namespace, String tableName)
throws IOException, TableMetadataException {
File configFile = new File(configFilePath);
TransactionFactory transactionFactory = TransactionFactory.create(configFile);
ControlFile controlFile,
String namespace,
String tableName,
TransactionFactory transactionFactory)
throws TableMetadataException {
try (DistributedTransactionAdmin transactionAdmin = transactionFactory.getTransactionAdmin()) {
TableMetadataService tableMetadataService = new TableMetadataService(transactionAdmin);
Map<String, TableMetadata> tableMetadataMap = new HashMap<>();
Expand All @@ -135,16 +152,17 @@ private Map<String, TableMetadata> createTableMetadataMap(
* @param reader buffered reader with source data
* @param logWriterFactory log writer factory object
* @param config import logging config
* @param transactionFactory transaction factory to use
* @return ImportManager object
*/
private ImportManager createImportManager(
ImportOptions importOptions,
Map<String, TableMetadata> tableMetadataMap,
BufferedReader reader,
LogWriterFactory logWriterFactory,
ImportLoggerConfig config)
ImportLoggerConfig config,
TransactionFactory transactionFactory)
throws IOException {
File configFile = new File(configFilePath);
ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory();
ImportManager importManager =
new ImportManager(
Expand All @@ -153,7 +171,7 @@ private ImportManager createImportManager(
importOptions,
importProcessorFactory,
scalarDbMode,
TransactionFactory.create(configFile).getTransactionManager());
transactionFactory.getTransactionManager());
if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory));
} else {
Expand Down Expand Up @@ -236,6 +254,62 @@ private void validateLogDirectory(String logDirectory) throws ParameterException
}
}

/**
* Validate transaction mode configuration by attempting to start and abort a transaction
*
* @param transactionFactory transaction factory to test
* @throws ParameterException if transaction mode is incompatible with the configured transaction
* manager
*/
void validateTransactionMode(TransactionFactory transactionFactory) {
// Only validate when in TRANSACTION mode
if (scalarDbMode != ScalarDbMode.TRANSACTION) {
return;
}

DistributedTransaction transaction = null;
try {
// Try to start a read only transaction to verify the transaction manager is properly
// configured
transaction = transactionFactory.getTransactionManager().startReadOnly();
} catch (Exception e) {
// Check for specific error about beginning transaction not allowed
if (e.getMessage() != null
&& e.getMessage()
.contains(
CoreError.SINGLE_CRUD_OPERATION_TRANSACTION_BEGINNING_TRANSACTION_NOT_ALLOWED
.buildCode())) {
throw new ParameterException(
spec.commandLine(),
DataLoaderError.INVALID_TRANSACTION_MODE.buildMessage(
"The current configuration does not support TRANSACTION mode. "
+ "Please try with STORAGE mode or check your ScalarDB configuration. "
+ "Error: "
+ e.getClass().getSimpleName()
+ " - "
+ e.getMessage()));
}

// Other exceptions - configuration or runtime error
throw new ParameterException(
spec.commandLine(),
DataLoaderError.INVALID_TRANSACTION_MODE.buildMessage(
"Failed to validate transaction mode compatibility. Error: "
+ e.getClass().getSimpleName()
+ " - "
+ e.getMessage()));
} finally {
// Ensure transaction is aborted
if (transaction != null) {
try {
transaction.abort();
} catch (Exception ignored) {
// Ignore errors during cleanup
}
}
}
}

/**
* Generate control file from a valid control file path
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,16 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.common.CoreError;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.dataimport.ImportMode;
import com.scalar.db.exception.transaction.TransactionException;
import com.scalar.db.service.TransactionFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -279,4 +286,63 @@ void call_withoutMaxThreads_shouldDefaultToAvailableProcessors() {
// Verify it was set to available processors
assertEquals(Runtime.getRuntime().availableProcessors(), command.maxThreads);
}

@Test
void validateTransactionMode_withInvalidConfig_shouldThrowException() throws Exception {
// Arrange - Mock TransactionFactory to throw exception with error
TransactionFactory mockFactory = mock(TransactionFactory.class);
DistributedTransactionManager mockManager = mock(DistributedTransactionManager.class);

when(mockFactory.getTransactionManager()).thenReturn(mockManager);
when(mockManager.startReadOnly())
.thenThrow(
new TransactionException(
CoreError.SINGLE_CRUD_OPERATION_TRANSACTION_BEGINNING_TRANSACTION_NOT_ALLOWED
.buildMessage(),
null));

ImportCommand command = new ImportCommand();
CommandLine cmd = new CommandLine(command);
command.spec = cmd.getCommandSpec();
command.scalarDbMode = ScalarDbMode.TRANSACTION;

// Act & Assert
CommandLine.ParameterException thrown =
assertThrows(
CommandLine.ParameterException.class,
() -> command.validateTransactionMode(mockFactory));

assertTrue(thrown.getMessage().contains("does not support TRANSACTION mode"));
assertTrue(
thrown
.getMessage()
.contains(
CoreError.SINGLE_CRUD_OPERATION_TRANSACTION_BEGINNING_TRANSACTION_NOT_ALLOWED
.buildCode()));
}

@Test
void validateTransactionMode_withOtherException_shouldThrowException() throws Exception {
// Arrange - Mock TransactionFactory to throw a different exception
TransactionFactory mockFactory = mock(TransactionFactory.class);
DistributedTransactionManager mockManager = mock(DistributedTransactionManager.class);

when(mockFactory.getTransactionManager()).thenReturn(mockManager);
when(mockManager.startReadOnly()).thenThrow(new RuntimeException("Connection failed"));

ImportCommand command = new ImportCommand();
CommandLine cmd = new CommandLine(command);
command.spec = cmd.getCommandSpec();
command.scalarDbMode = ScalarDbMode.TRANSACTION;

// Act & Assert
CommandLine.ParameterException thrown =
assertThrows(
CommandLine.ParameterException.class,
() -> command.validateTransactionMode(mockFactory));

assertTrue(thrown.getMessage().contains("Failed to validate transaction mode"));
assertTrue(thrown.getMessage().contains("RuntimeException"));
assertTrue(thrown.getMessage().contains("Connection failed"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ public enum DataLoaderError implements ScalarDbError {
"Cannot specify both deprecated option '%s' and new option '%s'. Please use only '%s'",
"",
""),
INVALID_TRANSACTION_MODE(
Category.USER_ERROR,
"0058",
"TRANSACTION mode is not compatible with the current configuration. %s",
"",
""),

//
// Errors for the internal error category
Expand Down
Loading