Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
package com.scalar.db.dataloader.cli;

/**
 * The available modes a ScalarDB instance can run in. Determines how ScalarDB interacts with the
 * underlying database.
 *
 * @deprecated As of release 3.17.0. Will be removed in release 4.0.0. This enum is maintained for
 *     CLI backward compatibility only. Internally, values are converted to {@code TransactionMode}
 *     in the core module.
 */
@Deprecated
public enum ScalarDbMode {

  /**
   * Storage mode: Operates directly on the underlying storage engine without transactional
   * guarantees. Suitable for raw data access and simple CRUD operations.
   *
   * @deprecated As of release 3.17.0. Will be removed in release 4.0.0.
   */
  @Deprecated
  STORAGE,

  /**
   * Transaction mode: Provides transaction management with ACID guarantees across multiple
   * operations. Suitable for applications that require consistency and atomicity.
   *
   * @deprecated As of release 3.17.0. Will be removed in release 4.0.0.
   */
  @Deprecated
  TRANSACTION
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.DistributedTransactionAdmin;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.cli.ScalarDbMode;
import com.scalar.db.dataloader.core.DataLoaderError;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.TransactionMode;
import com.scalar.db.dataloader.core.dataimport.ImportManager;
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
Expand Down Expand Up @@ -43,6 +44,7 @@
import picocli.CommandLine.ParameterException;
import picocli.CommandLine.Spec;

@SuppressWarnings("deprecation")
@CommandLine.Command(name = "import", description = "Import data into a ScalarDB table")
public class ImportCommand extends ImportCommandOptions implements Callable<Integer> {

Expand Down Expand Up @@ -143,6 +145,23 @@ private Map<String, TableMetadata> createTableMetadataMap(
}
}

/**
 * Maps the deprecated CLI-facing {@link ScalarDbMode} onto the core-level {@link TransactionMode}
 * used by the import machinery.
 *
 * @param scalarDbMode the ScalarDB mode supplied via the CLI
 * @return the equivalent core transaction mode
 */
private TransactionMode convertToTransactionMode(ScalarDbMode scalarDbMode) {
  final TransactionMode converted;
  switch (scalarDbMode) {
    case STORAGE:
      // Non-transactional CLI mode maps to independent per-record operations.
      converted = TransactionMode.SINGLE_CRUD;
      break;
    case TRANSACTION:
      // Transactional CLI mode maps to consensus-commit transactions.
      converted = TransactionMode.CONSENSUS_COMMIT;
      break;
    default:
      throw new IllegalArgumentException("Unknown ScalarDbMode: " + scalarDbMode);
  }
  return converted;
}

/**
* Create ImportManager object from data
*
Expand All @@ -163,13 +182,14 @@ private ImportManager createImportManager(
TransactionFactory transactionFactory)
throws IOException {
ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory();
TransactionMode transactionMode = convertToTransactionMode(scalarDbMode);
ImportManager importManager =
new ImportManager(
tableMetadataMap,
reader,
importOptions,
importProcessorFactory,
scalarDbMode,
transactionMode,
transactionFactory.getTransactionManager());
if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
importManager.addListener(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.scalar.db.dataloader.cli.command.dataimport;

import com.scalar.db.dataloader.cli.ScalarDbMode;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.dataimport.ImportMode;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel;
import picocli.CommandLine;

@SuppressWarnings("deprecation")
public class ImportCommandOptions {

public static final String FILE_OPTION_NAME_LONG_FORMAT = "--file";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import static org.mockito.Mockito.when;

import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.dataloader.cli.ScalarDbMode;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.dataimport.ImportMode;
import com.scalar.db.service.TransactionFactory;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.scalar.db.dataloader.core;

/**
 * Specifies how the data loader executes its operations against ScalarDB, i.e. whether operations
 * run independently or are grouped into distributed transactions.
 */
public enum TransactionMode {

  /**
   * Single CRUD mode: runs every operation on its own, outside of any transactional context. Each
   * individual operation is atomic, but no guarantee spans more than one operation.
   */
  SINGLE_CRUD,

  /**
   * Consensus commit mode: wraps multiple operations in distributed transactions backed by the
   * consensus commit protocol, giving ACID guarantees (atomicity and consistency) across the
   * grouped operations.
   */
  CONSENSUS_COMMIT
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.TransactionMode;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessor;
Expand Down Expand Up @@ -42,7 +42,7 @@ public class ImportManager implements ImportEventListener {
@NonNull private final ImportOptions importOptions;
private final ImportProcessorFactory importProcessorFactory;
private final List<ImportEventListener> listeners = new ArrayList<>();
private final ScalarDbMode scalarDbMode;
private final TransactionMode transactionMode;
private final DistributedTransactionManager distributedTransactionManager;

/**
Expand All @@ -55,7 +55,7 @@ public class ImportManager implements ImportEventListener {
public void startImport() {
ImportProcessorParams params =
ImportProcessorParams.builder()
.scalarDbMode(scalarDbMode)
.transactionMode(transactionMode)
.importOptions(importOptions)
.tableMetadataByTableName(tableMetadata)
.dao(new ScalarDbDao())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.dataloader.core.DataLoaderError;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.TransactionMode;
import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
Expand Down Expand Up @@ -55,12 +55,12 @@ public abstract class ImportProcessor {
*
* <p>This method reads data from the provided {@link BufferedReader}, processes it in chunks, and
* batches transactions according to the specified sizes. The processing can be done in either
* transactional or storage mode, depending on the configured {@link ScalarDbMode}.
* single CRUD or consensus commit mode, depending on the configured {@link TransactionMode}.
*
* @param dataChunkSize the number of records to include in each data chunk for parallel
* processing
* @param transactionBatchSize the number of records to group together in a single transaction
* (only used in transaction mode)
* (only used in consensus commit mode)
* @param reader the {@link BufferedReader} used to read the source file
*/
public void process(int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
Expand Down Expand Up @@ -169,11 +169,11 @@ public void removeListener(ImportEventListener listener) {
}

/**
* Notify once the task is completed
* Notify once a single CRUD task is completed
*
* @param result task result object
*/
protected void notifyStorageRecordCompleted(ImportTaskResult result) {
protected void notifySingleCrudRecordCompleted(ImportTaskResult result) {
// Add data to summary, success logs with/without raw data
for (ImportEventListener listener : listeners) {
listener.onTaskComplete(result);
Expand Down Expand Up @@ -366,14 +366,14 @@ private void abortTransactionSafely(@Nullable DistributedTransaction transaction
}

/**
* Processes a single record in storage mode (non-transactional). Each record is processed
* Processes a single record in single CRUD mode (non-transactional). Each record is processed
* independently without transaction guarantees.
*
* @param dataChunkId the parent data chunk id of the chunk containing this record
* @param importRow the record to process
* @return an {@link ImportTaskResult} containing the processing result for the record
*/
private ImportTaskResult processStorageRecord(int dataChunkId, ImportRow importRow) {
private ImportTaskResult processSingleCrudRecord(int dataChunkId, ImportRow importRow) {
ImportTaskParams taskParams =
ImportTaskParams.builder()
.sourceRecord(importRow.getSourceData())
Expand All @@ -394,16 +394,17 @@ private ImportTaskResult processStorageRecord(int dataChunkId, ImportRow importR
.targets(importRecordResult.getTargets())
.dataChunkId(dataChunkId)
.build();
notifyStorageRecordCompleted(modifiedTaskResult);
notifySingleCrudRecordCompleted(modifiedTaskResult);
return modifiedTaskResult;
}

/**
* Processes a complete data chunk using parallel execution. The processing mode (transactional or
* storage) is determined by the configured {@link ScalarDbMode}.
* Processes a complete data chunk using parallel execution. The processing mode (consensus commit
* or single CRUD) is determined by the configured {@link TransactionMode}.
*
* @param dataChunk the data chunk to process
* @param transactionBatchSize the size of transaction batches (used only in transaction mode)
* @param transactionBatchSize the size of transaction batches (used only in consensus commit
* mode)
*/
private void processDataChunk(ImportDataChunk dataChunk, int transactionBatchSize) {
ImportDataChunkStatus status =
Expand All @@ -414,23 +415,24 @@ private void processDataChunk(ImportDataChunk dataChunk, int transactionBatchSiz
.build();
notifyDataChunkStarted(status);
ImportDataChunkStatus importDataChunkStatus;
if (params.getScalarDbMode() == ScalarDbMode.TRANSACTION) {
importDataChunkStatus = processDataChunkWithTransactions(dataChunk, transactionBatchSize);
if (params.getTransactionMode() == TransactionMode.CONSENSUS_COMMIT) {
importDataChunkStatus =
processDataChunkInConsensusCommitMode(dataChunk, transactionBatchSize);
} else {
importDataChunkStatus = processDataChunkWithoutTransactions(dataChunk);
importDataChunkStatus = processDataChunkInSingleCrudMode(dataChunk);
}
notifyDataChunkCompleted(importDataChunkStatus);
}

/**
* Processes a data chunk using transaction mode with parallel batch processing. Multiple
* Processes a data chunk using consensus commit mode with parallel batch processing. Multiple
* transaction batches are processed concurrently using a thread pool.
*
* @param dataChunk the data chunk to process
* @param transactionBatchSize the number of records per transaction batch
* @return an {@link ImportDataChunkStatus} containing processing results and metrics
*/
private ImportDataChunkStatus processDataChunkWithTransactions(
private ImportDataChunkStatus processDataChunkInConsensusCommitMode(
ImportDataChunk dataChunk, int transactionBatchSize) {
Instant startTime = Instant.now();
List<ImportTransactionBatch> transactionBatches =
Expand Down Expand Up @@ -464,19 +466,19 @@ private ImportDataChunkStatus processDataChunkWithTransactions(
}

/**
* Processes a data chunk using storage mode with parallel record processing. Individual records
* are processed concurrently without transaction guarantees.
* Processes a data chunk using single CRUD mode with parallel record processing. Individual
* records are processed concurrently without transaction guarantees.
*
* @param dataChunk the data chunk to process
* @return an {@link ImportDataChunkStatus} containing processing results and metrics
*/
private ImportDataChunkStatus processDataChunkWithoutTransactions(ImportDataChunk dataChunk) {
private ImportDataChunkStatus processDataChunkInSingleCrudMode(ImportDataChunk dataChunk) {
Instant startTime = Instant.now();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failureCount = new AtomicInteger(0);

for (ImportRow importRow : dataChunk.getSourceData()) {
ImportTaskResult result = processStorageRecord(dataChunk.getDataChunkId(), importRow);
ImportTaskResult result = processSingleCrudRecord(dataChunk.getDataChunkId(), importRow);
boolean allSaved =
result.getTargets().stream()
.allMatch(t -> t.getStatus().equals(ImportTargetResultStatus.SAVED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.TransactionMode;
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import java.util.Map;
Expand All @@ -20,8 +20,8 @@
@Builder
@Value
public class ImportProcessorParams {
/** The operational mode of ScalarDB (transaction or storage mode). */
ScalarDbMode scalarDbMode;
/** The transaction mode for data operations. */
TransactionMode transactionMode;

/** Configuration options for the import operation. */
ImportOptions importOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.TransactionMode;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
import java.io.BufferedReader;
import java.util.HashMap;
Expand Down Expand Up @@ -41,7 +41,7 @@ void setUp() {
reader,
options,
processorFactory,
ScalarDbMode.STORAGE,
TransactionMode.SINGLE_CRUD,
distributedTransactionManager);
importManager.addListener(listener1);
importManager.addListener(listener2);
Expand Down Expand Up @@ -76,7 +76,7 @@ void closeResources_shouldCloseTransactionManagerIfStorageIsNull() {
mock(BufferedReader.class),
mock(ImportOptions.class),
mock(ImportProcessorFactory.class),
ScalarDbMode.TRANSACTION,
TransactionMode.CONSENSUS_COMMIT,
distributedTransactionManager);

managerWithTx.closeResources();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.TransactionMode;
import com.scalar.db.dataloader.core.UnitTestUtils;
import com.scalar.db.dataloader.core.dataimport.ImportMode;
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
Expand Down Expand Up @@ -76,10 +76,10 @@ void setup() throws ScalarDbDaoException, TransactionException {
}

@Test
void test_importProcessWithStorage() {
void test_importProcessWithSingleCrud() {
params =
ImportProcessorParams.builder()
.scalarDbMode(ScalarDbMode.STORAGE)
.transactionMode(TransactionMode.SINGLE_CRUD)
.importOptions(importOptions)
.dao(dao)
.distributedTransactionManager(distributedTransactionManager)
Expand All @@ -94,10 +94,10 @@ void test_importProcessWithStorage() {
}

@Test
void test_importProcessWithTransaction() {
void test_importProcessWithConsensusCommit() {
params =
ImportProcessorParams.builder()
.scalarDbMode(ScalarDbMode.TRANSACTION)
.transactionMode(TransactionMode.CONSENSUS_COMMIT)
.importOptions(importOptions)
.dao(dao)
.distributedTransactionManager(distributedTransactionManager)
Expand Down
Loading