Skip to content

Commit f4b0571

Browse files
committed
Changes
1 parent eb62e60 commit f4b0571

File tree

7 files changed

+162
-87
lines changed

7 files changed

+162
-87
lines changed

data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,16 @@ private TableMetadataService createTableMetadataService(
135135
return new TableMetadataService(storageFactory.getStorageAdmin());
136136
}
137137

138+
/**
139+
* Creates an {@link ExportManager} instance based on ScalarDB mode and file format.
140+
*
141+
* @param scalarDbMode The ScalarDB mode (TRANSACTION or STORAGE).
142+
* @param scalarDbDao The DAO for accessing ScalarDB.
143+
* @param fileFormat The output file format (CSV, JSON, JSONL).
144+
* @param scalarDbPropertiesFilePath Path to the ScalarDB properties file.
145+
* @return A configured {@link ExportManager}.
146+
* @throws IOException If there is an error reading the properties file.
147+
*/
138148
private ExportManager createExportManager(
139149
ScalarDbMode scalarDbMode,
140150
ScalarDbDao scalarDbDao,
@@ -154,6 +164,15 @@ private ExportManager createExportManager(
154164
}
155165
}
156166

167+
/**
168+
* Returns an {@link ExportManager} that uses {@link DistributedTransactionManager}.
169+
*
170+
* @param distributedTransactionManager distributed transaction manager object
171+
* @param scalarDbDao The DAO for accessing ScalarDB.
172+
* @param fileFormat The output file format (CSV, JSON, JSONL).
173+
* @param taskFactory Producer task factory object
174+
* @return A configured {@link ExportManager}.
175+
*/
157176
private ExportManager createExportManagerWithTransaction(
158177
DistributedTransactionManager distributedTransactionManager,
159178
ScalarDbDao scalarDbDao,
@@ -171,6 +190,15 @@ private ExportManager createExportManagerWithTransaction(
171190
}
172191
}
173192

193+
/**
194+
* Returns an {@link ExportManager} that uses {@link DistributedStorage}.
195+
*
196+
* @param storage distributed storage object
197+
* @param scalarDbDao The DAO for accessing ScalarDB.
198+
* @param fileFormat The output file format (CSV, JSON, JSONL).
199+
* @param taskFactory Producer task factory object
200+
* @return A configured {@link ExportManager}.
201+
*/
174202
private ExportManager createExportManagerWithStorage(
175203
DistributedStorage storage,
176204
ScalarDbDao scalarDbDao,

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java

Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

33
import com.scalar.db.api.DistributedStorage;
4+
import com.scalar.db.api.DistributedTransaction;
45
import com.scalar.db.api.DistributedTransactionManager;
56
import com.scalar.db.api.Result;
67
import com.scalar.db.api.Scanner;
@@ -14,6 +15,7 @@
1415
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
1516
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException;
1617
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
18+
import com.scalar.db.exception.transaction.TransactionException;
1719
import com.scalar.db.io.DataType;
1820
import java.io.BufferedWriter;
1921
import java.io.IOException;
@@ -132,7 +134,7 @@ public ExportReport startExport(
132134
// TODO: handle this
133135
}
134136
processFooter(exportOptions, tableMetadata, bufferedWriter);
135-
} catch (InterruptedException | IOException e) {
137+
} catch (InterruptedException | IOException | TransactionException e) {
136138
logger.error("Error during export: {}", e.getMessage());
137139
} finally {
138140
bufferedWriter.flush();
@@ -236,7 +238,7 @@ private Scanner createScanner(
236238
ScalarDbDao dao,
237239
DistributedStorage storage,
238240
DistributedTransactionManager transactionManager)
239-
throws ScalarDbDaoException {
241+
throws ScalarDbDaoException, TransactionException {
240242
if (exportOptions.getScalarDbMode().equals(ScalarDbMode.TRANSACTION)) {
241243
return createScannerWithTransaction(exportOptions, dao, transactionManager);
242244
}
@@ -276,29 +278,63 @@ private Scanner createScannerWithStorage(
276278
}
277279
}
278280

281+
/**
282+
* Creates a {@link Scanner} using a {@link DistributedTransaction} based on the provided export
283+
* options. This method initiates a read-only transaction to ensure a consistent snapshot of the
284+
* data during scan.
285+
*
286+
* @param exportOptions Options specifying how to scan the table (e.g., partition key, range,
287+
* projection).
288+
* @param dao The ScalarDb data access object to create the scanner.
289+
* @param distributedTransactionManager The transaction manager used to start a new transaction.
290+
* @return A {@link Scanner} for reading data from the specified table.
291+
* @throws ScalarDbDaoException If an error occurs while creating the scanner.
292+
* @throws TransactionException If an error occurs during transaction management (start or
293+
* commit).
294+
*/
279295
private Scanner createScannerWithTransaction(
280296
ExportOptions exportOptions,
281297
ScalarDbDao dao,
282298
DistributedTransactionManager distributedTransactionManager)
283-
throws ScalarDbDaoException {
299+
throws ScalarDbDaoException, TransactionException {
300+
284301
boolean isScanAll = exportOptions.getScanPartitionKey() == null;
285-
if (isScanAll) {
286-
return dao.createScanner(
287-
exportOptions.getNamespace(),
288-
exportOptions.getTableName(),
289-
exportOptions.getProjectionColumns(),
290-
exportOptions.getLimit(),
291-
distributedTransactionManager);
292-
} else {
293-
return dao.createScanner(
294-
exportOptions.getNamespace(),
295-
exportOptions.getTableName(),
296-
exportOptions.getScanPartitionKey(),
297-
exportOptions.getScanRange(),
298-
exportOptions.getSortOrders(),
299-
exportOptions.getProjectionColumns(),
300-
exportOptions.getLimit(),
301-
distributedTransactionManager);
302+
DistributedTransaction transaction = distributedTransactionManager.start();
303+
304+
try {
305+
Scanner scanner;
306+
if (isScanAll) {
307+
scanner =
308+
dao.createScanner(
309+
exportOptions.getNamespace(),
310+
exportOptions.getTableName(),
311+
exportOptions.getProjectionColumns(),
312+
exportOptions.getLimit(),
313+
transaction);
314+
} else {
315+
scanner =
316+
dao.createScanner(
317+
exportOptions.getNamespace(),
318+
exportOptions.getTableName(),
319+
exportOptions.getScanPartitionKey(),
320+
exportOptions.getScanRange(),
321+
exportOptions.getSortOrders(),
322+
exportOptions.getProjectionColumns(),
323+
exportOptions.getLimit(),
324+
transaction);
325+
}
326+
327+
transaction.commit();
328+
return scanner;
329+
330+
} catch (Exception e) {
331+
try {
332+
transaction.abort();
333+
} catch (TransactionException abortException) {
334+
logger.error(
335+
"Failed to abort transaction: {}", abortException.getMessage(), abortException);
336+
}
337+
throw e;
302338
}
303339
}
304340

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.scalar.db.api.DistributedStorage;
44
import com.scalar.db.api.DistributedTransaction;
5-
import com.scalar.db.api.DistributedTransactionManager;
65
import com.scalar.db.api.Get;
76
import com.scalar.db.api.GetBuilder;
87
import com.scalar.db.api.Put;
@@ -256,7 +255,7 @@ public Scanner createScanner(
256255
* @param table ScalarDB table name
257256
* @param projectionColumns List of column projection to use during scan
258257
* @param limit Scan limit value
259-
* @param distributedTransactionManager Distributed transaction manager object
258+
* @param transaction Distributed transaction object
260259
* @return ScalarDB Scanner object
261260
* @throws ScalarDbDaoException if scan fails
262261
*/
@@ -265,12 +264,12 @@ public Scanner createScanner(
265264
String table,
266265
List<String> projectionColumns,
267266
int limit,
268-
DistributedTransactionManager distributedTransactionManager)
267+
DistributedTransaction transaction)
269268
throws ScalarDbDaoException {
270269
Scan scan =
271270
createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit);
272271
try {
273-
return (Scanner) distributedTransactionManager.getScanner(scan);
272+
return (Scanner) transaction.getScanner(scan);
274273
} catch (CrudException e) {
275274
throw new ScalarDbDaoException(
276275
CoreError.DATA_LOADER_ERROR_SCAN.buildMessage(e.getMessage()), e);
@@ -321,7 +320,7 @@ public Scanner createScanner(
321320
* @param sortOrders Optional scan clustering key sorting values
322321
* @param projectionColumns List of column projection to use during scan
323322
* @param limit Scan limit value
324-
* @param distributedTransactionManager Distributed transaction manager object
323+
* @param transaction Distributed transaction object
325324
* @return ScalarDB Scanner object
326325
*/
327326
public Scanner createScanner(
@@ -332,11 +331,11 @@ public Scanner createScanner(
332331
@Nullable List<Scan.Ordering> sortOrders,
333332
@Nullable List<String> projectionColumns,
334333
int limit,
335-
DistributedTransactionManager distributedTransactionManager) {
334+
DistributedTransaction transaction) {
336335
Scan scan =
337336
createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit);
338337
try {
339-
return (Scanner) distributedTransactionManager.getScanner(scan);
338+
return (Scanner) transaction.getScanner(scan);
340339
} catch (CrudException e) {
341340
throw new RuntimeException(e);
342341
}

data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java

Lines changed: 43 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

3+
import static org.mockito.Mockito.when;
4+
35
import com.scalar.db.api.DistributedStorage;
6+
import com.scalar.db.api.DistributedTransaction;
47
import com.scalar.db.api.DistributedTransactionManager;
58
import com.scalar.db.api.Result;
69
import com.scalar.db.api.Scanner;
@@ -13,6 +16,7 @@
1316
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
1417
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
1518
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException;
19+
import com.scalar.db.exception.transaction.TransactionException;
1620
import com.scalar.db.io.Column;
1721
import com.scalar.db.io.IntColumn;
1822
import com.scalar.db.io.Key;
@@ -36,16 +40,19 @@ public class CsvExportManagerTest {
3640
TableMetadata mockData;
3741
DistributedStorage storage;
3842
DistributedTransactionManager manager;
43+
DistributedTransaction transaction;
3944
@Spy ScalarDbDao dao;
4045
ProducerTaskFactory producerTaskFactory;
4146
ExportManager exportManager;
4247

4348
@BeforeEach
44-
void setup() {
49+
void setup() throws TransactionException {
4550
storage = Mockito.mock(DistributedStorage.class);
4651
manager = Mockito.mock(DistributedTransactionManager.class);
52+
transaction = Mockito.mock(DistributedTransaction.class);
4753
mockData = UnitTestUtils.createTestTableMetadata();
4854
dao = Mockito.mock(ScalarDbDao.class);
55+
when(manager.start()).thenReturn(transaction);
4956
producerTaskFactory = new ProducerTaskFactory(null, false, true);
5057
}
5158

@@ -64,15 +71,14 @@ void startExport_givenValidDataWithoutPartitionKey_withStorage_shouldGenerateOut
6471
.scanRange(new ScanRange(null, null, false, false))
6572
.build();
6673

67-
Mockito.when(
68-
dao.createScanner(
69-
exportOptions.getNamespace(),
70-
exportOptions.getTableName(),
71-
exportOptions.getProjectionColumns(),
72-
exportOptions.getLimit(),
73-
storage))
74+
when(dao.createScanner(
75+
exportOptions.getNamespace(),
76+
exportOptions.getTableName(),
77+
exportOptions.getProjectionColumns(),
78+
exportOptions.getLimit(),
79+
storage))
7480
.thenReturn(scanner);
75-
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
81+
when(scanner.iterator()).thenReturn(results.iterator());
7682
try (BufferedWriter writer =
7783
new BufferedWriter(
7884
Files.newBufferedWriter(
@@ -108,18 +114,17 @@ void startExport_givenPartitionKey_withStorage_shouldGenerateOutputFile()
108114
.scanRange(new ScanRange(null, null, false, false))
109115
.build();
110116

111-
Mockito.when(
112-
dao.createScanner(
113-
exportOptions.getNamespace(),
114-
exportOptions.getTableName(),
115-
exportOptions.getScanPartitionKey(),
116-
exportOptions.getScanRange(),
117-
exportOptions.getSortOrders(),
118-
exportOptions.getProjectionColumns(),
119-
exportOptions.getLimit(),
120-
storage))
117+
when(dao.createScanner(
118+
exportOptions.getNamespace(),
119+
exportOptions.getTableName(),
120+
exportOptions.getScanPartitionKey(),
121+
exportOptions.getScanRange(),
122+
exportOptions.getSortOrders(),
123+
exportOptions.getProjectionColumns(),
124+
exportOptions.getLimit(),
125+
storage))
121126
.thenReturn(scanner);
122-
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
127+
when(scanner.iterator()).thenReturn(results.iterator());
123128
try (BufferedWriter writer =
124129
new BufferedWriter(
125130
Files.newBufferedWriter(
@@ -150,15 +155,14 @@ void startExport_givenValidDataWithoutPartitionKey_withTransaction_shouldGenerat
150155
.scalarDbMode(ScalarDbMode.TRANSACTION)
151156
.build();
152157

153-
Mockito.when(
154-
dao.createScanner(
155-
exportOptions.getNamespace(),
156-
exportOptions.getTableName(),
157-
exportOptions.getProjectionColumns(),
158-
exportOptions.getLimit(),
159-
manager))
158+
when(dao.createScanner(
159+
exportOptions.getNamespace(),
160+
exportOptions.getTableName(),
161+
exportOptions.getProjectionColumns(),
162+
exportOptions.getLimit(),
163+
transaction))
160164
.thenReturn(scanner);
161-
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
165+
when(scanner.iterator()).thenReturn(results.iterator());
162166
try (BufferedWriter writer =
163167
new BufferedWriter(
164168
Files.newBufferedWriter(
@@ -174,8 +178,7 @@ void startExport_givenValidDataWithoutPartitionKey_withTransaction_shouldGenerat
174178
}
175179

176180
@Test
177-
void startExport_givenPartitionKey_withTransaction_shouldGenerateOutputFile()
178-
throws IOException, ScalarDbDaoException {
181+
void startExport_givenPartitionKey_withTransaction_shouldGenerateOutputFile() throws IOException {
179182
producerTaskFactory = new ProducerTaskFactory(",", false, false);
180183
exportManager = new CsvExportManager(manager, dao, producerTaskFactory);
181184
Scanner scanner = Mockito.mock(Scanner.class);
@@ -195,18 +198,17 @@ void startExport_givenPartitionKey_withTransaction_shouldGenerateOutputFile()
195198
.scalarDbMode(ScalarDbMode.TRANSACTION)
196199
.build();
197200

198-
Mockito.when(
199-
dao.createScanner(
200-
exportOptions.getNamespace(),
201-
exportOptions.getTableName(),
202-
exportOptions.getScanPartitionKey(),
203-
exportOptions.getScanRange(),
204-
exportOptions.getSortOrders(),
205-
exportOptions.getProjectionColumns(),
206-
exportOptions.getLimit(),
207-
manager))
201+
when(dao.createScanner(
202+
exportOptions.getNamespace(),
203+
exportOptions.getTableName(),
204+
exportOptions.getScanPartitionKey(),
205+
exportOptions.getScanRange(),
206+
exportOptions.getSortOrders(),
207+
exportOptions.getProjectionColumns(),
208+
exportOptions.getLimit(),
209+
transaction))
208210
.thenReturn(scanner);
209-
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
211+
when(scanner.iterator()).thenReturn(results.iterator());
210212
try (BufferedWriter writer =
211213
new BufferedWriter(
212214
Files.newBufferedWriter(

0 commit comments

Comments
 (0)