Merged
Commits (33)
be9ee23
Initial commit
inv-jishnu Apr 10, 2025
89b9f05
Spotless applied again
inv-jishnu Apr 11, 2025
49c83b6
Removed unused code
inv-jishnu Apr 11, 2025
b2871fb
Merge branch 'master' into feat/data-loader/import-log-2
ypeckstadt Apr 15, 2025
c5c9c0a
Removed unused classes and references
inv-jishnu Apr 15, 2025
4964e8d
Merge branch 'master' into feat/data-loader/import-log-2
inv-jishnu Apr 15, 2025
ff81f5f
Merge branch 'master' into feat/data-loader/import-log-2
inv-jishnu Apr 15, 2025
3934c2a
Improve Javadocs
ypeckstadt Apr 16, 2025
9958f95
Changes
inv-jishnu Apr 21, 2025
1afbc21
Renamed parameters
inv-jishnu Apr 21, 2025
8c5114d
logging changes
inv-jishnu Apr 21, 2025
ffab395
removed repeated code
inv-jishnu Apr 22, 2025
79df1ed
Merge branch 'master' into feat/data-loader/import-log-2
inv-jishnu Apr 22, 2025
cf31672
Merge branch 'master' into feat/data-loader/import-log-2
brfrn169 Apr 23, 2025
6dd213e
Added exception throw
inv-jishnu Apr 23, 2025
6542177
Synchronisation changes
inv-jishnu Apr 25, 2025
603e46e
Added volatile back to fix spotbugs issue
inv-jishnu Apr 25, 2025
eaf9d88
Removed unused variable
inv-jishnu Apr 25, 2025
0a2518a
Changes
inv-jishnu Apr 29, 2025
378effb
Initial commit
inv-jishnu Apr 29, 2025
114fd55
Initial changes
inv-jishnu Apr 29, 2025
b20c878
Merged changes from master after resolving conflicts
inv-jishnu May 22, 2025
9266997
Minor change
inv-jishnu May 22, 2025
64ee913
Removed unused file
inv-jishnu May 22, 2025
16aa0a8
Revert new line change
inv-jishnu May 22, 2025
2940fcc
Merge branch 'master' into feat/data-loader/cli-import-2
ypeckstadt May 23, 2025
b34e266
Changes
inv-jishnu May 25, 2025
92f52c6
Braces added
inv-jishnu May 26, 2025
f064516
Storage admin close added to table metadata retrieval
inv-jishnu May 26, 2025
8550bd2
Merge branch 'master' into feat/data-loader/cli-import-2
inv-jishnu May 26, 2025
afc1ae9
Removed file not required in this PR
inv-jishnu May 27, 2025
a5a8d57
Merge branch 'master' into feat/data-loader/cli-import-2
inv-jishnu May 27, 2025
c696b30
Updated header command from h to hdr
inv-jishnu May 28, 2025
296 changes: 295 additions & 1 deletion ...ader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java
100644 → 100755
@@ -1,14 +1,308 @@
package com.scalar.db.dataloader.cli.command.dataimport;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.scalar.db.api.DistributedStorageAdmin;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.dataimport.ImportManager;
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbStorageManager;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbTransactionManager;
import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
import com.scalar.db.dataloader.core.dataimport.log.LogMode;
import com.scalar.db.dataloader.core.dataimport.log.SingleFileImportLogger;
import com.scalar.db.dataloader.core.dataimport.log.SplitByDataChunkImportLogger;
import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
import com.scalar.db.dataloader.core.dataimport.processor.DefaultImportProcessorFactory;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
import com.scalar.db.service.StorageFactory;
import com.scalar.db.service.TransactionFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import org.apache.commons.lang3.StringUtils;
import picocli.CommandLine;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.ParameterException;
import picocli.CommandLine.Spec;

@CommandLine.Command(name = "import", description = "Import data into a ScalarDB table")
public class ImportCommand extends ImportCommandOptions implements Callable<Integer> {
/** Spec injected by PicoCli */
@Spec CommandSpec spec;

@Override
public Integer call() throws Exception {
validateImportTarget(controlFilePath, namespace, tableName);
validateLogDirectory(logDirectory);
ControlFile controlFile = parseControlFileFromPath(controlFilePath).orElse(null);
ImportOptions importOptions = createImportOptions(controlFile);
ImportLoggerConfig config =
ImportLoggerConfig.builder()
.logDirectoryPath(logDirectory)
.isLogRawSourceRecordsEnabled(importOptions.isLogRawRecord())
.isLogSuccessRecordsEnabled(importOptions.isLogSuccessRecords())
.prettyPrint(prettyPrint)
.build();
LogWriterFactory logWriterFactory = createLogWriterFactory(config);
Map<String, TableMetadata> tableMetadataMap =
createTableMetadataMap(controlFile, namespace, tableName);
try (BufferedReader reader =
Files.newBufferedReader(Paths.get(sourceFilePath), Charset.defaultCharset())) {
ImportManager importManager =
createImportManager(importOptions, tableMetadataMap, reader, logWriterFactory, config);
importManager.startImport();
}
return 0;
}

/**
* Create LogWriterFactory object
*
* @param config import logger config
* @return LogWriterFactory object
*/
private LogWriterFactory createLogWriterFactory(ImportLoggerConfig config) {
return new DefaultLogWriterFactory(config);
}

/**
* Create a TableMetadata map from the provided control file, or from the namespace and table name
*
* @param controlFile control file
* @param namespace namespace
* @param tableName single table name
* @return {@code Map<String, TableMetadata>} a table metadata map
* @throws IOException if the ScalarDB config file cannot be read
* @throws TableMetadataException if the table metadata cannot be retrieved
*/
private Map<String, TableMetadata> createTableMetadataMap(
ControlFile controlFile, String namespace, String tableName)
throws IOException, TableMetadataException {
File configFile = new File(configFilePath);
StorageFactory storageFactory = StorageFactory.create(configFile);
try (DistributedStorageAdmin storageAdmin = storageFactory.getStorageAdmin()) {
TableMetadataService tableMetadataService = new TableMetadataService(storageAdmin);
Map<String, TableMetadata> tableMetadataMap = new HashMap<>();
if (controlFile != null) {
for (ControlFileTable table : controlFile.getTables()) {
tableMetadataMap.put(
TableMetadataUtil.getTableLookupKey(table.getNamespace(), table.getTable()),
tableMetadataService.getTableMetadata(table.getNamespace(), table.getTable()));
}
} else {
tableMetadataMap.put(
TableMetadataUtil.getTableLookupKey(namespace, tableName),
tableMetadataService.getTableMetadata(namespace, tableName));
}
return tableMetadataMap;
}
}

/**
* Create an ImportManager object from the provided data
*
* @param importOptions import options
* @param tableMetadataMap table metadata map
* @param reader buffered reader with source data
* @param logWriterFactory log writer factory object
* @param config import logging config
* @return ImportManager object
* @throws IOException if the ScalarDB config file cannot be read
*/
private ImportManager createImportManager(
ImportOptions importOptions,
Map<String, TableMetadata> tableMetadataMap,
BufferedReader reader,
LogWriterFactory logWriterFactory,
ImportLoggerConfig config)
throws IOException {
File configFile = new File(configFilePath);
ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory();
ImportManager importManager;
if (scalarDbMode == ScalarDbMode.TRANSACTION) {
ScalarDbTransactionManager scalarDbTransactionManager =
new ScalarDbTransactionManager(TransactionFactory.create(configFile));
importManager =
new ImportManager(
tableMetadataMap,
reader,
importOptions,
importProcessorFactory,
ScalarDbMode.TRANSACTION,
null,
scalarDbTransactionManager.getDistributedTransactionManager());
Contributor:
Will this instance be closed?

Contributor Author:
Currently, no. I will create another PR with the necessary changes to confirm that both the transaction manager and the storage are closed once the process is completed. I will make this change in the core part, as we can confirm import process completion in the ImportManager class.
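
For illustration, a minimal sketch of what such cleanup could look like, assuming a hypothetical completion hook (onImportCompleted and the listener class below are placeholders, not the actual follow-up PR):

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;

// Hypothetical sketch: close whichever ScalarDB resource was used once the
// import completes. onImportCompleted() is an assumed callback, not an
// existing ImportManager API.
public class ResourceClosingListener {
  private final DistributedTransactionManager transactionManager; // null in storage mode
  private final DistributedStorage storage; // null in transaction mode

  public ResourceClosingListener(
      DistributedTransactionManager transactionManager, DistributedStorage storage) {
    this.transactionManager = transactionManager;
    this.storage = storage;
  }

  public void onImportCompleted() {
    if (transactionManager != null) {
      transactionManager.close();
    }
    if (storage != null) {
      storage.close();
    }
  }
}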

} else {
ScalarDbStorageManager scalarDbStorageManager =
new ScalarDbStorageManager(StorageFactory.create(configFile));
importManager =
new ImportManager(
tableMetadataMap,
reader,
importOptions,
importProcessorFactory,
ScalarDbMode.STORAGE,
scalarDbStorageManager.getDistributedStorage(),
Contributor:
Same as above

Contributor Author:
No, I will update it as mentioned in the above comment.

null);
}
if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory));
} else {
importManager.addListener(new SingleFileImportLogger(config, logWriterFactory));
}
return importManager;
}

/**
* Validate import targets
*
* @param controlFilePath control file path
* @param namespace Namespace
* @param tableName Single table name
* @throws ParameterException if one of the argument values is wrong
*/
private void validateImportTarget(String controlFilePath, String namespace, String tableName) {
// Throw an error if no clear import target was specified
if (StringUtils.isBlank(controlFilePath)
&& (StringUtils.isBlank(namespace) || StringUtils.isBlank(tableName))) {
throw new ParameterException(
spec.commandLine(), CoreError.DATA_LOADER_IMPORT_TARGET_MISSING.buildMessage());
}

// Make sure the control file exists when a path is provided
if (!StringUtils.isBlank(controlFilePath)) {
Path path = Paths.get(controlFilePath);
if (!Files.exists(path)) {
throw new ParameterException(
spec.commandLine(),
CoreError.DATA_LOADER_MISSING_IMPORT_FILE.buildMessage(
controlFilePath, FILE_OPTION_NAME_LONG_FORMAT));
}
}
}

/**
* Validate log directory path
*
* @param logDirectory log directory path
* @throws ParameterException if the path is invalid
*/
private void validateLogDirectory(String logDirectory) throws ParameterException {
Path logDirectoryPath;
if (!StringUtils.isBlank(logDirectory)) {
// User-provided log directory via CLI argument
logDirectoryPath = Paths.get(logDirectory);

if (Files.exists(logDirectoryPath)) {
// Check if the provided directory is writable
if (!Files.isWritable(logDirectoryPath)) {
throw new ParameterException(
spec.commandLine(),
CoreError.DATA_LOADER_LOG_DIRECTORY_CREATION_FAILED.buildMessage(
logDirectoryPath.toAbsolutePath()));
}
} else {
// Create the log directory if it doesn't exist
try {
Files.createDirectories(logDirectoryPath);
} catch (IOException e) {
throw new ParameterException(
spec.commandLine(),
CoreError.DATA_LOADER_LOG_DIRECTORY_CREATION_FAILED.buildMessage(
logDirectoryPath.toAbsolutePath()));
}
}
return;
}

// Use the current working directory as the log directory
logDirectoryPath = Paths.get(System.getProperty("user.dir"));

// Check if the current working directory is writable
if (!Files.isWritable(logDirectoryPath)) {
throw new ParameterException(
spec.commandLine(),
CoreError.DATA_LOADER_LOG_DIRECTORY_WRITE_ACCESS_DENIED.buildMessage(
logDirectoryPath.toAbsolutePath()));
}
}

/**
* Parse a control file from a valid control file path
*
* @param controlFilePath control file path
* @return {@code Optional<ControlFile>} parsed control file object
* @throws ParameterException if the path or file content is invalid
*/
private Optional<ControlFile> parseControlFileFromPath(String controlFilePath) {
if (StringUtils.isBlank(controlFilePath)) {
return Optional.empty();
}
try {
ObjectMapper objectMapper = new ObjectMapper();
ControlFile controlFile =
objectMapper.readValue(new File(controlFilePath), ControlFile.class);
return Optional.of(controlFile);
} catch (IOException e) {
throw new ParameterException(
spec.commandLine(),
CoreError.DATA_LOADER_INVALID_CONTROL_FILE.buildMessage(controlFilePath));
}
}

/**
* Generate an import options object from the provided CLI parameters
*
* @param controlFile control file
* @return ImportOptions generated import options object
*/
private ImportOptions createImportOptions(ControlFile controlFile) {
ImportOptions.ImportOptionsBuilder builder =
ImportOptions.builder()
.fileFormat(sourceFileFormat)
.requireAllColumns(requireAllColumns)
.prettyPrint(prettyPrint)
.controlFile(controlFile)
.controlFileValidationLevel(controlFileValidation)
.logRawRecord(logRawRecord)
.logSuccessRecords(logSuccessRecords)
.ignoreNullValues(ignoreNullValues)
.namespace(namespace)
.dataChunkSize(dataChunkSize)
.transactionBatchSize(transactionSize)
.maxThreads(maxThreads)
.dataChunkQueueSize(dataChunkQueueSize)
.tableName(tableName);

// Import mode
if (importMode != null) {
builder.importMode(importMode);
}
if (!splitLogMode) {
builder.logMode(LogMode.SINGLE_FILE);
Copilot AI (May 23, 2025):
The splitLogMode flag is only used to set SINGLE_FILE mode; when it is true, LogMode.SPLIT_BY_DATA_CHUNK is never set. Add an else block to set the split-by-data-chunk mode.

Suggested change:
if (!splitLogMode) {
builder.logMode(LogMode.SINGLE_FILE);
} else {
builder.logMode(LogMode.SPLIT_BY_DATA_CHUNK);
}

Contributor Author:
LogMode.SPLIT_BY_DATA_CHUNK is the default value, set when no value is given for logMode.
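
A minimal sketch of how such a builder default could be declared, assuming ImportOptions uses Lombok's @Builder (the class shape here is illustrative, not the actual ImportOptions source):

import com.scalar.db.dataloader.core.dataimport.log.LogMode;
import lombok.Builder;
import lombok.Value;

// Illustrative only: with @Builder.Default, logMode falls back to
// SPLIT_BY_DATA_CHUNK whenever the builder does not set it explicitly.
@Value
@Builder
public class ImportOptionsSketch {
  @Builder.Default LogMode logMode = LogMode.SPLIT_BY_DATA_CHUNK;
}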

}

// CSV options
if (sourceFileFormat.equals(FileFormat.CSV)) {
builder.delimiter(delimiter);
if (!StringUtils.isBlank(customHeaderRow)) {
builder.customHeaderRow(customHeaderRow);
}
}
return builder.build();
}
}