
Commit 1dbbbf4

Backport to branch(3.12) : Add import command for data loader CLI (#2706)
Co-authored-by: inv-jishnu <[email protected]>
1 parent c4558f0 commit 1dbbbf4
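
For reference, picocli subcommands like the one added here are driven through a CommandLine instance. Below is a minimal sketch of invoking the new ImportCommand on its own; how the data loader CLI actually registers it under its root command is not part of this diff, so the standalone wiring and the ImportCommandRunner class name are assumptions for illustration only.

import picocli.CommandLine;

// Minimal sketch, not the data loader's actual entry point: run the new
// "import" subcommand directly through picocli and propagate its exit code.
// picocli populates the @Spec field and the options declared in
// ImportCommandOptions before call() is invoked.
public class ImportCommandRunner {
  public static void main(String[] args) {
    int exitCode = new CommandLine(new ImportCommand()).execute(args);
    System.exit(exitCode);
  }
}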

3 files changed: +526 -2 lines changed
Lines changed: 301 additions & 1 deletion
@@ -1,14 +1,314 @@
 package com.scalar.db.dataloader.cli.command.dataimport;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.scalar.db.api.DistributedStorageAdmin;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.error.CoreError;
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.ScalarDbMode;
+import com.scalar.db.dataloader.core.dataimport.ImportManager;
+import com.scalar.db.dataloader.core.dataimport.ImportOptions;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbStorageManager;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbTransactionManager;
+import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
+import com.scalar.db.dataloader.core.dataimport.log.LogMode;
+import com.scalar.db.dataloader.core.dataimport.log.SingleFileImportLogger;
+import com.scalar.db.dataloader.core.dataimport.log.SplitByDataChunkImportLogger;
+import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory;
+import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
+import com.scalar.db.dataloader.core.dataimport.processor.DefaultImportProcessorFactory;
+import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
+import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
+import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
+import com.scalar.db.dataloader.core.util.TableMetadataUtil;
+import com.scalar.db.service.StorageFactory;
+import com.scalar.db.service.TransactionFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Callable;
+import org.apache.commons.lang3.StringUtils;
 import picocli.CommandLine;
+import picocli.CommandLine.Model.CommandSpec;
+import picocli.CommandLine.ParameterException;
+import picocli.CommandLine.Spec;
 
 @CommandLine.Command(name = "import", description = "Import data into a ScalarDB table")
 public class ImportCommand extends ImportCommandOptions implements Callable<Integer> {
-  @CommandLine.Spec CommandLine.Model.CommandSpec spec;
+
+  /** Spec injected by PicoCli */
+  @Spec CommandSpec spec;
 
   @Override
   public Integer call() throws Exception {
+    validateImportTarget(controlFilePath, namespace, tableName);
+    validateLogDirectory(logDirectory);
+    ControlFile controlFile = parseControlFileFromPath(controlFilePath).orElse(null);
+    ImportOptions importOptions = createImportOptions(controlFile);
+    ImportLoggerConfig config =
+        ImportLoggerConfig.builder()
+            .logDirectoryPath(logDirectory)
+            .isLogRawSourceRecordsEnabled(importOptions.isLogRawRecord())
+            .isLogSuccessRecordsEnabled(importOptions.isLogSuccessRecords())
+            .prettyPrint(prettyPrint)
+            .build();
+    LogWriterFactory logWriterFactory = createLogWriterFactory(config);
+    Map<String, TableMetadata> tableMetadataMap =
+        createTableMetadataMap(controlFile, namespace, tableName);
+    try (BufferedReader reader =
+        Files.newBufferedReader(Paths.get(sourceFilePath), Charset.defaultCharset())) {
+      ImportManager importManager =
+          createImportManager(importOptions, tableMetadataMap, reader, logWriterFactory, config);
+      importManager.startImport();
+    }
     return 0;
   }
+
+  /**
+   * Creates the LogWriterFactory object used for writing import logs.
+   *
+   * @param config import logger config
+   * @return LogWriterFactory object
+   */
+  private LogWriterFactory createLogWriterFactory(ImportLoggerConfig config) {
+    return new DefaultLogWriterFactory(config);
+  }
+
+  /**
+   * Creates a table metadata map from the provided control file or, when no control file is
+   * given, from the namespace and table name.
+   *
+   * @param controlFile control file
+   * @param namespace namespace
+   * @param tableName single table name
+   * @return {@code Map<String, TableMetadata>} a table metadata map
+   * @throws IOException if the ScalarDB config file cannot be read
+   * @throws TableMetadataException if the table metadata cannot be retrieved
+   */
+  private Map<String, TableMetadata> createTableMetadataMap(
+      ControlFile controlFile, String namespace, String tableName)
+      throws IOException, TableMetadataException {
+    File configFile = new File(configFilePath);
+    StorageFactory storageFactory = StorageFactory.create(configFile);
+    DistributedStorageAdmin storageAdmin = null;
+    try {
+      storageAdmin = storageFactory.getStorageAdmin();
+      TableMetadataService tableMetadataService = new TableMetadataService(storageAdmin);
+      Map<String, TableMetadata> tableMetadataMap = new HashMap<>();
+      if (controlFile != null) {
+        for (ControlFileTable table : controlFile.getTables()) {
+          tableMetadataMap.put(
+              TableMetadataUtil.getTableLookupKey(table.getNamespace(), table.getTable()),
+              tableMetadataService.getTableMetadata(table.getNamespace(), table.getTable()));
+        }
+      } else {
+        tableMetadataMap.put(
+            TableMetadataUtil.getTableLookupKey(namespace, tableName),
+            tableMetadataService.getTableMetadata(namespace, tableName));
+      }
+      return tableMetadataMap;
+    } finally {
+      if (storageAdmin != null) {
+        storageAdmin.close();
+      }
+    }
+  }
+
+  /**
+   * Creates an ImportManager object configured for the selected ScalarDB mode.
+   *
+   * @param importOptions import options
+   * @param tableMetadataMap table metadata map
+   * @param reader buffered reader with source data
+   * @param logWriterFactory log writer factory object
+   * @param config import logging config
+   * @return ImportManager object
+   * @throws IOException if the ScalarDB config file cannot be read
+   */
+  private ImportManager createImportManager(
+      ImportOptions importOptions,
+      Map<String, TableMetadata> tableMetadataMap,
+      BufferedReader reader,
+      LogWriterFactory logWriterFactory,
+      ImportLoggerConfig config)
+      throws IOException {
+    File configFile = new File(configFilePath);
+    ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory();
+    ImportManager importManager;
+    if (scalarDbMode == ScalarDbMode.TRANSACTION) {
+      ScalarDbTransactionManager scalarDbTransactionManager =
+          new ScalarDbTransactionManager(TransactionFactory.create(configFile));
+      importManager =
+          new ImportManager(
+              tableMetadataMap,
+              reader,
+              importOptions,
+              importProcessorFactory,
+              ScalarDbMode.TRANSACTION,
+              null,
+              scalarDbTransactionManager.getDistributedTransactionManager());
+    } else {
+      ScalarDbStorageManager scalarDbStorageManager =
+          new ScalarDbStorageManager(StorageFactory.create(configFile));
+      importManager =
+          new ImportManager(
+              tableMetadataMap,
+              reader,
+              importOptions,
+              importProcessorFactory,
+              ScalarDbMode.STORAGE,
+              scalarDbStorageManager.getDistributedStorage(),
+              null);
+    }
+    if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
+      importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory));
+    } else {
+      importManager.addListener(new SingleFileImportLogger(config, logWriterFactory));
+    }
+    return importManager;
+  }
+
+  /**
+   * Validates the import target.
+   *
+   * @param controlFilePath control file path
+   * @param namespace namespace
+   * @param tableName single table name
+   * @throws ParameterException if one of the argument values is wrong
+   */
+  private void validateImportTarget(String controlFilePath, String namespace, String tableName) {
+    // Throw an error if no clear import target was specified
+    if (StringUtils.isBlank(controlFilePath)
+        && (StringUtils.isBlank(namespace) || StringUtils.isBlank(tableName))) {
+      throw new ParameterException(
+          spec.commandLine(), CoreError.DATA_LOADER_IMPORT_TARGET_MISSING.buildMessage());
+    }
+
+    // Make sure the control file exists when a path is provided
+    if (!StringUtils.isBlank(controlFilePath)) {
+      Path path = Paths.get(controlFilePath);
+      if (!Files.exists(path)) {
+        throw new ParameterException(
+            spec.commandLine(),
+            CoreError.DATA_LOADER_MISSING_IMPORT_FILE.buildMessage(
+                controlFilePath, FILE_OPTION_NAME_LONG_FORMAT));
+      }
+    }
+  }
+
+  /**
+   * Validate log directory path
+   *
+   * @param logDirectory log directory path
+   * @throws ParameterException if the path is invalid
+   */
+  private void validateLogDirectory(String logDirectory) throws ParameterException {
+    Path logDirectoryPath;
+    if (!StringUtils.isBlank(logDirectory)) {
+      // User-provided log directory via CLI argument
+      logDirectoryPath = Paths.get(logDirectory);
+
+      if (Files.exists(logDirectoryPath)) {
+        // Check if the provided directory is writable
+        if (!Files.isWritable(logDirectoryPath)) {
+          throw new ParameterException(
+              spec.commandLine(),
+              CoreError.DATA_LOADER_LOG_DIRECTORY_CREATION_FAILED.buildMessage(
+                  logDirectoryPath.toAbsolutePath()));
+        }
+      } else {
+        // Create the log directory if it doesn't exist
+        try {
+          Files.createDirectories(logDirectoryPath);
+        } catch (IOException e) {
+          throw new ParameterException(
+              spec.commandLine(),
+              CoreError.DATA_LOADER_LOG_DIRECTORY_CREATION_FAILED.buildMessage(
+                  logDirectoryPath.toAbsolutePath()));
+        }
+      }
+      return;
+    }
+
+    // Use the current working directory as the log directory
+    logDirectoryPath = Paths.get(System.getProperty("user.dir"));
+
+    // Check if the current working directory is writable
+    if (!Files.isWritable(logDirectoryPath)) {
+      throw new ParameterException(
+          spec.commandLine(),
+          CoreError.DATA_LOADER_LOG_DIRECTORY_WRITE_ACCESS_DENIED.buildMessage(
+              logDirectoryPath.toAbsolutePath()));
+    }
+  }
+
+  /**
+   * Parses a control file object from a valid control file path.
+   *
+   * @param controlFilePath control file path
+   * @return {@code Optional<ControlFile>} parsed control file object
+   * @throws ParameterException if the control file cannot be parsed
+   */
+  private Optional<ControlFile> parseControlFileFromPath(String controlFilePath) {
+    if (StringUtils.isBlank(controlFilePath)) {
+      return Optional.empty();
+    }
+    try {
+      ObjectMapper objectMapper = new ObjectMapper();
+      ControlFile controlFile =
+          objectMapper.readValue(new File(controlFilePath), ControlFile.class);
+      return Optional.of(controlFile);
+    } catch (IOException e) {
+      throw new ParameterException(
+          spec.commandLine(),
+          CoreError.DATA_LOADER_INVALID_CONTROL_FILE.buildMessage(controlFilePath));
+    }
+  }
+
+  /**
+   * Creates the import options object from the provided CLI parameters and control file.
+   *
+   * @param controlFile control file
+   * @return ImportOptions generated import options object
+   */
+  private ImportOptions createImportOptions(ControlFile controlFile) {
+    ImportOptions.ImportOptionsBuilder builder =
+        ImportOptions.builder()
+            .fileFormat(sourceFileFormat)
+            .requireAllColumns(requireAllColumns)
+            .prettyPrint(prettyPrint)
+            .controlFile(controlFile)
+            .controlFileValidationLevel(controlFileValidation)
+            .logRawRecord(logRawRecord)
+            .logSuccessRecords(logSuccessRecords)
+            .ignoreNullValues(ignoreNullValues)
+            .namespace(namespace)
+            .dataChunkSize(dataChunkSize)
+            .transactionBatchSize(transactionSize)
+            .maxThreads(maxThreads)
+            .dataChunkQueueSize(dataChunkQueueSize)
+            .tableName(tableName);
+
+    // Import mode
+    if (importMode != null) {
+      builder.importMode(importMode);
+    }
+    if (!splitLogMode) {
+      builder.logMode(LogMode.SINGLE_FILE);
+    }
+
+    // CSV options
+    if (sourceFileFormat.equals(FileFormat.CSV)) {
+      builder.delimiter(delimiter);
+      if (!StringUtils.isBlank(customHeaderRow)) {
+        builder.customHeaderRow(customHeaderRow);
+      }
+    }
+    return builder.build();
+  }
 }
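
The diff uses ControlFile and ControlFileTable, but their definitions live elsewhere in the data loader core. Judging only from the getTables(), getNamespace(), and getTable() accessors used in createTableMetadataMap above, a control file is presumably a JSON document listing table-to-namespace mappings. The sketch below parses such a document the same way parseControlFileFromPath does; the field names in the sample JSON are guesses, not the actual schema.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;

public class ControlFileShapeSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical control file content, inferred from the accessors used in
    // createTableMetadataMap(); the real schema may name fields differently.
    String json = "{\"tables\": [{\"namespace\": \"ns\", \"table\": \"employee\"}]}";
    ControlFile controlFile = new ObjectMapper().readValue(json, ControlFile.class);
    for (ControlFileTable table : controlFile.getTables()) {
      System.out.println(table.getNamespace() + "." + table.getTable());
    }
  }
}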
