 import com.scalar.db.api.Result;
 import com.scalar.db.api.Scanner;
 import com.scalar.db.api.TableMetadata;
+import com.scalar.db.api.TransactionCrudOperable;
 import com.scalar.db.dataloader.core.FileFormat;
 import com.scalar.db.dataloader.core.ScalarDbMode;
 import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask;
@@ -92,60 +93,141 @@ abstract void processFooter(
   public ExportReport startExport(
       ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer) {
     ExportReport exportReport = new ExportReport();
+    ExecutorService executorService = null;
+
     try {
       validateExportOptions(exportOptions, tableMetadata);
-      Map<String, DataType> dataTypeByColumnName = tableMetadata.getColumnDataTypes();
       handleTransactionMetadata(exportOptions, tableMetadata);
-      processHeader(exportOptions, tableMetadata, writer);
 
-      int maxThreadCount =
-          exportOptions.getMaxThreadCount() == 0
-              ? Runtime.getRuntime().availableProcessors()
-              : exportOptions.getMaxThreadCount();
-      ExecutorService executorService = Executors.newFixedThreadPool(maxThreadCount);
+      try (BufferedWriter bufferedWriter = new BufferedWriter(writer)) {
+        processHeader(exportOptions, tableMetadata, bufferedWriter);
 
-      BufferedWriter bufferedWriter = new BufferedWriter(writer);
-      boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON;
+        int threadCount =
+            exportOptions.getMaxThreadCount() > 0
+                ? exportOptions.getMaxThreadCount()
+                : Runtime.getRuntime().availableProcessors();
+        executorService = Executors.newFixedThreadPool(threadCount);
 
-      try (Scanner scanner =
-          createScanner(exportOptions, dao, distributedStorage, distributedTransactionManager)) {
-        Iterator<Result> iterator = scanner.iterator();
         AtomicBoolean isFirstBatch = new AtomicBoolean(true);
+        Map<String, DataType> dataTypeByColumnName = tableMetadata.getColumnDataTypes();
 
-        while (iterator.hasNext()) {
-          List<Result> dataChunk = fetchDataChunk(iterator, exportOptions.getDataChunkSize());
-          executorService.submit(
-              () ->
-                  processDataChunk(
-                      exportOptions,
-                      tableMetadata,
-                      dataTypeByColumnName,
-                      dataChunk,
-                      bufferedWriter,
-                      isJson,
-                      isFirstBatch,
-                      exportReport));
-        }
-        executorService.shutdown();
-        if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
-          logger.info("All tasks completed");
-        } else {
-          logger.error("Timeout occurred while waiting for tasks to complete");
-          // TODO: handle this
+        if (exportOptions.getScalarDbMode() == ScalarDbMode.STORAGE) {
+          try (Scanner scanner = createScannerWithStorage(exportOptions, dao, distributedStorage)) {
+            submitTasks(
+                scanner.iterator(),
+                executorService,
+                exportOptions,
+                tableMetadata,
+                dataTypeByColumnName,
+                bufferedWriter,
+                isFirstBatch,
+                exportReport);
+          }
+        } else if (exportOptions.getScalarDbMode() == ScalarDbMode.TRANSACTION
+            && distributedTransactionManager != null) {
+          ScannerWithTransaction scannerWithTx =
+              createScannerWithTransaction(exportOptions, dao, distributedTransactionManager);
+
+          try (TransactionCrudOperable.Scanner scanner = scannerWithTx.getScanner()) {
+            submitTasks(
+                scanner.iterator(),
+                executorService,
+                exportOptions,
+                tableMetadata,
+                dataTypeByColumnName,
+                bufferedWriter,
+                isFirstBatch,
+                exportReport);
+          } finally {
+            scannerWithTx.getTransaction().commit();
+          }
         }
+
+        shutdownExecutor(executorService);
         processFooter(exportOptions, tableMetadata, bufferedWriter);
-      } catch (InterruptedException | IOException | TransactionException e) {
-        logger.error("Error during export: {}", e.getMessage());
-      } finally {
-        bufferedWriter.flush();
       }
-    } catch (ExportOptionsValidationException | IOException | ScalarDbDaoException e) {
-      logger.error("Error during export: {}", e.getMessage());
+
+    } catch (Exception e) {
+      logger.error("Export failed", e);
+    } finally {
+      if (executorService != null && !executorService.isShutdown()) {
+        executorService.shutdownNow();
+      }
+      closeResources();
     }
-    closeResources();
+
     return exportReport;
   }
 
+  /**
+   * Submits asynchronous tasks for processing chunks of data to the given executor service.
+   *
+   * <p>This method reads data from the provided {@code iterator} in chunks (based on the
+   * configured chunk size) and submits each chunk as a separate task for processing. Each task
+   * invokes {@code processDataChunk()} to write the data in the configured output format.
+   *
+   * <p>Any exceptions thrown during chunk processing are logged but do not halt the submission of
+   * other tasks.
+   *
+   * @param iterator the iterator over database results
+   * @param executorService the executor service to run the processing tasks
+   * @param exportOptions configuration for the export operation
+   * @param tableMetadata metadata for the table being exported
+   * @param dataTypeByColumnName mapping of column names to their data types
+   * @param writer the writer to which export output is written
+   * @param isFirstBatch an atomic flag used to track whether the current chunk is the first one
+   *     (used for formatting)
+   * @param exportReport the report object that accumulates export statistics
+   */
+  private void submitTasks(
+      Iterator<Result> iterator,
+      ExecutorService executorService,
+      ExportOptions exportOptions,
+      TableMetadata tableMetadata,
+      Map<String, DataType> dataTypeByColumnName,
+      BufferedWriter writer,
+      AtomicBoolean isFirstBatch,
+      ExportReport exportReport) {
+    while (iterator.hasNext()) {
+      List<Result> chunk = fetchDataChunk(iterator, exportOptions.getDataChunkSize());
+      executorService.submit(
+          () -> {
+            try {
+              processDataChunk(
+                  exportOptions,
+                  tableMetadata,
+                  dataTypeByColumnName,
+                  chunk,
+                  writer,
+                  exportOptions.getOutputFileFormat() == FileFormat.JSON,
+                  isFirstBatch,
+                  exportReport);
+            } catch (Exception e) {
+              logger.error("Error processing data chunk", e);
+            }
+          });
+    }
+  }
+
+  /**
+   * Shuts down the given executor service gracefully, waiting for tasks to complete.
+   *
+   * <p>This method initiates an orderly shutdown in which previously submitted tasks are executed
+   * but no new tasks are accepted. It then waits for all tasks to finish within a specified
+   * timeout. If the tasks do not complete in time, a warning is logged.
+   *
+   * @param executorService the ExecutorService to shut down
+   * @throws InterruptedException if the current thread is interrupted while waiting
+   */
+  private void shutdownExecutor(ExecutorService executorService) throws InterruptedException {
+    executorService.shutdown();
+    if (!executorService.awaitTermination(60, TimeUnit.MINUTES)) {
+      logger.warn("Timeout while waiting for export tasks to finish.");
+    } else {
+      logger.info("All export tasks completed.");
+    }
+  }
+
   /**
    * To process result data chunk
    *
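Editor's note: `submitTasks` above drains the shared scanner iterator on the calling thread via `fetchDataChunk`, whose definition is outside this diff. A minimal, hypothetical sketch of what such a helper presumably looks like, assuming it simply collects up to `chunkSize` results into a fresh list so that each submitted task works on its own in-memory data:

  // Hypothetical sketch only -- the real fetchDataChunk() is defined elsewhere
  // in this class and is not part of this diff. Assumed behavior: drain up to
  // chunkSize results from the shared iterator on the calling thread, so each
  // submitted task operates on an independent in-memory list.
  private List<Result> fetchDataChunk(Iterator<Result> iterator, int chunkSize) {
    List<Result> chunk = new ArrayList<>(chunkSize); // requires java.util.ArrayList
    while (iterator.hasNext() && chunk.size() < chunkSize) {
      chunk.add(iterator.next());
    }
    return chunk;
  }

Because the iterator is consumed entirely before the scanner is closed, the worker tasks never touch the scanner itself; they only format and write the already-fetched chunks.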
@@ -233,35 +315,6 @@ private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadat
     }
   }
 
-  /**
-   * Creates a ScalarDB {@link Scanner} instance based on the configured ScalarDB mode.
-   *
-   * <p>If the {@link ScalarDbMode} specified in {@code exportOptions} is {@code TRANSACTION}, a
-   * scanner is created using the {@link DistributedTransactionManager}. Otherwise, a scanner is
-   * created using the {@link DistributedStorage}.
-   *
-   * @param exportOptions Options containing configuration for the export operation, including the
-   *     ScalarDB mode
-   * @param dao The {@link ScalarDbDao} used to access ScalarDB
-   * @param storage The {@link DistributedStorage} instance used for storage-level operations
-   * @param transactionManager The {@link DistributedTransactionManager} instance used for
-   *     transaction-level operations
-   * @return A {@link Scanner} instance for reading data from ScalarDB
-   * @throws ScalarDbDaoException If an error occurs while creating the scanner with the DAO
-   * @throws TransactionException If an error occurs during transactional scanner creation
-   */
-  private Scanner createScanner(
-      ExportOptions exportOptions,
-      ScalarDbDao dao,
-      DistributedStorage storage,
-      DistributedTransactionManager transactionManager)
-      throws ScalarDbDaoException, TransactionException {
-    if (exportOptions.getScalarDbMode().equals(ScalarDbMode.TRANSACTION)) {
-      return createScannerWithTransaction(exportOptions, dao, transactionManager);
-    }
-    return createScannerWithStorage(exportOptions, dao, storage);
-  }
-
   /**
    * Creates a ScalarDB {@link Scanner} using the {@link DistributedStorage} interface based on the
    * scan configuration provided in {@link ExportOptions}.
@@ -303,23 +356,27 @@ private Scanner createScannerWithStorage(
   }
 
   /**
-   * Creates a {@link Scanner} using a {@link DistributedTransaction} based on the provided export
-   * options.
+   * Creates a {@link ScannerWithTransaction} object that encapsulates a transactional scanner and
+   * its associated transaction for reading data from a ScalarDB table.
    *
-   * <p>If no partition key is specified in the {@code exportOptions}, a full table scan is
-   * performed. Otherwise, a partition-specific scan is performed using the provided partition key,
+   * <p>If no partition key is provided in the {@link ExportOptions}, a full table scan is
+   * performed. Otherwise, a partition-specific scan is created using the provided partition key,
    * optional scan range, and sort orders.
    *
-   * @param exportOptions Options specifying how to scan the table (e.g., partition key, range,
-   *     projection).
-   * @param dao The ScalarDb data access object to create the scanner.
-   * @param distributedTransactionManager The transaction manager used to start a new transaction.
-   * @return A {@link Scanner} for reading data from the specified table.
-   * @throws ScalarDbDaoException If an error occurs while creating the scanner.
-   * @throws TransactionException If an error occurs during transaction management (start or
-   *     commit).
+   * <p>The method starts a new transaction using the given {@link DistributedTransactionManager},
+   * which will be associated with the returned scanner. This allows data export operations to be
+   * executed in a consistent transactional context.
+   *
+   * @param exportOptions the options specifying how to scan the table, such as namespace, table
+   *     name, projection columns, scan partition key, range, sort orders, and limit
+   * @param dao the {@link ScalarDbDao} used to construct the transactional scanner
+   * @param distributedTransactionManager the transaction manager used to start a new transaction
+   * @return a {@link ScannerWithTransaction} instance that wraps both the transaction and the
+   *     scanner
+   * @throws ScalarDbDaoException if an error occurs while creating the scanner with the DAO
+   * @throws TransactionException if an error occurs when starting the transaction
    */
-  private Scanner createScannerWithTransaction(
+  private ScannerWithTransaction createScannerWithTransaction(
       ExportOptions exportOptions,
       ScalarDbDao dao,
       DistributedTransactionManager distributedTransactionManager)
@@ -328,41 +385,29 @@ private Scanner createScannerWithTransaction(
     boolean isScanAll = exportOptions.getScanPartitionKey() == null;
     DistributedTransaction transaction = distributedTransactionManager.start();
 
-    try {
-      Scanner scanner;
-      if (isScanAll) {
-        scanner =
-            dao.createScanner(
-                exportOptions.getNamespace(),
-                exportOptions.getTableName(),
-                exportOptions.getProjectionColumns(),
-                exportOptions.getLimit(),
-                transaction);
-      } else {
-        scanner =
-            dao.createScanner(
-                exportOptions.getNamespace(),
-                exportOptions.getTableName(),
-                exportOptions.getScanPartitionKey(),
-                exportOptions.getScanRange(),
-                exportOptions.getSortOrders(),
-                exportOptions.getProjectionColumns(),
-                exportOptions.getLimit(),
-                transaction);
-      }
-
-      transaction.commit();
-      return scanner;
-
-    } catch (Exception e) {
-      try {
-        transaction.abort();
-      } catch (TransactionException abortException) {
-        logger.error(
-            "Failed to abort transaction: {}", abortException.getMessage(), abortException);
-      }
-      throw e;
+    TransactionCrudOperable.Scanner scanner;
+    if (isScanAll) {
+      scanner =
+          dao.createScanner(
+              exportOptions.getNamespace(),
+              exportOptions.getTableName(),
+              exportOptions.getProjectionColumns(),
+              exportOptions.getLimit(),
+              transaction);
+    } else {
+      scanner =
+          dao.createScanner(
+              exportOptions.getNamespace(),
+              exportOptions.getTableName(),
+              exportOptions.getScanPartitionKey(),
+              exportOptions.getScanRange(),
+              exportOptions.getSortOrders(),
+              exportOptions.getProjectionColumns(),
+              exportOptions.getLimit(),
+              transaction);
     }
+
+    return new ScannerWithTransaction(transaction, scanner);
   }
 
   /** Close resources properly once the process is completed */
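Editor's note: the `ScannerWithTransaction` type returned above is introduced by this change, but its definition is outside this diff. Judging from how `startExport` uses it (a constructor taking the transaction and the scanner, plus `getTransaction()` and `getScanner()` accessors), it is presumably a plain immutable holder that pairs the scanner with the transaction that owns it, so the caller can close one and commit the other. A rough sketch under that assumption:

  // Hypothetical sketch only -- the real class is defined elsewhere in this
  // change. Assumed shape: an immutable holder pairing a transactional scanner
  // with the transaction it belongs to.
  import com.scalar.db.api.DistributedTransaction;
  import com.scalar.db.api.TransactionCrudOperable;

  final class ScannerWithTransaction {
    private final DistributedTransaction transaction;
    private final TransactionCrudOperable.Scanner scanner;

    ScannerWithTransaction(
        DistributedTransaction transaction, TransactionCrudOperable.Scanner scanner) {
      this.transaction = transaction;
      this.scanner = scanner;
    }

    DistributedTransaction getTransaction() {
      return transaction;
    }

    TransactionCrudOperable.Scanner getScanner() {
      return scanner;
    }
  }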