Skip to content

Commit 16fd8b0

Browse files
authored
Merge branch 'master' into add-isConsistentVirtualTableRead-to-StorageInfo
2 parents 2747303 + c34c508 commit 16fd8b0

File tree

18 files changed

+180
-59
lines changed

18 files changed

+180
-59
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,31 @@
1-
package com.scalar.db.dataloader.core;
1+
package com.scalar.db.dataloader.cli;
22

33
/**
 * The available modes a ScalarDB instance can run in. Determines how ScalarDB interacts with the
 * underlying database.
 *
 * @deprecated As of release 3.17.0. Will be removed in release 4.0.0. This enum is maintained for
 *     CLI backward compatibility only. Internally, values are converted to {@code TransactionMode}
 *     in the core module.
 */
@Deprecated
public enum ScalarDbMode {

  /**
   * Storage mode: Operates directly on the underlying storage engine without transactional
   * guarantees. Suitable for raw data access and simple CRUD operations.
   *
   * @deprecated As of release 3.17.0. Will be removed in release 4.0.0.
   */
  @Deprecated
  STORAGE,

  /**
   * Transaction mode: Provides transaction management with ACID guarantees across multiple
   * operations. Suitable for applications that require consistency and atomicity.
   *
   * @deprecated As of release 3.17.0. Will be removed in release 4.0.0.
   */
  @Deprecated
  TRANSACTION
}

data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
import com.scalar.db.api.DistributedTransaction;
88
import com.scalar.db.api.DistributedTransactionAdmin;
99
import com.scalar.db.api.TableMetadata;
10+
import com.scalar.db.dataloader.cli.ScalarDbMode;
1011
import com.scalar.db.dataloader.core.DataLoaderError;
1112
import com.scalar.db.dataloader.core.FileFormat;
12-
import com.scalar.db.dataloader.core.ScalarDbMode;
13+
import com.scalar.db.dataloader.core.TransactionMode;
1314
import com.scalar.db.dataloader.core.dataimport.ImportManager;
1415
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
1516
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
@@ -43,6 +44,7 @@
4344
import picocli.CommandLine.ParameterException;
4445
import picocli.CommandLine.Spec;
4546

47+
@SuppressWarnings("deprecation")
4648
@CommandLine.Command(name = "import", description = "Import data into a ScalarDB table")
4749
public class ImportCommand extends ImportCommandOptions implements Callable<Integer> {
4850

@@ -143,6 +145,23 @@ private Map<String, TableMetadata> createTableMetadataMap(
143145
}
144146
}
145147

148+
/**
149+
* Converts CLI ScalarDbMode to core TransactionMode.
150+
*
151+
* @param scalarDbMode the ScalarDB mode from CLI
152+
* @return the corresponding TransactionMode for core
153+
*/
154+
private TransactionMode convertToTransactionMode(ScalarDbMode scalarDbMode) {
155+
switch (scalarDbMode) {
156+
case STORAGE:
157+
return TransactionMode.SINGLE_CRUD;
158+
case TRANSACTION:
159+
return TransactionMode.CONSENSUS_COMMIT;
160+
default:
161+
throw new IllegalArgumentException("Unknown ScalarDbMode: " + scalarDbMode);
162+
}
163+
}
164+
146165
/**
147166
* Create ImportManager object from data
148167
*
@@ -163,13 +182,14 @@ private ImportManager createImportManager(
163182
TransactionFactory transactionFactory)
164183
throws IOException {
165184
ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory();
185+
TransactionMode transactionMode = convertToTransactionMode(scalarDbMode);
166186
ImportManager importManager =
167187
new ImportManager(
168188
tableMetadataMap,
169189
reader,
170190
importOptions,
171191
importProcessorFactory,
172-
scalarDbMode,
192+
transactionMode,
173193
transactionFactory.getTransactionManager());
174194
if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
175195
importManager.addListener(

data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandOptions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package com.scalar.db.dataloader.cli.command.dataimport;
22

3+
import com.scalar.db.dataloader.cli.ScalarDbMode;
34
import com.scalar.db.dataloader.core.FileFormat;
4-
import com.scalar.db.dataloader.core.ScalarDbMode;
55
import com.scalar.db.dataloader.core.dataimport.ImportMode;
66
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel;
77
import picocli.CommandLine;
88

9+
@SuppressWarnings("deprecation")
910
public class ImportCommandOptions {
1011

1112
public static final String FILE_OPTION_NAME_LONG_FORMAT = "--file";

data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommandTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
import static org.mockito.Mockito.when;
88

99
import com.scalar.db.api.DistributedTransactionManager;
10+
import com.scalar.db.dataloader.cli.ScalarDbMode;
1011
import com.scalar.db.dataloader.core.FileFormat;
11-
import com.scalar.db.dataloader.core.ScalarDbMode;
1212
import com.scalar.db.dataloader.core.dataimport.ImportMode;
1313
import com.scalar.db.service.TransactionFactory;
1414
import java.io.IOException;

data-loader/core/src/main/java/com/scalar/db/dataloader/core/Constants.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,12 @@ public class Constants {
1616
*/
1717
public static final String ABORT_TRANSACTION_STATUS =
1818
"Transaction aborted as part of batch transaction aborted";
19+
/**
20+
* Special null value representation used for TEXT data type columns in CSV files.
21+
*
22+
* <p>This value is used to distinguish between an empty string and a null value in CSV exports
23+
* and imports. When exporting, null TEXT values are converted to this string. When importing,
24+
* this string is converted back to null for TEXT columns.
25+
*/
26+
public static final String CSV_TEXT_NULL_VALUE = "\\N";
1927
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.scalar.db.dataloader.core;
2+
3+
/**
 * Defines the transaction mode for data loader operations. This determines how data operations are
 * executed within ScalarDB.
 *
 * <p>This is the core-module replacement for the deprecated CLI-facing {@code ScalarDbMode}: the
 * CLI converts {@code STORAGE} to {@link #SINGLE_CRUD} and {@code TRANSACTION} to
 * {@link #CONSENSUS_COMMIT} before invoking the core import/export machinery.
 */
public enum TransactionMode {

  /**
   * Single CRUD mode: Executes each operation independently without transactional context. Each
   * operation is atomic but there are no guarantees across multiple operations.
   */
  SINGLE_CRUD,

  /**
   * Consensus commit mode: Groups multiple operations into distributed transactions with ACID
   * guarantees. Ensures atomicity and consistency across multiple operations through consensus
   * commit protocol.
   */
  CONSENSUS_COMMIT
}

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/CsvProducerTask.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.scalar.db.api.Result;
44
import com.scalar.db.api.TableMetadata;
5+
import com.scalar.db.dataloader.core.Constants;
56
import com.scalar.db.dataloader.core.DataLoaderError;
67
import com.scalar.db.dataloader.core.util.CsvUtil;
78
import com.scalar.db.dataloader.core.util.DecimalUtil;
@@ -118,6 +119,11 @@ private String convertResultToCsv(Result result) {
118119
*/
119120
private String convertToString(Result result, String columnName, DataType dataType) {
120121
if (result.isNull(columnName)) {
122+
// Special null value is added when a column of text data type has null value. This is only
123+
// converted for CSV files
124+
if (dataType.equals(DataType.TEXT)) {
125+
return Constants.CSV_TEXT_NULL_VALUE;
126+
}
121127
return null;
122128
}
123129
String value = "";

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.scalar.db.api.DistributedTransactionManager;
44
import com.scalar.db.api.TableMetadata;
5-
import com.scalar.db.dataloader.core.ScalarDbMode;
5+
import com.scalar.db.dataloader.core.TransactionMode;
66
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
77
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
88
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessor;
@@ -42,7 +42,7 @@ public class ImportManager implements ImportEventListener {
4242
@NonNull private final ImportOptions importOptions;
4343
private final ImportProcessorFactory importProcessorFactory;
4444
private final List<ImportEventListener> listeners = new ArrayList<>();
45-
private final ScalarDbMode scalarDbMode;
45+
private final TransactionMode transactionMode;
4646
private final DistributedTransactionManager distributedTransactionManager;
4747

4848
/**
@@ -55,7 +55,7 @@ public class ImportManager implements ImportEventListener {
5555
public void startImport() {
5656
ImportProcessorParams params =
5757
ImportProcessorParams.builder()
58-
.scalarDbMode(scalarDbMode)
58+
.transactionMode(transactionMode)
5959
.importOptions(importOptions)
6060
.tableMetadataByTableName(tableMetadata)
6161
.dao(new ScalarDbDao())

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.scalar.db.api.DistributedTransaction;
44
import com.scalar.db.dataloader.core.DataLoaderError;
5-
import com.scalar.db.dataloader.core.ScalarDbMode;
5+
import com.scalar.db.dataloader.core.TransactionMode;
66
import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
77
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk;
88
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
@@ -55,12 +55,12 @@ public abstract class ImportProcessor {
5555
*
5656
* <p>This method reads data from the provided {@link BufferedReader}, processes it in chunks, and
5757
* batches transactions according to the specified sizes. The processing can be done in either
58-
* transactional or storage mode, depending on the configured {@link ScalarDbMode}.
58+
* single CRUD or consensus commit mode, depending on the configured {@link TransactionMode}.
5959
*
6060
* @param dataChunkSize the number of records to include in each data chunk for parallel
6161
* processing
6262
* @param transactionBatchSize the number of records to group together in a single transaction
63-
* (only used in transaction mode)
63+
* (only used in consensus commit mode)
6464
* @param reader the {@link BufferedReader} used to read the source file
6565
*/
6666
public void process(int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
@@ -169,11 +169,11 @@ public void removeListener(ImportEventListener listener) {
169169
}
170170

171171
/**
172-
* Notify once the task is completed
172+
* Notify once a single CRUD task is completed
173173
*
174174
* @param result task result object
175175
*/
176-
protected void notifyStorageRecordCompleted(ImportTaskResult result) {
176+
protected void notifySingleCrudRecordCompleted(ImportTaskResult result) {
177177
// Add data to summary, success logs with/without raw data
178178
for (ImportEventListener listener : listeners) {
179179
listener.onTaskComplete(result);
@@ -366,14 +366,14 @@ private void abortTransactionSafely(@Nullable DistributedTransaction transaction
366366
}
367367

368368
/**
369-
* Processes a single record in storage mode (non-transactional). Each record is processed
369+
* Processes a single record in single CRUD mode (non-transactional). Each record is processed
370370
* independently without transaction guarantees.
371371
*
372372
* @param dataChunkId the parent data chunk id of the chunk containing this record
373373
* @param importRow the record to process
374374
* @return an {@link ImportTaskResult} containing the processing result for the record
375375
*/
376-
private ImportTaskResult processStorageRecord(int dataChunkId, ImportRow importRow) {
376+
private ImportTaskResult processSingleCrudRecord(int dataChunkId, ImportRow importRow) {
377377
ImportTaskParams taskParams =
378378
ImportTaskParams.builder()
379379
.sourceRecord(importRow.getSourceData())
@@ -394,16 +394,17 @@ private ImportTaskResult processStorageRecord(int dataChunkId, ImportRow importR
394394
.targets(importRecordResult.getTargets())
395395
.dataChunkId(dataChunkId)
396396
.build();
397-
notifyStorageRecordCompleted(modifiedTaskResult);
397+
notifySingleCrudRecordCompleted(modifiedTaskResult);
398398
return modifiedTaskResult;
399399
}
400400

401401
/**
402-
* Processes a complete data chunk using parallel execution. The processing mode (transactional or
403-
* storage) is determined by the configured {@link ScalarDbMode}.
402+
* Processes a complete data chunk using parallel execution. The processing mode (consensus commit
403+
* or single CRUD) is determined by the configured {@link TransactionMode}.
404404
*
405405
* @param dataChunk the data chunk to process
406-
* @param transactionBatchSize the size of transaction batches (used only in transaction mode)
406+
* @param transactionBatchSize the size of transaction batches (used only in consensus commit
407+
* mode)
407408
*/
408409
private void processDataChunk(ImportDataChunk dataChunk, int transactionBatchSize) {
409410
ImportDataChunkStatus status =
@@ -414,23 +415,24 @@ private void processDataChunk(ImportDataChunk dataChunk, int transactionBatchSiz
414415
.build();
415416
notifyDataChunkStarted(status);
416417
ImportDataChunkStatus importDataChunkStatus;
417-
if (params.getScalarDbMode() == ScalarDbMode.TRANSACTION) {
418-
importDataChunkStatus = processDataChunkWithTransactions(dataChunk, transactionBatchSize);
418+
if (params.getTransactionMode() == TransactionMode.CONSENSUS_COMMIT) {
419+
importDataChunkStatus =
420+
processDataChunkInConsensusCommitMode(dataChunk, transactionBatchSize);
419421
} else {
420-
importDataChunkStatus = processDataChunkWithoutTransactions(dataChunk);
422+
importDataChunkStatus = processDataChunkInSingleCrudMode(dataChunk);
421423
}
422424
notifyDataChunkCompleted(importDataChunkStatus);
423425
}
424426

425427
/**
426-
* Processes a data chunk using transaction mode with parallel batch processing. Multiple
428+
* Processes a data chunk using consensus commit mode with parallel batch processing. Multiple
427429
* transaction batches are processed concurrently using a thread pool.
428430
*
429431
* @param dataChunk the data chunk to process
430432
* @param transactionBatchSize the number of records per transaction batch
431433
* @return an {@link ImportDataChunkStatus} containing processing results and metrics
432434
*/
433-
private ImportDataChunkStatus processDataChunkWithTransactions(
435+
private ImportDataChunkStatus processDataChunkInConsensusCommitMode(
434436
ImportDataChunk dataChunk, int transactionBatchSize) {
435437
Instant startTime = Instant.now();
436438
List<ImportTransactionBatch> transactionBatches =
@@ -464,19 +466,19 @@ private ImportDataChunkStatus processDataChunkWithTransactions(
464466
}
465467

466468
/**
467-
* Processes a data chunk using storage mode with parallel record processing. Individual records
468-
* are processed concurrently without transaction guarantees.
469+
* Processes a data chunk using single CRUD mode with parallel record processing. Individual
470+
* records are processed concurrently without transaction guarantees.
469471
*
470472
* @param dataChunk the data chunk to process
471473
* @return an {@link ImportDataChunkStatus} containing processing results and metrics
472474
*/
473-
private ImportDataChunkStatus processDataChunkWithoutTransactions(ImportDataChunk dataChunk) {
475+
private ImportDataChunkStatus processDataChunkInSingleCrudMode(ImportDataChunk dataChunk) {
474476
Instant startTime = Instant.now();
475477
AtomicInteger successCount = new AtomicInteger(0);
476478
AtomicInteger failureCount = new AtomicInteger(0);
477479

478480
for (ImportRow importRow : dataChunk.getSourceData()) {
479-
ImportTaskResult result = processStorageRecord(dataChunk.getDataChunkId(), importRow);
481+
ImportTaskResult result = processSingleCrudRecord(dataChunk.getDataChunkId(), importRow);
480482
boolean allSaved =
481483
result.getTargets().stream()
482484
.allMatch(t -> t.getStatus().equals(ImportTargetResultStatus.SAVED));

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.scalar.db.api.DistributedTransactionManager;
44
import com.scalar.db.api.TableMetadata;
5-
import com.scalar.db.dataloader.core.ScalarDbMode;
5+
import com.scalar.db.dataloader.core.TransactionMode;
66
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
77
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
88
import java.util.Map;
@@ -20,8 +20,8 @@
2020
@Builder
2121
@Value
2222
public class ImportProcessorParams {
23-
/** The operational mode of ScalarDB (transaction or storage mode). */
24-
ScalarDbMode scalarDbMode;
23+
/** The transaction mode for data operations. */
24+
TransactionMode transactionMode;
2525

2626
/** Configuration options for the import operation. */
2727
ImportOptions importOptions;

0 commit comments

Comments
 (0)