Add import command for data loader CLI #2618
Changes from 27 commits
@@ -1,14 +1,304 @@
package com.scalar.db.dataloader.cli.command.dataimport;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.dataimport.ImportManager;
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbStorageManager;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbTransactionManager;
import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
import com.scalar.db.dataloader.core.dataimport.log.LogMode;
import com.scalar.db.dataloader.core.dataimport.log.SingleFileImportLogger;
import com.scalar.db.dataloader.core.dataimport.log.SplitByDataChunkImportLogger;
import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
import com.scalar.db.dataloader.core.dataimport.processor.DefaultImportProcessorFactory;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
import com.scalar.db.service.StorageFactory;
import com.scalar.db.service.TransactionFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import org.apache.commons.lang3.StringUtils;
import picocli.CommandLine;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.ParameterException;
import picocli.CommandLine.Spec;

@CommandLine.Command(name = "import", description = "Import data into a ScalarDB table")
public class ImportCommand extends ImportCommandOptions implements Callable<Integer> {

  /** Spec injected by PicoCli */
  @Spec CommandSpec spec;

  @Override
  public Integer call() throws Exception {
    validateImportTarget(controlFilePath, namespace, tableName);
    validateLogDirectory(logDirectory);
    ControlFile controlFile = parseControlFileFromPath(controlFilePath).orElse(null);
    ImportOptions importOptions = createImportOptions(controlFile);
    ImportLoggerConfig config =
        ImportLoggerConfig.builder()
            .logDirectoryPath(logDirectory)
            .isLogRawSourceRecordsEnabled(importOptions.isLogRawRecord())
            .isLogSuccessRecordsEnabled(importOptions.isLogSuccessRecords())
            .prettyPrint(prettyPrint)
            .build();
    LogWriterFactory logWriterFactory = createLogWriterFactory(config);
    Map<String, TableMetadata> tableMetadataMap =
        createTableMetadataMap(controlFile, namespace, tableName);
    try (BufferedReader reader =
        Files.newBufferedReader(Paths.get(sourceFilePath), Charset.defaultCharset())) {
      ImportManager importManager =
          createImportManager(importOptions, tableMetadataMap, reader, logWriterFactory, config);
      importManager.startImport();
    }
    return 0;
  }

  /**
   * Create LogWriterFactory object
   *
   * @param config import logger config
   * @return LogWriterFactory object
   */
  private LogWriterFactory createLogWriterFactory(ImportLoggerConfig config) {
    return new DefaultLogWriterFactory(config);
  }

  /**
   * Create a TableMetadata map from the provided control file, or from the namespace and table name
   *
   * @param controlFile control file
   * @param namespace Namespace
   * @param tableName Single table name
   * @return {@code Map<String, TableMetadata>} a table metadata map
   * @throws ParameterException if one of the argument values is wrong
   */
  private Map<String, TableMetadata> createTableMetadataMap(
      ControlFile controlFile, String namespace, String tableName)
      throws IOException, TableMetadataException {
    File configFile = new File(configFilePath);
    StorageFactory storageFactory = StorageFactory.create(configFile);
    TableMetadataService tableMetadataService =
        new TableMetadataService(storageFactory.getStorageAdmin());

    Map<String, TableMetadata> tableMetadataMap = new HashMap<>();
    if (controlFile != null) {
      for (ControlFileTable table : controlFile.getTables()) {
        tableMetadataMap.put(
            TableMetadataUtil.getTableLookupKey(table.getNamespace(), table.getTable()),
            tableMetadataService.getTableMetadata(table.getNamespace(), table.getTable()));
      }
    } else {
      tableMetadataMap.put(
          TableMetadataUtil.getTableLookupKey(namespace, tableName),
          tableMetadataService.getTableMetadata(namespace, tableName));
    }
    return tableMetadataMap;
  }

  /**
   * Create ImportManager object from data
   *
   * @param importOptions import options
   * @param tableMetadataMap table metadata map
   * @param reader buffered reader with source data
   * @param logWriterFactory log writer factory object
   * @param config import logging config
   * @return ImportManager object
   */
  private ImportManager createImportManager(
      ImportOptions importOptions,
      Map<String, TableMetadata> tableMetadataMap,
      BufferedReader reader,
      LogWriterFactory logWriterFactory,
      ImportLoggerConfig config)
      throws IOException {
    File configFile = new File(configFilePath);
    ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory();
    ImportManager importManager;
    if (scalarDbMode == ScalarDbMode.TRANSACTION) {
      ScalarDbTransactionManager scalarDbTransactionManager =
          new ScalarDbTransactionManager(TransactionFactory.create(configFile));
      importManager =
          new ImportManager(
              tableMetadataMap,
              reader,
              importOptions,
              importProcessorFactory,
              ScalarDbMode.TRANSACTION,
              null,
              scalarDbTransactionManager.getDistributedTransactionManager());

Contributor: Will this instance be closed?

Contributor (Author): Currently, no. I will create another PR to make the necessary changes to confirm that both the transaction manager and the storage are closed once the process is completed. I will make this change in the core part, since we can confirm import process completion in the import manager class.
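For context, a rough sketch of what that planned follow-up could look like (not part of this PR). The hook and field names below are assumptions about the core-side change in ImportManager; the close() calls rely on the close methods that DistributedTransactionManager and DistributedStorage provide.

    // Hypothetical core-side hook, invoked by ImportManager once the whole
    // import has completed (names are placeholders, not this PR's code).
    private void closeScalarDbResources() {
      if (distributedTransactionManager != null) {
        distributedTransactionManager.close(); // TRANSACTION mode
      }
      if (distributedStorage != null) {
        distributedStorage.close(); // STORAGE mode
      }
    }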
    } else {
      ScalarDbStorageManager scalarDbStorageManager =
          new ScalarDbStorageManager(StorageFactory.create(configFile));
      importManager =
          new ImportManager(
              tableMetadataMap,
              reader,
              importOptions,
              importProcessorFactory,
              ScalarDbMode.STORAGE,
              scalarDbStorageManager.getDistributedStorage(),

Contributor: Same as above.

Contributor (Author): No, I will update it as mentioned in the above comment.
              null);
    }
    if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
      importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory));
    } else importManager.addListener(new SingleFileImportLogger(config, logWriterFactory));

(Outdated) Can you wrap this with braces?

Suggested change:
-    } else importManager.addListener(new SingleFileImportLogger(config, logWriterFactory));
+    } else {
+      importManager.addListener(new SingleFileImportLogger(config, logWriterFactory));
+    }
Copilot AI (May 23, 2025): The splitLogMode flag is only used to set SINGLE_FILE mode; when true, you never set LogMode.SPLIT_BY_DATA_CHUNK. Add an else block to set the split-chunk mode.

Suggested change:
     builder.logMode(LogMode.SINGLE_FILE);
+    } else {
+      builder.logMode(LogMode.SPLIT_BY_DATA_CHUNK);
Reply: LogMode.SPLIT_BY_DATA_CHUNK is the default value; it is set when no value is given for logMode.
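In other words, only the single-file case needs to be set explicitly because the builder already starts from the split-by-chunk default. A small sketch of the behavior described above; the splitLogMode flag comes from the discussion, and the builder type name is an assumption about the ImportOptions class:

    // Sketch of the flag handling described above: the ImportOptions builder
    // defaults logMode to LogMode.SPLIT_BY_DATA_CHUNK, so only the single-file
    // case is set explicitly.
    ImportOptions.ImportOptionsBuilder builder = ImportOptions.builder();
    if (!splitLogMode) {
      builder.logMode(LogMode.SINGLE_FILE);
    }
    // else: keep the default LogMode.SPLIT_BY_DATA_CHUNK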
You should close the DistributedStorageAdmin instance after use. If there are other parts that need the same fix, could you take care of them in a separate PR?
Reply: Thank you. I will update the code to close it, and will add the necessary changes to close other instances in another PR.
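A minimal sketch of that fix inside createTableMetadataMap, assuming DistributedStorageAdmin exposes a close() method like other ScalarDB resources; the single-table branch is shown for brevity:

    // Sketch only: close the admin once the table metadata has been read.
    DistributedStorageAdmin storageAdmin = storageFactory.getStorageAdmin();
    try {
      TableMetadataService tableMetadataService = new TableMetadataService(storageAdmin);
      tableMetadataMap.put(
          TableMetadataUtil.getTableLookupKey(namespace, tableName),
          tableMetadataService.getTableMetadata(namespace, tableName));
    } finally {
      storageAdmin.close();
    }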