Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.scalar.db.dataloader.cli;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be merged until #3147 is completed; the target branch must be updated before then.


/**
 * @deprecated As of release 3.17.0 This enum is deprecated and will be removed in release 4.0.0.
 *     The behavior is now determined by the transaction manager configuration in the ScalarDB
 *     properties file. This enum is kept only for backward compatibility with the deprecated
 *     --mode CLI option.
 */
@Deprecated
public enum ScalarDbMode {

  /** Storage mode (single-crud operations). */
  STORAGE,

  /** Transaction mode (consensus commit). */
  TRANSACTION
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.Optional;
import java.util.concurrent.Callable;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.ParameterException;
Expand All @@ -44,13 +46,16 @@
@CommandLine.Command(name = "import", description = "Import data into a ScalarDB table")
public class ImportCommand extends ImportCommandOptions implements Callable<Integer> {

private static final Logger logger = LoggerFactory.getLogger(ImportCommand.class);

/** Spec injected by PicoCli */
@Spec CommandSpec spec;

@Override
public Integer call() throws Exception {
validateDeprecatedOptions();
applyDeprecatedOptions();
warnAboutIgnoredDeprecatedOptions();
validateImportTarget(controlFilePath, namespace, tableName);
validateLogDirectory(logDirectory);
validatePositiveValue(
Expand Down Expand Up @@ -152,7 +157,6 @@ private ImportManager createImportManager(
reader,
importOptions,
importProcessorFactory,
scalarDbMode,
TransactionFactory.create(configFile).getTransactionManager());
if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory));
Expand Down Expand Up @@ -276,6 +280,28 @@ private void validateDeprecatedOptions() {
ENABLE_LOG_SUCCESS_RECORDS_OPTION_SHORT);
}

/**
 * Warns about deprecated options that are no longer used and have been completely ignored.
 *
 * <p>Checks the picocli parse result for the deprecated {@code --mode}/{@code -m} option and, if
 * it was supplied, logs a warning explaining that the option has no effect anymore.
 */
private void warnAboutIgnoredDeprecatedOptions() {
  CommandLine.ParseResult parseResult = spec.commandLine().getParseResult();
  boolean hasModeOption =
      parseResult.hasMatchedOption(DEPRECATED_MODE_OPTION)
          || parseResult.hasMatchedOption(DEPRECATED_MODE_OPTION_SHORT);

  if (hasModeOption) {
    // Log a plain message without ANSI markup: SLF4J output frequently goes to files or log
    // aggregators where escape sequences show up as garbage. Parameterized logging also avoids
    // building the message string when WARN is disabled.
    logger.warn(
        "The {} option is deprecated and no longer has any effect. "
            + "The import behavior is now determined by the transaction manager configuration "
            + "in your ScalarDB properties file.",
        DEPRECATED_MODE_OPTION);
  }
}

/**
* Generate import options object from provided cli parameter data
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.scalar.db.dataloader.cli.command.dataimport;

import com.scalar.db.dataloader.cli.ScalarDbMode;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.dataimport.ImportMode;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel;
import picocli.CommandLine;
Expand All @@ -17,11 +17,24 @@ public class ImportCommandOptions {
public static final String ENABLE_LOG_SUCCESS_RECORDS_OPTION_SHORT = "-ls";
public static final String DEPRECATED_LOG_SUCCESS_RECORDS_OPTION = "--log-success";

public static final String DEPRECATED_MODE_OPTION = "--mode";
public static final String DEPRECATED_MODE_OPTION_SHORT = "-m";

/**
* @deprecated As of release 3.17.0 This option is no longer used and will be removed in release
* 4.0.0. The option is not fully removed as users who might already have their scripts or
* commands pre-set might pass the argument and when passed if not supported, picocli will
* throw an error. We want to avoid that and instead just show a warning. The behavior is now
* determined by the transaction manager configuration in the ScalarDB properties file.
*/
@Deprecated
@CommandLine.Option(
names = {"--mode", "-m"},
description = "ScalarDB mode (STORAGE, TRANSACTION) (default: STORAGE)",
description =
"Deprecated: This option is no longer used. The behavior is now determined by the transaction manager configuration in the ScalarDB properties file.",
paramLabel = "<MODE>",
defaultValue = "STORAGE")
defaultValue = "STORAGE",
hidden = true)
protected ScalarDbMode scalarDbMode;

@CommandLine.Option(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.scalar.db.dataloader.core;

/**
 * The available transaction modes for data import operations. Determines how data is imported
 * based on the transaction manager configuration.
 */
public enum TransactionMode {

  /**
   * Single-crud mode: each CRUD operation is executed and committed individually, without
   * separate transaction management. Used when the transaction manager is configured in
   * single-crud operation mode.
   */
  SINGLE_CRUD,

  /**
   * Consensus commit mode: operations are grouped into transaction batches with managed
   * transactions. Used with standard transaction managers that provide ACID guarantees and
   * commit coordination across multiple operations.
   */
  CONSENSUS_COMMIT
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.TransactionMode;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessor;
Expand All @@ -12,6 +12,7 @@
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;
import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager;
import java.io.BufferedReader;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -33,16 +34,15 @@
* <li>Notifying listeners of various import events
* </ul>
*/
@SuppressWarnings("SameNameButDifferent")
@AllArgsConstructor
@SuppressWarnings("SameNameButDifferent")
public class ImportManager implements ImportEventListener {

@NonNull private final Map<String, TableMetadata> tableMetadata;
@NonNull private final BufferedReader importFileReader;
@NonNull private final ImportOptions importOptions;
private final ImportProcessorFactory importProcessorFactory;
private final List<ImportEventListener> listeners = new ArrayList<>();
private final ScalarDbMode scalarDbMode;
private final DistributedTransactionManager distributedTransactionManager;

/**
Expand All @@ -51,8 +51,20 @@ public class ImportManager implements ImportEventListener {
* <p>If the data chunk size in {@link ImportOptions} is set to 0, the entire file will be
* processed as a single chunk. Otherwise, the file will be processed in chunks of the specified
* size.
*
 * <p>The transaction mode is determined automatically based on the transaction manager
 * configuration: if the transaction manager is configured as single-crud, SINGLE_CRUD mode is
 * used; otherwise, CONSENSUS_COMMIT mode is used by default.
*/
public void startImport() {
// Determine ScalarDB mode based on transaction manager type. This mode will be refactored later
// and removed so that the whole code can just use one way to import all the data with the
// correct interface and not depend on making this distinction.
TransactionMode scalarDbMode =
distributedTransactionManager instanceof SingleCrudOperationTransactionManager
? TransactionMode.SINGLE_CRUD
: TransactionMode.CONSENSUS_COMMIT;

ImportProcessorParams params =
ImportProcessorParams.builder()
.scalarDbMode(scalarDbMode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.dataloader.core.DataLoaderError;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.TransactionMode;
import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
Expand Down Expand Up @@ -54,7 +54,7 @@ public abstract class ImportProcessor {
*
* <p>This method reads data from the provided {@link BufferedReader}, processes it in chunks, and
* batches transactions according to the specified sizes. The processing can be done in either
* transactional or storage mode, depending on the configured {@link ScalarDbMode}.
* transactional or storage mode, depending on the configured {@link TransactionMode}.
*
* @param dataChunkSize the number of records to include in each data chunk for parallel
* processing
Expand Down Expand Up @@ -375,7 +375,7 @@ private ImportTaskResult processStorageRecord(int dataChunkId, ImportRow importR

/**
* Processes a complete data chunk using parallel execution. The processing mode (transactional or
* storage) is determined by the configured {@link ScalarDbMode}.
* storage) is determined by the configured {@link TransactionMode}.
*
* @param dataChunk the data chunk to process
* @param transactionBatchSize the size of transaction batches (used only in transaction mode)
Expand All @@ -389,7 +389,7 @@ private void processDataChunk(ImportDataChunk dataChunk, int transactionBatchSiz
.build();
notifyDataChunkStarted(status);
ImportDataChunkStatus importDataChunkStatus;
if (params.getScalarDbMode() == ScalarDbMode.TRANSACTION) {
if (params.getScalarDbMode() == TransactionMode.CONSENSUS_COMMIT) {
importDataChunkStatus = processDataChunkWithTransactions(dataChunk, transactionBatchSize);
} else {
importDataChunkStatus = processDataChunkWithoutTransactions(dataChunk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.TransactionMode;
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import java.util.Map;
Expand All @@ -21,7 +21,7 @@
@Value
public class ImportProcessorParams {
/** The transaction mode used for the import (single-crud or consensus commit). */
ScalarDbMode scalarDbMode;
TransactionMode scalarDbMode;

/** Configuration options for the import operation. */
ImportOptions importOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,25 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.TransactionMode;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessor;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorParams;
import com.scalar.db.transaction.singlecrudoperation.SingleCrudOperationTransactionManager;
import java.io.BufferedReader;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

public class ImportManagerTest {

Expand All @@ -37,12 +43,7 @@ void setUp() {

importManager =
new ImportManager(
tableMetadata,
reader,
options,
processorFactory,
ScalarDbMode.STORAGE,
distributedTransactionManager);
tableMetadata, reader, options, processorFactory, distributedTransactionManager);
importManager.addListener(listener1);
importManager.addListener(listener2);
}
Expand Down Expand Up @@ -76,7 +77,6 @@ void closeResources_shouldCloseTransactionManagerIfStorageIsNull() {
mock(BufferedReader.class),
mock(ImportOptions.class),
mock(ImportProcessorFactory.class),
ScalarDbMode.TRANSACTION,
distributedTransactionManager);

managerWithTx.closeResources();
Expand All @@ -93,4 +93,66 @@ void closeResources_shouldThrowIfResourceCloseFails() {
assertEquals("Failed to close the resource", ex.getMessage());
assertEquals("Close failed", ex.getCause().getMessage());
}

@Test
void startImport_shouldUseSingleCrudMode_whenTransactionManagerIsSingleCrud() throws Exception {
  // Arrange: wire an ImportManager with a single-crud transaction manager.
  Map<String, TableMetadata> metadataByTable = new HashMap<>();
  BufferedReader fileReader = mock(BufferedReader.class);
  ImportOptions importOptions = mock(ImportOptions.class);
  when(importOptions.getDataChunkSize()).thenReturn(100);
  when(importOptions.getTransactionBatchSize()).thenReturn(10);

  ImportProcessorFactory factory = mock(ImportProcessorFactory.class);
  when(factory.createImportProcessor(any())).thenReturn(mock(ImportProcessor.class));

  SingleCrudOperationTransactionManager singleCrudManager =
      mock(SingleCrudOperationTransactionManager.class);
  ImportManager manager =
      new ImportManager(metadataByTable, fileReader, importOptions, factory, singleCrudManager);

  // Act
  manager.startImport();

  // Assert: the processor was created with SINGLE_CRUD mode.
  ArgumentCaptor<ImportProcessorParams> captor =
      ArgumentCaptor.forClass(ImportProcessorParams.class);
  verify(factory).createImportProcessor(captor.capture());
  assertEquals(TransactionMode.SINGLE_CRUD, captor.getValue().getScalarDbMode());
}

@Test
void startImport_shouldUseConsensusCommitMode_whenTransactionManagerIsNotSingleCrud()
    throws Exception {
  // Arrange: wire an ImportManager with a regular (non-single-crud) transaction manager.
  Map<String, TableMetadata> metadataByTable = new HashMap<>();
  BufferedReader fileReader = mock(BufferedReader.class);
  ImportOptions importOptions = mock(ImportOptions.class);
  when(importOptions.getDataChunkSize()).thenReturn(100);
  when(importOptions.getTransactionBatchSize()).thenReturn(10);

  ImportProcessorFactory factory = mock(ImportProcessorFactory.class);
  when(factory.createImportProcessor(any())).thenReturn(mock(ImportProcessor.class));

  DistributedTransactionManager regularManager = mock(DistributedTransactionManager.class);
  ImportManager manager =
      new ImportManager(metadataByTable, fileReader, importOptions, factory, regularManager);

  // Act
  manager.startImport();

  // Assert: the processor was created with CONSENSUS_COMMIT mode.
  ArgumentCaptor<ImportProcessorParams> captor =
      ArgumentCaptor.forClass(ImportProcessorParams.class);
  verify(factory).createImportProcessor(captor.capture());
  assertEquals(TransactionMode.CONSENSUS_COMMIT, captor.getValue().getScalarDbMode());
}
}
Loading
Loading