diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java index 7de29db0cd..f5aea46e46 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java @@ -4,10 +4,12 @@ import static com.scalar.db.dataloader.cli.util.CommandLineInputUtils.validatePositiveValue; import com.fasterxml.jackson.databind.ObjectMapper; +import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionAdmin; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.DataLoaderError; import com.scalar.db.dataloader.core.FileFormat; +import com.scalar.db.dataloader.core.ScalarDbMode; import com.scalar.db.dataloader.core.dataimport.ImportManager; import com.scalar.db.dataloader.core.dataimport.ImportOptions; import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile; @@ -67,20 +69,32 @@ public Integer call() throws Exception { spec.commandLine(), dataChunkQueueSize, DataLoaderError.INVALID_DATA_CHUNK_QUEUE_SIZE); ControlFile controlFile = parseControlFileFromPath(controlFilePath).orElse(null); ImportOptions importOptions = createImportOptions(controlFile); - ImportLoggerConfig config = + ImportLoggerConfig importLoggerConfig = ImportLoggerConfig.builder() .logDirectoryPath(logDirectory) .isLogRawSourceRecordsEnabled(importOptions.isLogRawRecord()) .isLogSuccessRecordsEnabled(importOptions.isLogSuccessRecords()) .prettyPrint(prettyPrint) .build(); - LogWriterFactory logWriterFactory = createLogWriterFactory(config); + LogWriterFactory logWriterFactory = createLogWriterFactory(importLoggerConfig); + File configFile = new File(configFilePath); + TransactionFactory transactionFactory = TransactionFactory.create(configFile); + + // Validate transaction mode configuration before proceeding + validateTransactionMode(transactionFactory); + Map tableMetadataMap = - createTableMetadataMap(controlFile, namespace, tableName); + createTableMetadataMap(controlFile, namespace, tableName, transactionFactory); try (BufferedReader reader = Files.newBufferedReader(Paths.get(sourceFilePath), Charset.defaultCharset())) { ImportManager importManager = - createImportManager(importOptions, tableMetadataMap, reader, logWriterFactory, config); + createImportManager( + importOptions, + tableMetadataMap, + reader, + importLoggerConfig, + logWriterFactory, + transactionFactory); importManager.startImport(); } return 0; @@ -101,14 +115,16 @@ private LogWriterFactory createLogWriterFactory(ImportLoggerConfig config) { * @param controlFile control file * @param namespace Namespace * @param tableName Single table name + * @param transactionFactory transaction factory to use * @return {@code Map} a table metadata map * @throws ParameterException if one of the argument values is wrong */ private Map createTableMetadataMap( - ControlFile controlFile, String namespace, String tableName) - throws IOException, TableMetadataException { - File configFile = new File(configFilePath); - TransactionFactory transactionFactory = TransactionFactory.create(configFile); + ControlFile controlFile, + String namespace, + String tableName, + TransactionFactory transactionFactory) + throws TableMetadataException { try (DistributedTransactionAdmin transactionAdmin = transactionFactory.getTransactionAdmin()) { TableMetadataService tableMetadataService = new TableMetadataService(transactionAdmin); Map tableMetadataMap = new HashMap<>(); @@ -133,18 +149,19 @@ private Map createTableMetadataMap( * @param importOptions import options * @param tableMetadataMap table metadata map * @param reader buffered reader with source data + * @param importLoggerConfig import logging config * @param logWriterFactory log writer factory object - * @param config import logging config + * @param transactionFactory transaction factory to use * @return ImportManager object */ private ImportManager createImportManager( ImportOptions importOptions, Map tableMetadataMap, BufferedReader reader, + ImportLoggerConfig importLoggerConfig, LogWriterFactory logWriterFactory, - ImportLoggerConfig config) + TransactionFactory transactionFactory) throws IOException { - File configFile = new File(configFilePath); ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory(); ImportManager importManager = new ImportManager( @@ -153,11 +170,12 @@ private ImportManager createImportManager( importOptions, importProcessorFactory, scalarDbMode, - TransactionFactory.create(configFile).getTransactionManager()); + transactionFactory.getTransactionManager()); if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) { - importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory)); + importManager.addListener( + new SplitByDataChunkImportLogger(importLoggerConfig, logWriterFactory)); } else { - importManager.addListener(new SingleFileImportLogger(config, logWriterFactory)); + importManager.addListener(new SingleFileImportLogger(importLoggerConfig, logWriterFactory)); } return importManager; } @@ -236,6 +254,48 @@ private void validateLogDirectory(String logDirectory) throws ParameterException } } + /** + * Validate transaction mode configuration by attempting to start and abort a transaction + * + * @param transactionFactory transaction factory to test + * @throws ParameterException if transaction mode is incompatible with the configured transaction + * manager + */ + void validateTransactionMode(TransactionFactory transactionFactory) { + // Only validate when in TRANSACTION mode + if (scalarDbMode != ScalarDbMode.TRANSACTION) { + return; + } + + DistributedTransaction transaction = null; + try { + // Try to start a read only transaction to verify the transaction manager is properly + // configured + transaction = transactionFactory.getTransactionManager().startReadOnly(); + } catch (UnsupportedOperationException e) { + // Transaction mode is not supported by the configured transaction manager + throw new ParameterException( + spec.commandLine(), + DataLoaderError.INVALID_TRANSACTION_MODE.buildMessage(e.getMessage()), + e); + } catch (Exception e) { + // Other exceptions - configuration or runtime error + throw new ParameterException( + spec.commandLine(), + DataLoaderError.TRANSACTION_MODE_VALIDATION_FAILED.buildMessage(e.getMessage()), + e); + } finally { + // Ensure transaction is aborted + if (transaction != null) { + try { + transaction.abort(); + } catch (Exception ignored) { + // Ignore errors during cleanup + } + } + } + } + /** * Generate control file from a valid control file path * diff --git a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandTest.java b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandTest.java index 76fcf94cb3..cd81e1eef0 100755 --- a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandTest.java +++ b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandTest.java @@ -3,9 +3,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.dataloader.core.FileFormat; +import com.scalar.db.dataloader.core.ScalarDbMode; import com.scalar.db.dataloader.core.dataimport.ImportMode; +import com.scalar.db.service.TransactionFactory; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -279,4 +284,52 @@ void call_withoutMaxThreads_shouldDefaultToAvailableProcessors() { // Verify it was set to available processors assertEquals(Runtime.getRuntime().availableProcessors(), command.maxThreads); } + + @Test + void validateTransactionMode_withUnsupportedOperation_shouldThrowException() throws Exception { + // Arrange - Mock TransactionFactory to throw UnsupportedOperationException + TransactionFactory mockFactory = mock(TransactionFactory.class); + DistributedTransactionManager mockManager = mock(DistributedTransactionManager.class); + + when(mockFactory.getTransactionManager()).thenReturn(mockManager); + when(mockManager.startReadOnly()) + .thenThrow(new UnsupportedOperationException("Transaction mode is not supported")); + + ImportCommand command = new ImportCommand(); + CommandLine cmd = new CommandLine(command); + command.spec = cmd.getCommandSpec(); + command.scalarDbMode = ScalarDbMode.TRANSACTION; + + // Act & Assert + CommandLine.ParameterException thrown = + assertThrows( + CommandLine.ParameterException.class, + () -> command.validateTransactionMode(mockFactory)); + + assertTrue(thrown.getMessage().contains("TRANSACTION mode is not compatible")); + } + + @Test + void validateTransactionMode_withOtherException_shouldThrowException() throws Exception { + // Arrange - Mock TransactionFactory to throw a different exception + TransactionFactory mockFactory = mock(TransactionFactory.class); + DistributedTransactionManager mockManager = mock(DistributedTransactionManager.class); + + when(mockFactory.getTransactionManager()).thenReturn(mockManager); + when(mockManager.startReadOnly()).thenThrow(new RuntimeException("Connection failed")); + + ImportCommand command = new ImportCommand(); + CommandLine cmd = new CommandLine(command); + command.spec = cmd.getCommandSpec(); + command.scalarDbMode = ScalarDbMode.TRANSACTION; + + // Act & Assert + CommandLine.ParameterException thrown = + assertThrows( + CommandLine.ParameterException.class, + () -> command.validateTransactionMode(mockFactory)); + + assertTrue(thrown.getMessage().contains("Failed to validate TRANSACTION mode")); + assertTrue(thrown.getMessage().contains("Connection failed")); + } } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderError.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderError.java index 605da3bc94..b69be6b7b5 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderError.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/DataLoaderError.java @@ -216,6 +216,14 @@ public enum DataLoaderError implements ScalarDbError { "Cannot specify both deprecated option '%s' and new option '%s'. Please use only '%s'", "", ""), + INVALID_TRANSACTION_MODE( + Category.USER_ERROR, + "0058", + "TRANSACTION mode is not compatible with the current configuration. Please try with STORAGE mode or check your ScalarDB configuration. Details: %s", + "", + ""), + TRANSACTION_MODE_VALIDATION_FAILED( + Category.USER_ERROR, "0059", "Failed to validate TRANSACTION mode. Details: %s", "", ""), // // Errors for the internal error category