Skip to content

Commit 36f9b43

Browse files
committed
Changed implemenatation to use DistributedTransactionManager
1 parent 8c189e8 commit 36f9b43

File tree

7 files changed

+60
-78
lines changed

7 files changed

+60
-78
lines changed

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

33
import com.scalar.db.api.DistributedStorage;
4-
import com.scalar.db.api.DistributedTransaction;
54
import com.scalar.db.api.DistributedTransactionManager;
65
import com.scalar.db.api.Result;
76
import com.scalar.db.api.Scanner;
87
import com.scalar.db.api.TableMetadata;
9-
import com.scalar.db.api.TransactionCrudOperable;
8+
import com.scalar.db.api.TransactionManagerCrudOperable;
109
import com.scalar.db.dataloader.core.FileFormat;
1110
import com.scalar.db.dataloader.core.ScalarDbMode;
1211
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask;
@@ -125,10 +124,9 @@ public ExportReport startExport(
125124
}
126125
} else if (exportOptions.getScalarDbMode() == ScalarDbMode.TRANSACTION
127126
&& distributedTransactionManager != null) {
128-
ScannerWithTransaction scannerWithTx =
129-
createScannerWithTransaction(exportOptions, dao, distributedTransactionManager);
130127

131-
try (TransactionCrudOperable.Scanner scanner = scannerWithTx.getScanner()) {
128+
try (TransactionManagerCrudOperable.Scanner scanner =
129+
createScannerWithTransaction(exportOptions, dao, distributedTransactionManager)) {
132130
submitTasks(
133131
scanner.iterator(),
134132
executorService,
@@ -138,8 +136,6 @@ public ExportReport startExport(
138136
bufferedWriter,
139137
isFirstBatch,
140138
exportReport);
141-
} finally {
142-
scannerWithTx.getTransaction().commit();
143139
}
144140
}
145141

@@ -361,44 +357,39 @@ private Scanner createScannerWithStorage(
361357
}
362358

363359
/**
364-
* Creates a {@link ScannerWithTransaction} object that encapsulates a transactional scanner and
365-
* its associated transaction for reading data from a ScalarDB table.
360+
* Creates a {@link TransactionManagerCrudOperable.Scanner} instance using the given {@link
361+
* ExportOptions}, {@link ScalarDbDao}, and {@link DistributedTransactionManager}.
366362
*
367-
* <p>If no partition key is provided in the {@link ExportOptions}, a full table scan is
368-
* performed. Otherwise, a partition-specific scan is created using the provided partition key,
369-
* optional scan range, and sort orders.
370-
*
371-
* <p>The method starts a new transaction using the given {@link DistributedTransactionManager},
372-
* which will be associated with the returned scanner. This allows data export operations to be
373-
* executed in a consistent transactional context.
363+
* <p>If {@code scanPartitionKey} is not specified in {@code exportOptions}, a full table scan is
364+
* performed using the specified projection columns and limit. Otherwise, the scan is executed
365+
* with the specified partition key, range, sort orders, projection columns, and limit.
374366
*
375-
* @param exportOptions the options specifying how to scan the table, such as namespace, table
376-
* name, projection columns, scan partition key, range, sort orders, and limit.
377-
* @param dao the {@link ScalarDbDao} used to construct the transactional scanner.
378-
* @param distributedTransactionManager the transaction manager used to start a new transaction.
379-
* @return a {@link ScannerWithTransaction} instance that wraps both the transaction and the
380-
* scanner.
381-
* @throws ScalarDbDaoException if an error occurs while creating the scanner with the DAO.
382-
* @throws TransactionException if an error occurs when starting the transaction.
367+
* @param exportOptions the export options containing scan configuration such as namespace, table
368+
* name, partition key, projection columns, limit, range, and sort order
369+
* @param dao the ScalarDB DAO used to create the scanner
370+
* @param distributedTransactionManager the transaction manager to use for the scan operation
371+
* @return a {@link TransactionManagerCrudOperable.Scanner} for retrieving rows in transaction
372+
* mode
373+
* @throws ScalarDbDaoException if an error occurs while creating the scanner
374+
* @throws TransactionException if a transaction-related error occurs during scanner creation
383375
*/
384-
private ScannerWithTransaction createScannerWithTransaction(
376+
private TransactionManagerCrudOperable.Scanner createScannerWithTransaction(
385377
ExportOptions exportOptions,
386378
ScalarDbDao dao,
387379
DistributedTransactionManager distributedTransactionManager)
388380
throws ScalarDbDaoException, TransactionException {
389381

390382
boolean isScanAll = exportOptions.getScanPartitionKey() == null;
391-
DistributedTransaction transaction = distributedTransactionManager.start();
392383

393-
TransactionCrudOperable.Scanner scanner;
384+
TransactionManagerCrudOperable.Scanner scanner;
394385
if (isScanAll) {
395386
scanner =
396387
dao.createScanner(
397388
exportOptions.getNamespace(),
398389
exportOptions.getTableName(),
399390
exportOptions.getProjectionColumns(),
400391
exportOptions.getLimit(),
401-
transaction);
392+
distributedTransactionManager);
402393
} else {
403394
scanner =
404395
dao.createScanner(
@@ -409,10 +400,10 @@ private ScannerWithTransaction createScannerWithTransaction(
409400
exportOptions.getSortOrders(),
410401
exportOptions.getProjectionColumns(),
411402
exportOptions.getLimit(),
412-
transaction);
403+
distributedTransactionManager);
413404
}
414405

415-
return new ScannerWithTransaction(transaction, scanner);
406+
return scanner;
416407
}
417408

418409
/** Close resources properly once the process is completed */

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ScannerWithTransaction.java

Lines changed: 0 additions & 11 deletions
This file was deleted.

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.scalar.db.api.DistributedStorage;
44
import com.scalar.db.api.DistributedTransaction;
5+
import com.scalar.db.api.DistributedTransactionManager;
56
import com.scalar.db.api.Get;
67
import com.scalar.db.api.GetBuilder;
78
import com.scalar.db.api.Put;
@@ -10,7 +11,7 @@
1011
import com.scalar.db.api.Scan;
1112
import com.scalar.db.api.ScanBuilder;
1213
import com.scalar.db.api.Scanner;
13-
import com.scalar.db.api.TransactionCrudOperable;
14+
import com.scalar.db.api.TransactionManagerCrudOperable;
1415
import com.scalar.db.common.error.CoreError;
1516
import com.scalar.db.dataloader.core.ScanRange;
1617
import com.scalar.db.exception.storage.ExecutionException;
@@ -260,12 +261,12 @@ public Scanner createScanner(
260261
* @return ScalarDB Scanner object
261262
* @throws ScalarDbDaoException if scan fails
262263
*/
263-
public TransactionCrudOperable.Scanner createScanner(
264+
public TransactionManagerCrudOperable.Scanner createScanner(
264265
String namespace,
265266
String table,
266267
List<String> projectionColumns,
267268
int limit,
268-
DistributedTransaction transaction)
269+
DistributedTransactionManager transaction)
269270
throws ScalarDbDaoException {
270271
Scan scan =
271272
createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit);
@@ -324,15 +325,15 @@ public Scanner createScanner(
324325
* @param transaction Distributed transaction object
325326
* @return ScalarDB Scanner object
326327
*/
327-
public TransactionCrudOperable.Scanner createScanner(
328+
public TransactionManagerCrudOperable.Scanner createScanner(
328329
String namespace,
329330
String table,
330331
@Nullable Key partitionKey,
331332
@Nullable ScanRange scanRange,
332333
@Nullable List<Scan.Ordering> sortOrders,
333334
@Nullable List<String> projectionColumns,
334335
int limit,
335-
DistributedTransaction transaction) {
336+
DistributedTransactionManager transaction) {
336337
Scan scan =
337338
createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit);
338339
try {

data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import com.scalar.db.api.Result;
99
import com.scalar.db.api.Scanner;
1010
import com.scalar.db.api.TableMetadata;
11-
import com.scalar.db.api.TransactionCrudOperable;
11+
import com.scalar.db.api.TransactionManagerCrudOperable;
1212
import com.scalar.db.common.ResultImpl;
1313
import com.scalar.db.dataloader.core.FileFormat;
1414
import com.scalar.db.dataloader.core.ScalarDbMode;
@@ -144,7 +144,8 @@ void startExport_givenPartitionKey_withStorage_shouldGenerateOutputFile()
144144
void startExport_givenValidDataWithoutPartitionKey_withTransaction_shouldGenerateOutputFile()
145145
throws IOException, ScalarDbDaoException {
146146
exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory);
147-
TransactionCrudOperable.Scanner scanner = Mockito.mock(TransactionCrudOperable.Scanner.class);
147+
TransactionManagerCrudOperable.Scanner scanner =
148+
Mockito.mock(TransactionManagerCrudOperable.Scanner.class);
148149
String filePath = Paths.get("").toAbsolutePath() + "/output.csv";
149150
Map<String, Column<?>> values = UnitTestUtils.createTestValues();
150151
Result result = new ResultImpl(values, mockData);
@@ -161,7 +162,7 @@ void startExport_givenValidDataWithoutPartitionKey_withTransaction_shouldGenerat
161162
exportOptions.getTableName(),
162163
exportOptions.getProjectionColumns(),
163164
exportOptions.getLimit(),
164-
transaction))
165+
manager))
165166
.thenReturn(scanner);
166167
when(scanner.iterator()).thenReturn(results.iterator());
167168
try (BufferedWriter writer =
@@ -182,7 +183,8 @@ void startExport_givenValidDataWithoutPartitionKey_withTransaction_shouldGenerat
182183
void startExport_givenPartitionKey_withTransaction_shouldGenerateOutputFile() throws IOException {
183184
producerTaskFactory = new ProducerTaskFactory(",", false, false);
184185
exportManager = new CsvExportManager(manager, dao, producerTaskFactory);
185-
TransactionCrudOperable.Scanner scanner = Mockito.mock(TransactionCrudOperable.Scanner.class);
186+
TransactionManagerCrudOperable.Scanner scanner =
187+
Mockito.mock(TransactionManagerCrudOperable.Scanner.class);
186188
String filePath = Paths.get("").toAbsolutePath() + "/output.csv";
187189
Map<String, Column<?>> values = UnitTestUtils.createTestValues();
188190
Result result = new ResultImpl(values, mockData);
@@ -207,7 +209,7 @@ void startExport_givenPartitionKey_withTransaction_shouldGenerateOutputFile() th
207209
exportOptions.getSortOrders(),
208210
exportOptions.getProjectionColumns(),
209211
exportOptions.getLimit(),
210-
transaction))
212+
manager))
211213
.thenReturn(scanner);
212214
when(scanner.iterator()).thenReturn(results.iterator());
213215
try (BufferedWriter writer =

data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import com.scalar.db.api.Result;
99
import com.scalar.db.api.Scanner;
1010
import com.scalar.db.api.TableMetadata;
11-
import com.scalar.db.api.TransactionCrudOperable;
11+
import com.scalar.db.api.TransactionManagerCrudOperable;
1212
import com.scalar.db.common.ResultImpl;
1313
import com.scalar.db.dataloader.core.FileFormat;
1414
import com.scalar.db.dataloader.core.ScalarDbMode;
@@ -148,7 +148,8 @@ void startExport_givenPartitionKey_withStorage_shouldGenerateOutputFile()
148148
startExport_givenValidDataWithoutPartitionKey_withTransaction_withStorage_shouldGenerateOutputFile()
149149
throws IOException, ScalarDbDaoException {
150150
exportManager = new JsonExportManager(manager, dao, producerTaskFactory);
151-
TransactionCrudOperable.Scanner scanner = Mockito.mock(TransactionCrudOperable.Scanner.class);
151+
TransactionManagerCrudOperable.Scanner scanner =
152+
Mockito.mock(TransactionManagerCrudOperable.Scanner.class);
152153
String filePath = Paths.get("").toAbsolutePath() + "/output.json";
153154
Map<String, Column<?>> values = UnitTestUtils.createTestValues();
154155
Result result = new ResultImpl(values, mockData);
@@ -167,7 +168,7 @@ void startExport_givenPartitionKey_withStorage_shouldGenerateOutputFile()
167168
exportOptions.getTableName(),
168169
exportOptions.getProjectionColumns(),
169170
exportOptions.getLimit(),
170-
transaction))
171+
manager))
171172
.thenReturn(scanner);
172173
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
173174
try (BufferedWriter writer =
@@ -187,7 +188,8 @@ void startExport_givenPartitionKey_withStorage_shouldGenerateOutputFile()
187188
@Test
188189
void startExport_givenPartitionKey_withTransaction_shouldGenerateOutputFile() throws IOException {
189190
exportManager = new JsonExportManager(manager, dao, producerTaskFactory);
190-
TransactionCrudOperable.Scanner scanner = Mockito.mock(TransactionCrudOperable.Scanner.class);
191+
TransactionManagerCrudOperable.Scanner scanner =
192+
Mockito.mock(TransactionManagerCrudOperable.Scanner.class);
191193
String filePath = Paths.get("").toAbsolutePath() + "/output.json";
192194
Map<String, Column<?>> values = UnitTestUtils.createTestValues();
193195
Result result = new ResultImpl(values, mockData);
@@ -213,7 +215,7 @@ void startExport_givenPartitionKey_withTransaction_shouldGenerateOutputFile() th
213215
exportOptions.getSortOrders(),
214216
exportOptions.getProjectionColumns(),
215217
exportOptions.getLimit(),
216-
transaction))
218+
manager))
217219
.thenReturn(scanner);
218220
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
219221
try (BufferedWriter writer =

data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
package com.scalar.db.dataloader.core.dataexport;
22

3-
import static org.mockito.Mockito.when;
4-
53
import com.scalar.db.api.DistributedStorage;
6-
import com.scalar.db.api.DistributedTransaction;
74
import com.scalar.db.api.DistributedTransactionManager;
85
import com.scalar.db.api.Result;
96
import com.scalar.db.api.Scanner;
107
import com.scalar.db.api.TableMetadata;
11-
import com.scalar.db.api.TransactionCrudOperable;
8+
import com.scalar.db.api.TransactionManagerCrudOperable;
129
import com.scalar.db.common.ResultImpl;
1310
import com.scalar.db.dataloader.core.FileFormat;
1411
import com.scalar.db.dataloader.core.ScalarDbMode;
@@ -17,7 +14,6 @@
1714
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
1815
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
1916
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException;
20-
import com.scalar.db.exception.transaction.TransactionException;
2117
import com.scalar.db.io.Column;
2218
import com.scalar.db.io.IntColumn;
2319
import com.scalar.db.io.Key;
@@ -40,21 +36,18 @@
4036
public class JsonLineExportManagerTest {
4137
TableMetadata mockData;
4238
DistributedStorage storage;
43-
DistributedTransaction transaction;
4439
DistributedTransactionManager manager;
4540
@Spy ScalarDbDao dao;
4641
ProducerTaskFactory producerTaskFactory;
4742
ExportManager exportManager;
4843

4944
@BeforeEach
50-
void setup() throws TransactionException {
45+
void setup() {
5146
storage = Mockito.mock(DistributedStorage.class);
52-
transaction = Mockito.mock(DistributedTransaction.class);
5347
manager = Mockito.mock(DistributedTransactionManager.class);
5448
mockData = UnitTestUtils.createTestTableMetadata();
5549
dao = Mockito.mock(ScalarDbDao.class);
5650
producerTaskFactory = new ProducerTaskFactory(null, false, true);
57-
when(manager.start()).thenReturn(transaction);
5851
}
5952

6053
@Test
@@ -147,7 +140,8 @@ void startExport_givenPartitionKey_withStorage_shouldGenerateOutputFile()
147140
startExport_givenValidDataWithoutPartitionKey_withTransaction_withStorage_shouldGenerateOutputFile()
148141
throws IOException, ScalarDbDaoException {
149142
exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory);
150-
TransactionCrudOperable.Scanner scanner = Mockito.mock(TransactionCrudOperable.Scanner.class);
143+
TransactionManagerCrudOperable.Scanner scanner =
144+
Mockito.mock(TransactionManagerCrudOperable.Scanner.class);
151145
String filePath = Paths.get("").toAbsolutePath() + "/output.jsonl";
152146
Map<String, Column<?>> values = UnitTestUtils.createTestValues();
153147
Result result = new ResultImpl(values, mockData);
@@ -166,7 +160,7 @@ void startExport_givenPartitionKey_withStorage_shouldGenerateOutputFile()
166160
exportOptions.getTableName(),
167161
exportOptions.getProjectionColumns(),
168162
exportOptions.getLimit(),
169-
transaction))
163+
manager))
170164
.thenReturn(scanner);
171165
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
172166
try (BufferedWriter writer =
@@ -186,7 +180,8 @@ void startExport_givenPartitionKey_withStorage_shouldGenerateOutputFile()
186180
@Test
187181
void startExport_givenPartitionKey_withTransaction_shouldGenerateOutputFile() throws IOException {
188182
exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory);
189-
TransactionCrudOperable.Scanner scanner = Mockito.mock(TransactionCrudOperable.Scanner.class);
183+
TransactionManagerCrudOperable.Scanner scanner =
184+
Mockito.mock(TransactionManagerCrudOperable.Scanner.class);
190185
String filePath = Paths.get("").toAbsolutePath() + "/output.jsonl";
191186
Map<String, Column<?>> values = UnitTestUtils.createTestValues();
192187
Result result = new ResultImpl(values, mockData);
@@ -212,7 +207,7 @@ void startExport_givenPartitionKey_withTransaction_shouldGenerateOutputFile() th
212207
exportOptions.getSortOrders(),
213208
exportOptions.getProjectionColumns(),
214209
exportOptions.getLimit(),
215-
transaction))
210+
manager))
216211
.thenReturn(scanner);
217212
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
218213
try (BufferedWriter writer =

0 commit comments

Comments
 (0)