Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import static com.scalar.db.dataloader.cli.util.CommandLineInputUtils.validatePositiveValue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.DistributedTransactionAdmin;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.DataLoaderError;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.dataimport.ImportManager;
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
Expand Down Expand Up @@ -67,20 +69,32 @@ public Integer call() throws Exception {
spec.commandLine(), dataChunkQueueSize, DataLoaderError.INVALID_DATA_CHUNK_QUEUE_SIZE);
ControlFile controlFile = parseControlFileFromPath(controlFilePath).orElse(null);
ImportOptions importOptions = createImportOptions(controlFile);
ImportLoggerConfig config =
ImportLoggerConfig importLoggerConfig =
ImportLoggerConfig.builder()
.logDirectoryPath(logDirectory)
.isLogRawSourceRecordsEnabled(importOptions.isLogRawRecord())
.isLogSuccessRecordsEnabled(importOptions.isLogSuccessRecords())
.prettyPrint(prettyPrint)
.build();
LogWriterFactory logWriterFactory = createLogWriterFactory(config);
LogWriterFactory logWriterFactory = createLogWriterFactory(importLoggerConfig);
File configFile = new File(configFilePath);
TransactionFactory transactionFactory = TransactionFactory.create(configFile);

// Validate transaction mode configuration before proceeding
validateTransactionMode(transactionFactory);

Map<String, TableMetadata> tableMetadataMap =
createTableMetadataMap(controlFile, namespace, tableName);
createTableMetadataMap(controlFile, namespace, tableName, transactionFactory);
try (BufferedReader reader =
Files.newBufferedReader(Paths.get(sourceFilePath), Charset.defaultCharset())) {
ImportManager importManager =
createImportManager(importOptions, tableMetadataMap, reader, logWriterFactory, config);
createImportManager(
importOptions,
tableMetadataMap,
reader,
importLoggerConfig,
logWriterFactory,
transactionFactory);
importManager.startImport();
}
return 0;
Expand All @@ -101,14 +115,16 @@ private LogWriterFactory createLogWriterFactory(ImportLoggerConfig config) {
* @param controlFile control file
* @param namespace Namespace
* @param tableName Single table name
* @param transactionFactory transaction factory to use
* @return {@code Map<String, TableMetadata>} a table metadata map
* @throws ParameterException if one of the argument values is wrong
*/
private Map<String, TableMetadata> createTableMetadataMap(
ControlFile controlFile, String namespace, String tableName)
throws IOException, TableMetadataException {
File configFile = new File(configFilePath);
TransactionFactory transactionFactory = TransactionFactory.create(configFile);
ControlFile controlFile,
String namespace,
String tableName,
TransactionFactory transactionFactory)
throws TableMetadataException {
try (DistributedTransactionAdmin transactionAdmin = transactionFactory.getTransactionAdmin()) {
TableMetadataService tableMetadataService = new TableMetadataService(transactionAdmin);
Map<String, TableMetadata> tableMetadataMap = new HashMap<>();
Expand All @@ -133,18 +149,19 @@ private Map<String, TableMetadata> createTableMetadataMap(
* @param importOptions import options
* @param tableMetadataMap table metadata map
* @param reader buffered reader with source data
* @param importLoggerConfig import logging config
* @param logWriterFactory log writer factory object
* @param config import logging config
* @param transactionFactory transaction factory to use
* @return ImportManager object
*/
private ImportManager createImportManager(
ImportOptions importOptions,
Map<String, TableMetadata> tableMetadataMap,
BufferedReader reader,
ImportLoggerConfig importLoggerConfig,
LogWriterFactory logWriterFactory,
ImportLoggerConfig config)
TransactionFactory transactionFactory)
throws IOException {
File configFile = new File(configFilePath);
ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory();
ImportManager importManager =
new ImportManager(
Expand All @@ -153,11 +170,12 @@ private ImportManager createImportManager(
importOptions,
importProcessorFactory,
scalarDbMode,
TransactionFactory.create(configFile).getTransactionManager());
transactionFactory.getTransactionManager());
if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory));
importManager.addListener(
new SplitByDataChunkImportLogger(importLoggerConfig, logWriterFactory));
} else {
importManager.addListener(new SingleFileImportLogger(config, logWriterFactory));
importManager.addListener(new SingleFileImportLogger(importLoggerConfig, logWriterFactory));
}
return importManager;
}
Expand Down Expand Up @@ -236,6 +254,48 @@ private void validateLogDirectory(String logDirectory) throws ParameterException
}
}

/**
 * Verifies that TRANSACTION mode is usable with the configured transaction manager by opening
 * (and immediately aborting) a read-only probe transaction.
 *
 * @param transactionFactory factory whose transaction manager is probed
 * @throws ParameterException if the transaction manager rejects TRANSACTION mode, or if the
 *     probe fails for any other configuration/runtime reason
 */
void validateTransactionMode(TransactionFactory transactionFactory) {
  if (scalarDbMode != ScalarDbMode.TRANSACTION) {
    // Nothing to verify unless we are actually running in TRANSACTION mode.
    return;
  }

  DistributedTransaction probe = null;
  try {
    // Successfully beginning a read-only transaction proves the manager is configured
    // for transactional work.
    probe = transactionFactory.getTransactionManager().startReadOnly();
  } catch (UnsupportedOperationException e) {
    // The configured transaction manager explicitly does not support transaction mode.
    throw new ParameterException(
        spec.commandLine(),
        DataLoaderError.INVALID_TRANSACTION_MODE.buildMessage(e.getMessage()),
        e);
  } catch (Exception e) {
    // Any other failure: treat as a configuration or runtime error during validation.
    throw new ParameterException(
        spec.commandLine(),
        DataLoaderError.TRANSACTION_MODE_VALIDATION_FAILED.buildMessage(e.getMessage()),
        e);
  } finally {
    // Always clean up the probe transaction if one was started.
    if (probe != null) {
      try {
        probe.abort();
      } catch (Exception ignored) {
        // Best-effort cleanup; an abort failure is irrelevant to the validation result.
      }
    }
  }
}

/**
* Generate control file from a valid control file path
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.dataimport.ImportMode;
import com.scalar.db.service.TransactionFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -279,4 +284,52 @@ void call_withoutMaxThreads_shouldDefaultToAvailableProcessors() {
// Verify it was set to available processors
assertEquals(Runtime.getRuntime().availableProcessors(), command.maxThreads);
}

@Test
void validateTransactionMode_withUnsupportedOperation_shouldThrowException() throws Exception {
  // Arrange: a transaction manager that rejects transactional work outright
  DistributedTransactionManager manager = mock(DistributedTransactionManager.class);
  TransactionFactory factory = mock(TransactionFactory.class);
  when(factory.getTransactionManager()).thenReturn(manager);
  when(manager.startReadOnly())
      .thenThrow(new UnsupportedOperationException("Transaction mode is not supported"));

  ImportCommand command = new ImportCommand();
  command.spec = new CommandLine(command).getCommandSpec();
  command.scalarDbMode = ScalarDbMode.TRANSACTION;

  // Act & Assert: the incompatibility must surface as a ParameterException
  CommandLine.ParameterException exception =
      assertThrows(
          CommandLine.ParameterException.class,
          () -> command.validateTransactionMode(factory));

  assertTrue(exception.getMessage().contains("TRANSACTION mode is not compatible"));
}

@Test
void validateTransactionMode_withOtherException_shouldThrowException() throws Exception {
  // Arrange: a transaction manager whose probe fails with a generic runtime error
  DistributedTransactionManager manager = mock(DistributedTransactionManager.class);
  TransactionFactory factory = mock(TransactionFactory.class);
  when(factory.getTransactionManager()).thenReturn(manager);
  when(manager.startReadOnly()).thenThrow(new RuntimeException("Connection failed"));

  ImportCommand command = new ImportCommand();
  command.spec = new CommandLine(command).getCommandSpec();
  command.scalarDbMode = ScalarDbMode.TRANSACTION;

  // Act & Assert: the failure must surface as a ParameterException carrying the cause message
  CommandLine.ParameterException exception =
      assertThrows(
          CommandLine.ParameterException.class,
          () -> command.validateTransactionMode(factory));

  assertTrue(exception.getMessage().contains("Failed to validate TRANSACTION mode"));
  assertTrue(exception.getMessage().contains("Connection failed"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ public enum DataLoaderError implements ScalarDbError {
"Cannot specify both deprecated option '%s' and new option '%s'. Please use only '%s'",
"",
""),
INVALID_TRANSACTION_MODE(
Category.USER_ERROR,
"0058",
"TRANSACTION mode is not compatible with the current configuration. Please try with STORAGE mode or check your ScalarDB configuration. Details: %s",
"",
""),
TRANSACTION_MODE_VALIDATION_FAILED(
Category.USER_ERROR, "0059", "Failed to validate TRANSACTION mode. Details: %s", "", ""),

//
// Errors for the internal error category
Expand Down