Skip to content

Commit e32fd44

Browse files
committed
Initial core and cli changes
1 parent b3414fb commit e32fd44

File tree

13 files changed

+569
-28
lines changed

13 files changed

+569
-28
lines changed

data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static java.nio.file.StandardOpenOption.CREATE;
55

66
import com.scalar.db.api.DistributedStorage;
7+
import com.scalar.db.api.DistributedTransactionManager;
78
import com.scalar.db.api.TableMetadata;
89
import com.scalar.db.common.error.CoreError;
910
import com.scalar.db.dataloader.cli.exception.DirectoryValidationException;
@@ -12,6 +13,7 @@
1213
import com.scalar.db.dataloader.cli.util.InvalidFilePathException;
1314
import com.scalar.db.dataloader.core.ColumnKeyValue;
1415
import com.scalar.db.dataloader.core.FileFormat;
16+
import com.scalar.db.dataloader.core.ScalarDbMode;
1517
import com.scalar.db.dataloader.core.ScanRange;
1618
import com.scalar.db.dataloader.core.dataexport.CsvExportManager;
1719
import com.scalar.db.dataloader.core.dataexport.ExportManager;
@@ -27,7 +29,9 @@
2729
import com.scalar.db.dataloader.core.util.KeyUtils;
2830
import com.scalar.db.io.Key;
2931
import com.scalar.db.service.StorageFactory;
32+
import com.scalar.db.service.TransactionFactory;
3033
import java.io.BufferedWriter;
34+
import java.io.IOException;
3135
import java.nio.charset.Charset;
3236
import java.nio.file.Files;
3337
import java.nio.file.Paths;
@@ -56,15 +60,14 @@ public Integer call() throws Exception {
5660
try {
5761
validateOutputDirectory();
5862
FileUtils.validateFilePath(scalarDbPropertiesFilePath);
59-
60-
StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
61-
TableMetadataService metaDataService =
62-
new TableMetadataService(storageFactory.getStorageAdmin());
63+
TableMetadataService tableMetadataService =
64+
createTableMetadataService(scalarDbMode, scalarDbPropertiesFilePath);
6365
ScalarDbDao scalarDbDao = new ScalarDbDao();
6466

65-
ExportManager exportManager = createExportManager(storageFactory, scalarDbDao, outputFormat);
67+
ExportManager exportManager =
68+
createExportManager(scalarDbMode, scalarDbDao, outputFormat, scalarDbPropertiesFilePath);
6669

67-
TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table);
70+
TableMetadata tableMetadata = tableMetadataService.getTableMetadata(namespace, table);
6871

6972
Key partitionKey =
7073
partitionKeyValue != null ? getKeysFromList(partitionKeyValue, tableMetadata) : null;
@@ -122,11 +125,57 @@ private void validateOutputDirectory() throws DirectoryValidationException {
122125
}
123126
}
124127

128+
private TableMetadataService createTableMetadataService(
129+
ScalarDbMode scalarDbMode, String scalarDbPropertiesFilePath) throws IOException {
130+
if (scalarDbMode.equals(ScalarDbMode.TRANSACTION)) {
131+
TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath);
132+
return new TableMetadataService(transactionFactory.getTransactionAdmin());
133+
}
134+
StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
135+
return new TableMetadataService(storageFactory.getStorageAdmin());
136+
}
137+
125138
private ExportManager createExportManager(
126-
StorageFactory storageFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
139+
ScalarDbMode scalarDbMode,
140+
ScalarDbDao scalarDbDao,
141+
FileFormat fileFormat,
142+
String scalarDbPropertiesFilePath)
143+
throws IOException {
127144
ProducerTaskFactory taskFactory =
128145
new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson);
129-
DistributedStorage storage = storageFactory.getStorage();
146+
if (scalarDbMode.equals(ScalarDbMode.TRANSACTION)) {
147+
DistributedStorage storage = StorageFactory.create(scalarDbPropertiesFilePath).getStorage();
148+
return createExportManagerWithStorage(storage, scalarDbDao, fileFormat, taskFactory);
149+
} else {
150+
DistributedTransactionManager distributedTransactionManager =
151+
TransactionFactory.create(scalarDbPropertiesFilePath).getTransactionManager();
152+
return createExportManagerWithTransaction(
153+
distributedTransactionManager, scalarDbDao, fileFormat, taskFactory);
154+
}
155+
}
156+
157+
private ExportManager createExportManagerWithTransaction(
158+
DistributedTransactionManager distributedTransactionManager,
159+
ScalarDbDao scalarDbDao,
160+
FileFormat fileFormat,
161+
ProducerTaskFactory taskFactory) {
162+
switch (fileFormat) {
163+
case JSON:
164+
return new JsonExportManager(distributedTransactionManager, scalarDbDao, taskFactory);
165+
case JSONL:
166+
return new JsonLineExportManager(distributedTransactionManager, scalarDbDao, taskFactory);
167+
case CSV:
168+
return new CsvExportManager(distributedTransactionManager, scalarDbDao, taskFactory);
169+
default:
170+
throw new AssertionError("Invalid file format" + fileFormat);
171+
}
172+
}
173+
174+
private ExportManager createExportManagerWithStorage(
175+
DistributedStorage storage,
176+
ScalarDbDao scalarDbDao,
177+
FileFormat fileFormat,
178+
ProducerTaskFactory taskFactory) {
130179
switch (fileFormat) {
131180
case JSON:
132181
return new JsonExportManager(storage, scalarDbDao, taskFactory);

data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.scalar.db.api.Scan;
44
import com.scalar.db.dataloader.core.ColumnKeyValue;
55
import com.scalar.db.dataloader.core.FileFormat;
6+
import com.scalar.db.dataloader.core.ScalarDbMode;
67
import java.util.ArrayList;
78
import java.util.List;
89
import picocli.CommandLine;
@@ -144,4 +145,11 @@ public class ExportCommandOptions {
144145
description = "Size of the data chunk to process in a single task (default: 200)",
145146
defaultValue = "200")
146147
protected int dataChunkSize;
148+
149+
@CommandLine.Option(
150+
names = {"--mode", "-sm"},
151+
description = "ScalarDB mode (STORAGE, TRANSACTION) (default: STORAGE)",
152+
paramLabel = "<MODE>",
153+
defaultValue = "STORAGE")
154+
protected ScalarDbMode scalarDbMode;
147155
}

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

33
import com.scalar.db.api.DistributedStorage;
4+
import com.scalar.db.api.DistributedTransactionManager;
45
import com.scalar.db.api.TableMetadata;
56
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
67
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
@@ -13,8 +14,17 @@
1314

1415
public class CsvExportManager extends ExportManager {
1516
public CsvExportManager(
16-
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
17-
super(storage, dao, producerTaskFactory);
17+
DistributedStorage distributedStorage,
18+
ScalarDbDao dao,
19+
ProducerTaskFactory producerTaskFactory) {
20+
super(distributedStorage, dao, producerTaskFactory);
21+
}
22+
23+
public CsvExportManager(
24+
DistributedTransactionManager distributedTransactionManager,
25+
ScalarDbDao dao,
26+
ProducerTaskFactory producerTaskFactory) {
27+
super(distributedTransactionManager, dao, producerTaskFactory);
1828
}
1929

2030
/**

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

33
import com.scalar.db.api.DistributedStorage;
4+
import com.scalar.db.api.DistributedTransactionManager;
45
import com.scalar.db.api.Result;
56
import com.scalar.db.api.Scanner;
67
import com.scalar.db.api.TableMetadata;
78
import com.scalar.db.dataloader.core.FileFormat;
9+
import com.scalar.db.dataloader.core.ScalarDbMode;
810
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask;
911
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
1012
import com.scalar.db.dataloader.core.dataexport.validation.ExportOptionsValidationException;
@@ -32,11 +34,31 @@
3234
public abstract class ExportManager {
3335
private static final Logger logger = LoggerFactory.getLogger(ExportManager.class);
3436

35-
private final DistributedStorage storage;
37+
private final DistributedStorage distributedStorage;
38+
private final DistributedTransactionManager distributedTransactionManager;
3639
private final ScalarDbDao dao;
3740
private final ProducerTaskFactory producerTaskFactory;
3841
private final Object lock = new Object();
3942

43+
public ExportManager(
44+
DistributedStorage distributedStorage,
45+
ScalarDbDao dao,
46+
ProducerTaskFactory producerTaskFactory) {
47+
this.distributedStorage = distributedStorage;
48+
this.distributedTransactionManager = null;
49+
this.dao = dao;
50+
this.producerTaskFactory = producerTaskFactory;
51+
}
52+
53+
public ExportManager(
54+
DistributedTransactionManager distributedTransactionManager,
55+
ScalarDbDao dao,
56+
ProducerTaskFactory producerTaskFactory) {
57+
this.distributedStorage = null;
58+
this.distributedTransactionManager = distributedTransactionManager;
59+
this.dao = dao;
60+
this.producerTaskFactory = producerTaskFactory;
61+
}
4062
/**
4163
* Create and add header part for the export file
4264
*
@@ -83,8 +105,8 @@ public ExportReport startExport(
83105
BufferedWriter bufferedWriter = new BufferedWriter(writer);
84106
boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON;
85107

86-
try (Scanner scanner = createScanner(exportOptions, dao, storage)) {
87-
108+
try (Scanner scanner =
109+
createScanner(exportOptions, dao, distributedStorage, distributedTransactionManager)) {
88110
Iterator<Result> iterator = scanner.iterator();
89111
AtomicBoolean isFirstBatch = new AtomicBoolean(true);
90112

@@ -118,6 +140,7 @@ public ExportReport startExport(
118140
} catch (ExportOptionsValidationException | IOException | ScalarDbDaoException e) {
119141
logger.error("Error during export: {}", e.getMessage());
120142
}
143+
closeResources();
121144
return exportReport;
122145
}
123146

@@ -208,6 +231,18 @@ private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadat
208231
}
209232
}
210233

234+
private Scanner createScanner(
235+
ExportOptions exportOptions,
236+
ScalarDbDao dao,
237+
DistributedStorage storage,
238+
DistributedTransactionManager transactionManager)
239+
throws ScalarDbDaoException {
240+
if (exportOptions.getScalarDbMode().equals(ScalarDbMode.TRANSACTION)) {
241+
return createScannerWithTransaction(exportOptions, dao, transactionManager);
242+
}
243+
return createScannerWithStorage(exportOptions, dao, storage);
244+
}
245+
211246
/**
212247
* To create a scanner object
213248
*
@@ -217,7 +252,7 @@ private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadat
217252
* @return created scanner
218253
* @throws ScalarDbDaoException throws if any issue occurs in creating scanner object
219254
*/
220-
private Scanner createScanner(
255+
private Scanner createScannerWithStorage(
221256
ExportOptions exportOptions, ScalarDbDao dao, DistributedStorage storage)
222257
throws ScalarDbDaoException {
223258
boolean isScanAll = exportOptions.getScanPartitionKey() == null;
@@ -240,4 +275,43 @@ private Scanner createScanner(
240275
storage);
241276
}
242277
}
278+
279+
private Scanner createScannerWithTransaction(
280+
ExportOptions exportOptions,
281+
ScalarDbDao dao,
282+
DistributedTransactionManager distributedTransactionManager)
283+
throws ScalarDbDaoException {
284+
boolean isScanAll = exportOptions.getScanPartitionKey() == null;
285+
if (isScanAll) {
286+
return dao.createScanner(
287+
exportOptions.getNamespace(),
288+
exportOptions.getTableName(),
289+
exportOptions.getProjectionColumns(),
290+
exportOptions.getLimit(),
291+
distributedTransactionManager);
292+
} else {
293+
return dao.createScanner(
294+
exportOptions.getNamespace(),
295+
exportOptions.getTableName(),
296+
exportOptions.getScanPartitionKey(),
297+
exportOptions.getScanRange(),
298+
exportOptions.getSortOrders(),
299+
exportOptions.getProjectionColumns(),
300+
exportOptions.getLimit(),
301+
distributedTransactionManager);
302+
}
303+
}
304+
305+
/** Close resources properly once the process is completed */
306+
public void closeResources() {
307+
try {
308+
if (distributedStorage != null) {
309+
distributedStorage.close();
310+
} else if (distributedTransactionManager != null) {
311+
distributedTransactionManager.close();
312+
}
313+
} catch (Throwable e) {
314+
throw new RuntimeException("Failed to close the resource", e);
315+
}
316+
}
243317
}

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.scalar.db.api.Scan;
44
import com.scalar.db.dataloader.core.FileFormat;
5+
import com.scalar.db.dataloader.core.ScalarDbMode;
56
import com.scalar.db.dataloader.core.ScanRange;
67
import com.scalar.db.io.Key;
78
import java.util.Collections;
@@ -30,6 +31,7 @@ public class ExportOptions {
3031
@Builder.Default private final boolean includeTransactionMetadata = false;
3132
@Builder.Default private List<String> projectionColumns = Collections.emptyList();
3233
private List<Scan.Ordering> sortOrders;
34+
@Builder.Default private final ScalarDbMode scalarDbMode = ScalarDbMode.STORAGE;
3335

3436
public static ExportOptionsBuilder builder(
3537
String namespace, String tableName, Key scanPartitionKey, FileFormat outputFileFormat) {

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

33
import com.scalar.db.api.DistributedStorage;
4+
import com.scalar.db.api.DistributedTransactionManager;
45
import com.scalar.db.api.TableMetadata;
56
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
67
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
@@ -9,8 +10,17 @@
910

1011
public class JsonExportManager extends ExportManager {
1112
public JsonExportManager(
12-
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
13-
super(storage, dao, producerTaskFactory);
13+
DistributedStorage distributedStorage,
14+
ScalarDbDao dao,
15+
ProducerTaskFactory producerTaskFactory) {
16+
super(distributedStorage, dao, producerTaskFactory);
17+
}
18+
19+
public JsonExportManager(
20+
DistributedTransactionManager distributedTransactionManager,
21+
ScalarDbDao dao,
22+
ProducerTaskFactory producerTaskFactory) {
23+
super(distributedTransactionManager, dao, producerTaskFactory);
1424
}
1525

1626
/**

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

33
import com.scalar.db.api.DistributedStorage;
4+
import com.scalar.db.api.DistributedTransactionManager;
45
import com.scalar.db.api.TableMetadata;
56
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
67
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
@@ -9,8 +10,17 @@
910

1011
public class JsonLineExportManager extends ExportManager {
1112
public JsonLineExportManager(
12-
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
13-
super(storage, dao, producerTaskFactory);
13+
DistributedStorage distributedStorage,
14+
ScalarDbDao dao,
15+
ProducerTaskFactory producerTaskFactory) {
16+
super(distributedStorage, dao, producerTaskFactory);
17+
}
18+
19+
public JsonLineExportManager(
20+
DistributedTransactionManager distributedTransactionManager,
21+
ScalarDbDao dao,
22+
ProducerTaskFactory producerTaskFactory) {
23+
super(distributedTransactionManager, dao, producerTaskFactory);
1424
}
1525

1626
/**

0 commit comments

Comments
 (0)