Skip to content
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
627fe69
initial commit
inv-jishnu Nov 10, 2025
32b4b70
Initial changes
inv-jishnu Nov 10, 2025
1e08ecf
scanner class updated
inv-jishnu Nov 11, 2025
e87b94c
Merge branch 'master' into feat/data-loader/changes-export
inv-jishnu Nov 11, 2025
e7c68a7
Feedback changes
inv-jishnu Nov 11, 2025
b973df8
Feedback changes javadoc and param rename
inv-jishnu Nov 11, 2025
186d3c3
Replace storage with transaction in table metadata service
inv-jishnu Nov 11, 2025
1d81db7
Initial commit
inv-jishnu Nov 11, 2025
36ff32f
transaction mode use fixed
inv-jishnu Nov 12, 2025
5b6f86a
Removed unused code and updated java doc comments
inv-jishnu Nov 12, 2025
1f0644e
Add changes based on feedback
inv-jishnu Nov 12, 2025
326f7a7
Added changes based on feedback
inv-jishnu Nov 12, 2025
e8f9feb
Removed unused params
inv-jishnu Nov 13, 2025
98e84bb
correction
inv-jishnu Nov 13, 2025
7502fe8
Merge branch 'feat/data-loader/table-metadata-replace-storage' into f…
inv-jishnu Nov 13, 2025
0ccc67b
Merge branch 'master' into feat/data-loader/import-replace-storage
inv-jishnu Nov 13, 2025
9589094
Resolved conflicts and merged latest changes from master branch
inv-jishnu Nov 14, 2025
671f95a
Removed unused commit
inv-jishnu Nov 14, 2025
6418f5c
Fixed merge import issues
inv-jishnu Nov 14, 2025
a5284be
Merge branch 'feat/data-loader/table-metadata-replace-storage' into f…
inv-jishnu Nov 14, 2025
96ba8f0
Deprecate `include-metadata` option for data loader export (#3159)
ypeckstadt Nov 14, 2025
d2eebbf
Merge branch 'master' into feat/data-loader/import-replace-storage
inv-jishnu Nov 14, 2025
1b100b8
Fix build failure
inv-jishnu Nov 14, 2025
b9fa18d
Merge branch 'master' into feat/data-loader/import-replace-storage
inv-jishnu Nov 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

import com.scalar.db.api.DistributedTransactionAdmin;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.cli.exception.DirectoryValidationException;
Expand All @@ -28,7 +29,6 @@
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
import com.scalar.db.dataloader.core.util.KeyUtils;
import com.scalar.db.io.Key;
import com.scalar.db.service.StorageFactory;
import com.scalar.db.service.TransactionFactory;
import java.io.BufferedWriter;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -69,17 +69,17 @@ public Integer call() throws Exception {
maxThreads = Runtime.getRuntime().availableProcessors();
}

StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath);
TableMetadataService metaDataService =
new TableMetadataService(storageFactory.getStorageAdmin());
TableMetadata tableMetadata;
try (DistributedTransactionAdmin admin = transactionFactory.getTransactionAdmin()) {
TableMetadataService metaDataService = new TableMetadataService(admin);
tableMetadata = metaDataService.getTableMetadata(namespace, table);
}
ScalarDbDao scalarDbDao = new ScalarDbDao();

ExportManager exportManager =
createExportManager(transactionFactory, scalarDbDao, outputFormat);

TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table);

Key partitionKey =
partitionKeyValue != null ? getKeysFromList(partitionKeyValue, tableMetadata) : null;
Key scanStartKey =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@
import static com.scalar.db.dataloader.cli.util.CommandLineInputUtils.validatePositiveValue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.scalar.db.api.DistributedStorageAdmin;
import com.scalar.db.api.DistributedTransactionAdmin;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.DataLoaderError;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.dataimport.ImportManager;
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbStorageManager;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbTransactionManager;
import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
import com.scalar.db.dataloader.core.dataimport.log.LogMode;
import com.scalar.db.dataloader.core.dataimport.log.SingleFileImportLogger;
Expand All @@ -26,7 +23,6 @@
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
import com.scalar.db.service.StorageFactory;
import com.scalar.db.service.TransactionFactory;
import java.io.BufferedReader;
import java.io.File;
Expand Down Expand Up @@ -112,9 +108,9 @@ private Map<String, TableMetadata> createTableMetadataMap(
ControlFile controlFile, String namespace, String tableName)
throws IOException, TableMetadataException {
File configFile = new File(configFilePath);
StorageFactory storageFactory = StorageFactory.create(configFile);
try (DistributedStorageAdmin storageAdmin = storageFactory.getStorageAdmin()) {
TableMetadataService tableMetadataService = new TableMetadataService(storageAdmin);
TransactionFactory transactionFactory = TransactionFactory.create(configFile);
try (DistributedTransactionAdmin transactionAdmin = transactionFactory.getTransactionAdmin()) {
TableMetadataService tableMetadataService = new TableMetadataService(transactionAdmin);
Map<String, TableMetadata> tableMetadataMap = new HashMap<>();
if (controlFile != null) {
for (ControlFileTable table : controlFile.getTables()) {
Expand Down Expand Up @@ -150,32 +146,14 @@ private ImportManager createImportManager(
throws IOException {
File configFile = new File(configFilePath);
ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory();
ImportManager importManager;
if (scalarDbMode == ScalarDbMode.TRANSACTION) {
ScalarDbTransactionManager scalarDbTransactionManager =
new ScalarDbTransactionManager(TransactionFactory.create(configFile));
importManager =
new ImportManager(
tableMetadataMap,
reader,
importOptions,
importProcessorFactory,
ScalarDbMode.TRANSACTION,
null,
scalarDbTransactionManager.getDistributedTransactionManager());
} else {
ScalarDbStorageManager scalarDbStorageManager =
new ScalarDbStorageManager(StorageFactory.create(configFile));
importManager =
new ImportManager(
tableMetadataMap,
reader,
importOptions,
importProcessorFactory,
ScalarDbMode.STORAGE,
scalarDbStorageManager.getDistributedStorage(),
null);
}
ImportManager importManager =
new ImportManager(
tableMetadataMap,
reader,
importOptions,
importProcessorFactory,
scalarDbMode,
TransactionFactory.create(configFile).getTransactionManager());
if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory));
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.scalar.db.dataloader.core.dataimport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDbMode;
Expand Down Expand Up @@ -44,7 +43,6 @@ public class ImportManager implements ImportEventListener {
private final ImportProcessorFactory importProcessorFactory;
private final List<ImportEventListener> listeners = new ArrayList<>();
private final ScalarDbMode scalarDbMode;
private final DistributedStorage distributedStorage;
private final DistributedTransactionManager distributedTransactionManager;

/**
Expand All @@ -62,7 +60,6 @@ public void startImport() {
.tableMetadataByTableName(tableMetadata)
.dao(new ScalarDbDao())
.distributedTransactionManager(distributedTransactionManager)
.distributedStorage(distributedStorage)
.tableColumnDataTypes(getTableColumnDataTypes())
.build();
ImportProcessor processor = importProcessorFactory.createImportProcessor(params);
Expand Down Expand Up @@ -169,9 +166,7 @@ public void onAllDataChunksCompleted() {
/** Close resources properly once the process is completed */
public void closeResources() {
try {
if (distributedStorage != null) {
distributedStorage.close();
} else if (distributedTransactionManager != null) {
if (distributedTransactionManager != null) {
distributedTransactionManager.close();
}
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.scalar.db.dataloader.core.dataimport.dao;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Get;
Expand All @@ -10,15 +9,13 @@
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.ScanBuilder;
import com.scalar.db.api.Scanner;
import com.scalar.db.api.TransactionManagerCrudOperable;
import com.scalar.db.dataloader.core.DataLoaderError;
import com.scalar.db.dataloader.core.ScanRange;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.transaction.CrudException;
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import com.scalar.db.io.Column;
import com.scalar.db.io.Key;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
Expand All @@ -29,31 +26,39 @@
public class ScalarDbDao {

/**
* Retrieve record from ScalarDB instance in storage mode
* Retrieves a record from a ScalarDB table using the specified partition and optional clustering
keys through a {@link DistributedTransactionManager}.
*
* @param namespace Namespace name
* @param table Table name
* @param partitionKey Partition key
* @param clusteringKey Optional clustering key for get
* @param storage Distributed storage for ScalarDB connection that is running in storage mode.
* @return Optional get result
* @throws ScalarDbDaoException if something goes wrong while reading the data
* <p>This method creates a {@link Get} operation for the given namespace and table, executes it
* using the provided transaction manager, and returns the result if the record exists.
*
* @param namespace the name of the ScalarDB namespace containing the target table
* @param table the name of the table to retrieve the record from
* @param partitionKey the partition key identifying the record's partition
* @param clusteringKey the optional clustering key identifying a specific record within the
* partition; may be {@code null} if the table does not use clustering keys
* @param manager the {@link DistributedTransactionManager} instance used to perform the get
* operation
* @return an {@link Optional} containing the {@link Result} if the record exists, or an empty
* {@link Optional} if not found
* @throws ScalarDbDaoException if an error occurs while performing the get operation or
* interacting with ScalarDB
*/
public Optional<Result> get(
String namespace,
String table,
Key partitionKey,
Key clusteringKey,
DistributedStorage storage)
DistributedTransactionManager manager)
throws ScalarDbDaoException {

// Retrieving the key data for logging
String loggingKey = keysToString(partitionKey, clusteringKey);

try {
Get get = createGetWith(namespace, table, partitionKey, clusteringKey);
return storage.get(get);
} catch (ExecutionException e) {
return manager.get(get);
} catch (CrudException | UnknownTransactionStatusException e) {
throw new ScalarDbDaoException("error GET " + loggingKey, e);
}
}
Expand Down Expand Up @@ -117,70 +122,42 @@ public void put(
}

/**
* Save record in ScalarDB instance
* Saves a record into a ScalarDB table using the specified partition and optional clustering keys
* through a {@link DistributedTransactionManager}.
*
* @param namespace Namespace name
* @param table Table name
* @param partitionKey Partition key
* @param clusteringKey Optional clustering key
* @param columns List of column values to be inserted or updated
* @param storage Distributed storage for ScalarDB connection that is running in storage mode
* @throws ScalarDbDaoException if something goes wrong while executing the transaction
* <p>This method constructs a {@link Put} operation with the provided key and column information,
* then executes it using the given transaction manager. The operation inserts a new record or
* updates an existing one if a record with the same primary key already exists.
*
* @param namespace the name of the ScalarDB namespace containing the target table
* @param table the name of the table where the record will be inserted or updated
* @param partitionKey the partition key identifying the record's partition
* @param clusteringKey the optional clustering key identifying a specific record within the
* partition; may be {@code null} if the table does not use clustering keys
* @param columns the list of {@link Column} objects representing the column values to insert or
* update
* @param manager the {@link DistributedTransactionManager} instance used to perform the put
* operation
* @throws ScalarDbDaoException if an error occurs while executing the put operation or
* interacting with ScalarDB
*/
public void put(
String namespace,
String table,
Key partitionKey,
Key clusteringKey,
List<Column<?>> columns,
DistributedStorage storage)
DistributedTransactionManager manager)
throws ScalarDbDaoException {
Put put = createPutWith(namespace, table, partitionKey, clusteringKey, columns);
try {
storage.put(put);
} catch (ExecutionException e) {
manager.put(put);
} catch (CrudException | UnknownTransactionStatusException e) {
throw new ScalarDbDaoException(
DataLoaderError.ERROR_CRUD_EXCEPTION.buildMessage(e.getMessage()), e);
}
}

/**
* Scan a ScalarDB table
*
* @param namespace ScalarDB namespace
* @param table ScalarDB table name
* @param partitionKey Partition key used in ScalarDB scan
* @param range Optional range to set ScalarDB scan start and end values
* @param sorts Optional scan clustering key sorting values
* @param projections List of column projection to use during scan
* @param limit Scan limit value
* @param storage Distributed storage for ScalarDB connection that is running in storage mode
* @return List of ScalarDB scan results
* @throws ScalarDbDaoException if scan fails
*/
public List<Result> scan(
String namespace,
String table,
Key partitionKey,
ScanRange range,
List<Scan.Ordering> sorts,
List<String> projections,
int limit,
DistributedStorage storage)
throws ScalarDbDaoException {
// Create scan
Scan scan = createScan(namespace, table, partitionKey, range, sorts, projections, limit);

// scan data
try {
try (Scanner scanner = storage.scan(scan)) {
return scanner.all();
}
} catch (ExecutionException | IOException e) {
throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e);
}
}

/**
* Scan a ScalarDB table
*
Expand Down

This file was deleted.

Loading
Loading