Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
627fe69
initial commit
inv-jishnu Nov 10, 2025
32b4b70
Initial changes
inv-jishnu Nov 10, 2025
1e08ecf
scanner class updated
inv-jishnu Nov 11, 2025
e87b94c
Merge branch 'master' into feat/data-loader/changes-export
inv-jishnu Nov 11, 2025
e7c68a7
Feedback changes
inv-jishnu Nov 11, 2025
b973df8
Feedback changes javadoc and param rename
inv-jishnu Nov 11, 2025
186d3c3
Replace storage with transaction in table metadata service
inv-jishnu Nov 11, 2025
1d81db7
Initial commit
inv-jishnu Nov 11, 2025
36ff32f
transaction mode use fixed
inv-jishnu Nov 12, 2025
5b6f86a
Removed unused code and updated java doc comments
inv-jishnu Nov 12, 2025
1f0644e
Add changes based on feedback
inv-jishnu Nov 12, 2025
326f7a7
Added changes based on feedback
inv-jishnu Nov 12, 2025
e8f9feb
Removed unused params
inv-jishnu Nov 13, 2025
98e84bb
correction
inv-jishnu Nov 13, 2025
7502fe8
Merge branch 'feat/data-loader/table-metadata-replace-storage' into f…
inv-jishnu Nov 13, 2025
0ccc67b
Merge branch 'master' into feat/data-loader/import-replace-storage
inv-jishnu Nov 13, 2025
9589094
Resolved conflicts and merged latest changes from master branch
inv-jishnu Nov 14, 2025
671f95a
Removed unused commit
inv-jishnu Nov 14, 2025
6418f5c
Fixed merge import issues
inv-jishnu Nov 14, 2025
a5284be
Merge branch 'feat/data-loader/table-metadata-replace-storage' into f…
inv-jishnu Nov 14, 2025
96ba8f0
Deprecate `include-metadata` option for data loader export (#3159)
ypeckstadt Nov 14, 2025
d2eebbf
Merge branch 'master' into feat/data-loader/import-replace-storage
inv-jishnu Nov 14, 2025
1b100b8
Fix build failure
inv-jishnu Nov 14, 2025
b9fa18d
Merge branch 'master' into feat/data-loader/import-replace-storage
inv-jishnu Nov 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.cli.exception.DirectoryValidationException;
import com.scalar.db.dataloader.cli.util.DirectoryUtils;
Expand All @@ -28,7 +28,7 @@
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
import com.scalar.db.dataloader.core.util.KeyUtils;
import com.scalar.db.io.Key;
import com.scalar.db.service.StorageFactory;
import com.scalar.db.service.TransactionFactory;
import java.io.BufferedWriter;
import java.nio.charset.Charset;
import java.nio.file.Files;
Expand Down Expand Up @@ -63,12 +63,13 @@ public Integer call() throws Exception {
spec.commandLine(), dataChunkSize, DataLoaderError.INVALID_DATA_CHUNK_SIZE);
validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS);

StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath);
TableMetadataService metaDataService =
new TableMetadataService(storageFactory.getStorageAdmin());
new TableMetadataService(transactionFactory.getTransactionAdmin());
ScalarDbDao scalarDbDao = new ScalarDbDao();

ExportManager exportManager = createExportManager(storageFactory, scalarDbDao, outputFormat);
ExportManager exportManager =
createExportManager(transactionFactory, scalarDbDao, outputFormat);

TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table);

Expand Down Expand Up @@ -146,17 +147,17 @@ private void validateOutputDirectory() throws DirectoryValidationException {
}

private ExportManager createExportManager(
StorageFactory storageFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
TransactionFactory transactionFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
ProducerTaskFactory taskFactory =
new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson);
DistributedStorage storage = storageFactory.getStorage();
DistributedTransactionManager manager = transactionFactory.getTransactionManager();
switch (fileFormat) {
case JSON:
return new JsonExportManager(storage, scalarDbDao, taskFactory);
return new JsonExportManager(manager, scalarDbDao, taskFactory);
case JSONL:
return new JsonLineExportManager(storage, scalarDbDao, taskFactory);
return new JsonLineExportManager(manager, scalarDbDao, taskFactory);
case CSV:
return new CsvExportManager(storage, scalarDbDao, taskFactory);
return new CsvExportManager(manager, scalarDbDao, taskFactory);
default:
throw new AssertionError("Invalid file format" + fileFormat);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@
import static com.scalar.db.dataloader.cli.util.CommandLineInputUtils.validatePositiveValue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.scalar.db.api.DistributedStorageAdmin;
import com.scalar.db.api.DistributedTransactionAdmin;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.DataLoaderError;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.dataimport.ImportManager;
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbStorageManager;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbTransactionManager;
import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
import com.scalar.db.dataloader.core.dataimport.log.LogMode;
import com.scalar.db.dataloader.core.dataimport.log.SingleFileImportLogger;
Expand All @@ -26,7 +23,6 @@
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
import com.scalar.db.service.StorageFactory;
import com.scalar.db.service.TransactionFactory;
import java.io.BufferedReader;
import java.io.File;
Expand Down Expand Up @@ -107,9 +103,9 @@ private Map<String, TableMetadata> createTableMetadataMap(
ControlFile controlFile, String namespace, String tableName)
throws IOException, TableMetadataException {
File configFile = new File(configFilePath);
StorageFactory storageFactory = StorageFactory.create(configFile);
try (DistributedStorageAdmin storageAdmin = storageFactory.getStorageAdmin()) {
TableMetadataService tableMetadataService = new TableMetadataService(storageAdmin);
TransactionFactory transactionFactory = TransactionFactory.create(configFile);
try (DistributedTransactionAdmin transactionAdmin = transactionFactory.getTransactionAdmin()) {
TableMetadataService tableMetadataService = new TableMetadataService(transactionAdmin);
Map<String, TableMetadata> tableMetadataMap = new HashMap<>();
if (controlFile != null) {
for (ControlFileTable table : controlFile.getTables()) {
Expand Down Expand Up @@ -145,32 +141,14 @@ private ImportManager createImportManager(
throws IOException {
File configFile = new File(configFilePath);
ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory();
ImportManager importManager;
if (scalarDbMode == ScalarDbMode.TRANSACTION) {
ScalarDbTransactionManager scalarDbTransactionManager =
new ScalarDbTransactionManager(TransactionFactory.create(configFile));
importManager =
new ImportManager(
tableMetadataMap,
reader,
importOptions,
importProcessorFactory,
ScalarDbMode.TRANSACTION,
null,
scalarDbTransactionManager.getDistributedTransactionManager());
} else {
ScalarDbStorageManager scalarDbStorageManager =
new ScalarDbStorageManager(StorageFactory.create(configFile));
importManager =
new ImportManager(
tableMetadataMap,
reader,
importOptions,
importProcessorFactory,
ScalarDbMode.STORAGE,
scalarDbStorageManager.getDistributedStorage(),
null);
}
ImportManager importManager =
new ImportManager(
tableMetadataMap,
reader,
importOptions,
importProcessorFactory,
scalarDbMode,
TransactionFactory.create(configFile).getTransactionManager());
if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory));
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.scalar.db.dataloader.core.dataexport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
Expand All @@ -15,16 +15,23 @@
public class CsvExportManager extends ExportManager {

/**
* Constructs a {@code CsvExportManager} with the specified {@link DistributedStorage}, {@link
* ScalarDbDao}, and {@link ProducerTaskFactory}.
* Constructs a {@code CsvExportManager} for exporting data using a {@link
* DistributedTransactionManager}.
*
* @param storage the {@code DistributedStorage} instance used to read data from the database
* @param dao the {@code ScalarDbDao} used to execute export-related database operations
* @param producerTaskFactory the factory used to create producer tasks for exporting data
* <p>This constructor is used when exporting data in transactional mode, allowing data to be read
* from ScalarDB within a distributed transaction context.
*
* @param manager the {@link DistributedTransactionManager} used to read data in transactional
* mode
* @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
* @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
* exporting data
*/
public CsvExportManager(
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
super(storage, dao, producerTaskFactory);
DistributedTransactionManager manager,
ScalarDbDao dao,
ProducerTaskFactory producerTaskFactory) {
super(manager, dao, producerTaskFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.scalar.db.dataloader.core.dataexport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scanner;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.api.TransactionManagerCrudOperable;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
Expand All @@ -12,6 +12,8 @@
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException;
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
import com.scalar.db.exception.transaction.CrudException;
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import com.scalar.db.io.DataType;
import java.io.BufferedWriter;
import java.io.IOException;
Expand All @@ -34,7 +36,7 @@
public abstract class ExportManager {
private static final Logger logger = LoggerFactory.getLogger(ExportManager.class);

private final DistributedStorage storage;
private final DistributedTransactionManager manager;
private final ScalarDbDao dao;
private final ProducerTaskFactory producerTaskFactory;
private final Object lock = new Object();
Expand Down Expand Up @@ -86,7 +88,8 @@ public ExportReport startExport(
BufferedWriter bufferedWriter = new BufferedWriter(writer);
boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON;

try (Scanner scanner = createScanner(exportOptions, dao, storage)) {
try (TransactionManagerCrudOperable.Scanner scanner =
createScanner(exportOptions, dao, manager)) {

Iterator<Result> iterator = scanner.iterator();
AtomicBoolean isFirstBatch = new AtomicBoolean(true);
Expand All @@ -113,8 +116,11 @@ public ExportReport startExport(
// TODO: handle this
}
processFooter(exportOptions, tableMetadata, bufferedWriter);
} catch (InterruptedException | IOException e) {
logger.error("Error during export: {}", e.getMessage());
} catch (InterruptedException
| IOException
| UnknownTransactionStatusException
| CrudException e) {
logger.error("Error during export: ", e);
} finally {
bufferedWriter.flush();
}
Expand Down Expand Up @@ -216,12 +222,12 @@ private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadat
*
* @param exportOptions export options
* @param dao ScalarDB dao object
* @param storage distributed storage object
* @param manager DistributedTransactionManager object
* @return created scanner
* @throws ScalarDbDaoException throws if any issue occurs in creating scanner object
*/
private Scanner createScanner(
ExportOptions exportOptions, ScalarDbDao dao, DistributedStorage storage)
private TransactionManagerCrudOperable.Scanner createScanner(
ExportOptions exportOptions, ScalarDbDao dao, DistributedTransactionManager manager)
throws ScalarDbDaoException {
boolean isScanAll = exportOptions.getScanPartitionKey() == null;
if (isScanAll) {
Expand All @@ -230,7 +236,7 @@ private Scanner createScanner(
exportOptions.getTableName(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
storage);
manager);
} else {
return dao.createScanner(
exportOptions.getNamespace(),
Expand All @@ -240,7 +246,7 @@ private Scanner createScanner(
exportOptions.getSortOrders(),
exportOptions.getProjectionColumns(),
exportOptions.getLimit(),
storage);
manager);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.scalar.db.dataloader.core.dataexport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
Expand All @@ -11,16 +11,23 @@
public class JsonExportManager extends ExportManager {

/**
* Constructs a {@code JsonExportManager} with the specified {@link DistributedStorage}, {@link
* ScalarDbDao}, and {@link ProducerTaskFactory}.
* Constructs a {@code JsonExportManager} for exporting data using a {@link
* DistributedTransactionManager}.
*
* @param storage the {@code DistributedStorage} instance used to read data from the database
* @param dao the {@code ScalarDbDao} used to execute export-related database operations
* @param producerTaskFactory the factory used to create producer tasks for exporting data
* <p>This constructor is used when exporting data in transactional mode, allowing data to be read
* from ScalarDB within a distributed transaction context and exported in JSON format.
*
* @param manager the {@link DistributedTransactionManager} used to read data in transactional
* mode
* @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
* @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
* exporting data
*/
public JsonExportManager(
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
super(storage, dao, producerTaskFactory);
DistributedTransactionManager manager,
ScalarDbDao dao,
ProducerTaskFactory producerTaskFactory) {
super(manager, dao, producerTaskFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.scalar.db.dataloader.core.dataexport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
Expand All @@ -13,16 +13,23 @@
public class JsonLineExportManager extends ExportManager {

/**
* Constructs a {@code JsonLineExportManager} with the specified {@link DistributedStorage},
* {@link ScalarDbDao}, and {@link ProducerTaskFactory}.
* Constructs a {@code JsonLineExportManager} for exporting data using a {@link
* DistributedTransactionManager}.
*
* @param storage the {@code DistributedStorage} instance used to read data from the database
* @param dao the {@code ScalarDbDao} used to execute export-related database operations
* @param producerTaskFactory the factory used to create producer tasks for exporting data
* <p>This constructor is used when exporting data in transactional mode, allowing data to be read
* from ScalarDB within a distributed transaction context and exported in JSON Lines format.
*
* @param manager the {@link DistributedTransactionManager} used to read data in transactional
* mode
* @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
* @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
* exporting data
*/
public JsonLineExportManager(
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
super(storage, dao, producerTaskFactory);
DistributedTransactionManager manager,
ScalarDbDao dao,
ProducerTaskFactory producerTaskFactory) {
super(manager, dao, producerTaskFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.scalar.db.dataloader.core.dataimport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDbMode;
Expand Down Expand Up @@ -44,7 +43,6 @@ public class ImportManager implements ImportEventListener {
private final ImportProcessorFactory importProcessorFactory;
private final List<ImportEventListener> listeners = new ArrayList<>();
private final ScalarDbMode scalarDbMode;
private final DistributedStorage distributedStorage;
private final DistributedTransactionManager distributedTransactionManager;

/**
Expand All @@ -62,7 +60,6 @@ public void startImport() {
.tableMetadataByTableName(tableMetadata)
.dao(new ScalarDbDao())
.distributedTransactionManager(distributedTransactionManager)
.distributedStorage(distributedStorage)
.tableColumnDataTypes(getTableColumnDataTypes())
.build();
ImportProcessor processor = importProcessorFactory.createImportProcessor(params);
Expand Down Expand Up @@ -169,9 +166,7 @@ public void onAllDataChunksCompleted() {
/** Close resources properly once the process is completed */
public void closeResources() {
try {
if (distributedStorage != null) {
distributedStorage.close();
} else if (distributedTransactionManager != null) {
if (distributedTransactionManager != null) {
distributedTransactionManager.close();
}
} catch (Throwable e) {
Expand Down
Loading
Loading