Skip to content

Commit 32b4b70

Browse files
committed
Initial changes
1 parent 627fe69 commit 32b4b70

File tree

9 files changed

+139
-97
lines changed

9 files changed

+139
-97
lines changed

data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import static java.nio.file.StandardOpenOption.APPEND;
66
import static java.nio.file.StandardOpenOption.CREATE;
77

8-
import com.scalar.db.api.DistributedStorage;
98
import com.scalar.db.api.DistributedTransactionManager;
109
import com.scalar.db.api.TableMetadata;
1110
import com.scalar.db.dataloader.cli.exception.DirectoryValidationException;
@@ -30,15 +29,14 @@
3029
import com.scalar.db.dataloader.core.util.KeyUtils;
3130
import com.scalar.db.io.Key;
3231
import com.scalar.db.service.StorageFactory;
32+
import com.scalar.db.service.TransactionFactory;
3333
import java.io.BufferedWriter;
3434
import java.nio.charset.Charset;
3535
import java.nio.file.Files;
3636
import java.nio.file.Paths;
3737
import java.util.List;
3838
import java.util.Objects;
3939
import java.util.concurrent.Callable;
40-
41-
import com.scalar.db.service.TransactionFactory;
4240
import org.apache.commons.lang3.StringUtils;
4341
import org.slf4j.Logger;
4442
import org.slf4j.LoggerFactory;
@@ -67,12 +65,13 @@ public Integer call() throws Exception {
6765
validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS);
6866

6967
StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
70-
TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath)
68+
TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath);
7169
TableMetadataService metaDataService =
7270
new TableMetadataService(storageFactory.getStorageAdmin());
7371
ScalarDbDao scalarDbDao = new ScalarDbDao();
7472

75-
ExportManager exportManager = createExportManager(storageFactory, scalarDbDao, outputFormat);
73+
ExportManager exportManager =
74+
createExportManager(transactionFactory, scalarDbDao, outputFormat);
7675

7776
TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table);
7877

@@ -153,14 +152,14 @@ private ExportManager createExportManager(
153152
TransactionFactory transactionFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
154153
ProducerTaskFactory taskFactory =
155154
new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson);
156-
DistributedTransactionManager manager = transactionFactory.getTransactionManager();
155+
DistributedTransactionManager manager = transactionFactory.getTransactionManager();
157156
switch (fileFormat) {
158157
case JSON:
159-
return new JsonExportManager(storage, scalarDbDao, taskFactory);
158+
return new JsonExportManager(manager, scalarDbDao, taskFactory);
160159
case JSONL:
161-
return new JsonLineExportManager(storage, scalarDbDao, taskFactory);
160+
return new JsonLineExportManager(manager, scalarDbDao, taskFactory);
162161
case CSV:
163-
return new CsvExportManager(storage, scalarDbDao, taskFactory);
162+
return new CsvExportManager(manager, scalarDbDao, taskFactory);
164163
default:
165164
throw new AssertionError("Invalid file format" + fileFormat);
166165
}

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

3-
import com.scalar.db.api.DistributedStorage;
43
import com.scalar.db.api.DistributedTransactionManager;
54
import com.scalar.db.api.TableMetadata;
65
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
@@ -16,15 +15,22 @@
1615
public class CsvExportManager extends ExportManager {
1716

1817
/**
19-
* Constructs a {@code CsvExportManager} with the specified {@link DistributedStorage}, {@link
20-
* ScalarDbDao}, and {@link ProducerTaskFactory}.
18+
* Constructs a {@code CsvExportManager} for exporting data using a {@link
19+
* DistributedTransactionManager}.
2120
*
22-
* @param manager the {@code DistributedTransactionManager} instance used to read data from the database
23-
* @param dao the {@code ScalarDbDao} used to execute export-related database operations
24-
* @param producerTaskFactory the factory used to create producer tasks for exporting data
21+
* <p>This constructor is used when exporting data in transactional mode, allowing data to be read
22+
* from ScalarDB within a distributed transaction context.
23+
*
24+
* @param manager the {@link DistributedTransactionManager} used to read data in transactional
25+
* mode
26+
* @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
27+
* @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
28+
* exporting data
2529
*/
2630
public CsvExportManager(
27-
DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
31+
DistributedTransactionManager manager,
32+
ScalarDbDao dao,
33+
ProducerTaskFactory producerTaskFactory) {
2834
super(manager, dao, producerTaskFactory);
2935
}
3036

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

3-
import com.scalar.db.api.DistributedStorage;
43
import com.scalar.db.api.DistributedTransactionManager;
54
import com.scalar.db.api.Result;
65
import com.scalar.db.api.Scanner;
76
import com.scalar.db.api.TableMetadata;
7+
import com.scalar.db.api.TransactionManagerCrudOperable;
88
import com.scalar.db.dataloader.core.FileFormat;
99
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask;
1010
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
@@ -87,7 +87,7 @@ public ExportReport startExport(
8787
BufferedWriter bufferedWriter = new BufferedWriter(writer);
8888
boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON;
8989

90-
try (Scanner scanner = createScanner(exportOptions, dao, storage)) {
90+
try (Scanner scanner = createScanner(exportOptions, dao, distributedTransactionManager)) {
9191

9292
Iterator<Result> iterator = scanner.iterator();
9393
AtomicBoolean isFirstBatch = new AtomicBoolean(true);
@@ -217,12 +217,12 @@ private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadat
217217
*
218218
* @param exportOptions export options
219219
* @param dao ScalarDB dao object
220-
* @param storage distributed storage object
220+
* @param manager DistributedTransactionManager object
221221
* @return created scanner
222222
* @throws ScalarDbDaoException throws if any issue occurs in creating scanner object
223223
*/
224-
private Scanner createScanner(
225-
ExportOptions exportOptions, ScalarDbDao dao, DistributedStorage storage)
224+
private TransactionManagerCrudOperable.Scanner createScanner(
225+
ExportOptions exportOptions, ScalarDbDao dao, DistributedTransactionManager manager)
226226
throws ScalarDbDaoException {
227227
boolean isScanAll = exportOptions.getScanPartitionKey() == null;
228228
if (isScanAll) {
@@ -231,7 +231,7 @@ private Scanner createScanner(
231231
exportOptions.getTableName(),
232232
exportOptions.getProjectionColumns(),
233233
exportOptions.getLimit(),
234-
storage);
234+
manager);
235235
} else {
236236
return dao.createScanner(
237237
exportOptions.getNamespace(),
@@ -241,7 +241,7 @@ private Scanner createScanner(
241241
exportOptions.getSortOrders(),
242242
exportOptions.getProjectionColumns(),
243243
exportOptions.getLimit(),
244-
storage);
244+
manager);
245245
}
246246
}
247247
}

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

3-
import com.scalar.db.api.DistributedStorage;
43
import com.scalar.db.api.DistributedTransactionManager;
54
import com.scalar.db.api.TableMetadata;
65
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
@@ -12,15 +11,22 @@
1211
public class JsonExportManager extends ExportManager {
1312

1413
/**
15-
* Constructs a {@code JsonExportManager} with the specified {@link DistributedTransactionManager}, {@link
16-
* ScalarDbDao}, and {@link ProducerTaskFactory}.
14+
* Constructs a {@code JsonExportManager} for exporting data using a {@link
15+
* DistributedTransactionManager}.
1716
*
18-
* @param manager the {@code DistributedTransactionManager} instance used to read data from the database
19-
* @param dao the {@code ScalarDbDao} used to execute export-related database operations
20-
* @param producerTaskFactory the factory used to create producer tasks for exporting data
17+
* <p>This constructor is used when exporting data in transactional mode, allowing data to be read
18+
* from ScalarDB within a distributed transaction context and exported in JSON format.
19+
*
20+
* @param manager the {@link DistributedTransactionManager} used to read data in transactional
21+
* mode
22+
* @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
23+
* @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
24+
* exporting data
2125
*/
2226
public JsonExportManager(
23-
DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
27+
DistributedTransactionManager manager,
28+
ScalarDbDao dao,
29+
ProducerTaskFactory producerTaskFactory) {
2430
super(manager, dao, producerTaskFactory);
2531
}
2632

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

3-
import com.scalar.db.api.DistributedStorage;
43
import com.scalar.db.api.DistributedTransactionManager;
54
import com.scalar.db.api.TableMetadata;
65
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
@@ -14,15 +13,22 @@
1413
public class JsonLineExportManager extends ExportManager {
1514

1615
/**
17-
* Constructs a {@code JsonLineExportManager} with the specified {@link DistributedTransactionManager},
18-
* {@link ScalarDbDao}, and {@link ProducerTaskFactory}.
16+
* Constructs a {@code JsonLineExportManager} for exporting data using a {@link
17+
* DistributedTransactionManager}.
1918
*
20-
* @param manager the {@code DistributedTransactionManager} instance used to read data from the database
21-
* @param dao the {@code ScalarDbDao} used to execute export-related database operations
22-
* @param producerTaskFactory the factory used to create producer tasks for exporting data
19+
* <p>This constructor is used when exporting data in transactional mode, allowing data to be read
20+
* from ScalarDB within a distributed transaction context and exported in JSON Lines format.
21+
*
22+
* @param manager the {@link DistributedTransactionManager} used to read data in transactional
23+
* mode
24+
* @param dao the {@link ScalarDbDao} used to interact with ScalarDB for export operations
25+
* @param producerTaskFactory the {@link ProducerTaskFactory} used to create producer tasks for
26+
* exporting data
2327
*/
2428
public JsonLineExportManager(
25-
DistributedTransactionManager manager, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
29+
DistributedTransactionManager manager,
30+
ScalarDbDao dao,
31+
ProducerTaskFactory producerTaskFactory) {
2632
super(manager, dao, producerTaskFactory);
2733
}
2834

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.scalar.db.api.DistributedStorage;
44
import com.scalar.db.api.DistributedTransaction;
5+
import com.scalar.db.api.DistributedTransactionManager;
56
import com.scalar.db.api.Get;
67
import com.scalar.db.api.GetBuilder;
78
import com.scalar.db.api.Put;
@@ -10,6 +11,7 @@
1011
import com.scalar.db.api.Scan;
1112
import com.scalar.db.api.ScanBuilder;
1213
import com.scalar.db.api.Scanner;
14+
import com.scalar.db.api.TransactionManagerCrudOperable;
1315
import com.scalar.db.dataloader.core.DataLoaderError;
1416
import com.scalar.db.dataloader.core.ScanRange;
1517
import com.scalar.db.exception.storage.ExecutionException;
@@ -219,61 +221,77 @@ public List<Result> scan(
219221
}
220222

221223
/**
222-
* Create a ScalarDB scanner instance
224+
* Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data from
225+
* ScalarDB.
223226
*
224-
* @param namespace ScalarDB namespace
225-
* @param table ScalarDB table name
226-
* @param projectionColumns List of column projection to use during scan
227-
* @param limit Scan limit value
228-
* @param storage Distributed storage for ScalarDB connection that is running in storage mode
229-
* @return ScalarDB Scanner object
230-
* @throws ScalarDbDaoException if scan fails
227+
* <p>This method builds and executes a {@link Scan} operation for the specified table and returns
228+
* a scanner that iterates over the retrieved records. It operates in storage mode using a {@link
229+
* DistributedTransactionManager}.
230+
*
231+
* @param namespace the ScalarDB namespace to scan
232+
* @param table the ScalarDB table name
233+
* @param projectionColumns the list of column names to include in the scan results
234+
* @param limit the maximum number of records to retrieve
235+
* @param manager the {@link DistributedTransactionManager} used to obtain the ScalarDB scanner in
236+
* storage mode
237+
* @return a {@link TransactionManagerCrudOperable.Scanner} instance for iterating over the scan
238+
* results
239+
* @throws ScalarDbDaoException if an error occurs while creating or executing the scan
231240
*/
232-
public Scanner createScanner(
241+
public TransactionManagerCrudOperable.Scanner createScanner(
233242
String namespace,
234243
String table,
235244
List<String> projectionColumns,
236245
int limit,
237-
DistributedStorage storage)
246+
DistributedTransactionManager manager)
238247
throws ScalarDbDaoException {
239248
Scan scan =
240249
createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit);
241250
try {
242-
return storage.scan(scan);
243-
} catch (ExecutionException e) {
251+
return manager.getScanner(scan);
252+
} catch (CrudException e) {
244253
throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e);
245254
}
246255
}
247256

248257
/**
249-
* Create a ScalarDB scanner instance
258+
* Creates a {@link TransactionManagerCrudOperable.Scanner} instance for reading data from
259+
* ScalarDB.
250260
*
251-
* @param namespace ScalarDB namespace
252-
* @param table ScalarDB table name
253-
* @param partitionKey Partition key used in ScalarDB scan
254-
* @param scanRange Optional range to set ScalarDB scan start and end values
255-
* @param sortOrders Optional scan clustering key sorting values
256-
* @param projectionColumns List of column projection to use during scan
257-
* @param limit Scan limit value
258-
* @param storage Distributed storage for ScalarDB connection that is running in storage mode
259-
* @return ScalarDB Scanner object
260-
* @throws ScalarDbDaoException if scan fails
261+
* <p>This method builds and executes a {@link Scan} operation using the provided parameters and
262+
* returns a scanner that iterates over the matching records. It is used in storage mode through a
263+
* {@link DistributedTransactionManager}.
264+
*
265+
* @param namespace the ScalarDB namespace to scan
266+
* @param table the ScalarDB table name
267+
* @param partitionKey the optional {@link Key} representing the partition key for the scan
268+
* @param scanRange the optional {@link ScanRange} defining the start and end boundaries for the
269+
* scan
270+
* @param sortOrders the optional list of {@link Scan.Ordering} objects defining the clustering
271+
* key sort order
272+
* @param projectionColumns the optional list of column names to include in the scan results
273+
* @param limit the maximum number of records to retrieve
274+
* @param manager the {@link DistributedTransactionManager} used to obtain the ScalarDB scanner in
275+
* storage mode
276+
* @return a {@link TransactionManagerCrudOperable.Scanner} instance for iterating over scan
277+
* results
278+
* @throws ScalarDbDaoException if an error occurs while creating or executing the scan
261279
*/
262-
public Scanner createScanner(
280+
public TransactionManagerCrudOperable.Scanner createScanner(
263281
String namespace,
264282
String table,
265283
@Nullable Key partitionKey,
266284
@Nullable ScanRange scanRange,
267285
@Nullable List<Scan.Ordering> sortOrders,
268286
@Nullable List<String> projectionColumns,
269287
int limit,
270-
DistributedStorage storage)
288+
DistributedTransactionManager manager)
271289
throws ScalarDbDaoException {
272290
Scan scan =
273291
createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit);
274292
try {
275-
return storage.scan(scan);
276-
} catch (ExecutionException e) {
293+
return manager.getScanner(scan);
294+
} catch (CrudException e) {
277295
throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e);
278296
}
279297
}

0 commit comments

Comments
 (0)