From c2cfa9142678e7f332ae8b1195f2dd517abc04e4 Mon Sep 17 00:00:00 2001
From: thongdk8
Date: Wed, 28 May 2025 18:56:59 +0900
Subject: [PATCH 1/2] Refactor import processors in the data loader, and remove the unnecessary usage of the import result status map

---
 .../core/dataimport/ImportEventListener.java  |  7 --
 .../core/dataimport/ImportManager.java        | 29 +-------
 .../log/SingleFileImportLogger.java           |  9 ---
 .../log/SplitByDataChunkImportLogger.java     |  9 ---
 .../processor/CsvImportProcessor.java         | 64 +-----------------
 .../dataimport/processor/ImportProcessor.java | 67 ++++++++++++++++---
 .../processor/JsonImportProcessor.java        | 65 +-----------------
 .../processor/JsonLinesImportProcessor.java   | 65 +-----------------
 .../processor/CsvImportProcessorTest.java     | 17 +++--
 .../processor/JsonImportProcessorTest.java    | 17 +++--
 .../JsonLinesImportProcessorTest.java         | 17 +++--
 11 files changed, 88 insertions(+), 278 deletions(-)

diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportEventListener.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportEventListener.java
index 8081931c50..35f72a372a 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportEventListener.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportEventListener.java
@@ -18,13 +18,6 @@ public interface ImportEventListener {
    */
   void onDataChunkStarted(ImportDataChunkStatus status);
 
-  /**
-   * Updates or adds new status information for a data chunk.
-   *
-   * @param status the updated status information for the data chunk
-   */
-  void addOrUpdateDataChunkStatus(ImportDataChunkStatus status);
-
   /**
    * Called when processing of a data chunk is completed.
   *
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java
index 9edbb478f7..64d943d4ec 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java
@@ -17,7 +17,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import lombok.AllArgsConstructor;
 import lombok.NonNull;
@@ -46,8 +45,6 @@ public class ImportManager implements ImportEventListener {
   private final ScalarDbMode scalarDbMode;
   private final DistributedStorage distributedStorage;
   private final DistributedTransactionManager distributedTransactionManager;
-  private final ConcurrentHashMap<Integer, ImportDataChunkStatus> importDataChunkStatusMap =
-      new ConcurrentHashMap<>();
 
   /**
    * Starts the import process using the configured parameters.
   *
   *

If the data chunk size in {@link ImportOptions} is set to 0, the entire file will be * processed as a single chunk. Otherwise, the file will be processed in chunks of the specified * size. - * - * @return a map of {@link ImportDataChunkStatus} objects containing the status of each processed - * chunk */ - public ConcurrentHashMap startImport() { + public void startImport() { ImportProcessorParams params = ImportProcessorParams.builder() .scalarDbMode(scalarDbMode) @@ -77,8 +71,7 @@ public ConcurrentHashMap startImport() { importOptions.getDataChunkSize() == 0 ? Integer.MAX_VALUE : importOptions.getDataChunkSize(); - return processor.process( - dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader); + processor.process(dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader); } /** @@ -108,15 +101,6 @@ public void onDataChunkStarted(ImportDataChunkStatus status) { } } - /** - * {@inheritDoc} Updates or adds the status of a data chunk in the status map. This method is - * thread-safe. - */ - @Override - public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) { - importDataChunkStatusMap.put(status.getDataChunkId(), status); - } - /** {@inheritDoc} Forwards the event to all registered listeners. */ @Override public void onDataChunkCompleted(ImportDataChunkStatus status) { @@ -157,15 +141,6 @@ public void onAllDataChunksCompleted() { } } - /** - * Returns the current map of import data chunk status objects. - * - * @return a map of {@link ImportDataChunkStatus} objects - */ - public ConcurrentHashMap getImportDataChunkStatus() { - return importDataChunkStatusMap; - } - /** * Creates and returns a mapping of table column data types from the table metadata. * diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java index e597987aff..95d13d2983 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java @@ -70,15 +70,6 @@ public void onTaskComplete(ImportTaskResult taskResult) { } } - /** - * Called to add or update the status of a data chunk. This implementation does nothing as the - * status is only logged when the data chunk is completed. - * - * @param status the status of the data chunk - */ - @Override - public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {} - /** * Called when a data chunk is completed. Logs the summary of the data chunk to the summary log * file. diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java index 31ddc5ca84..cff2d5d445 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java @@ -112,15 +112,6 @@ private void writeLog(ImportTargetResult target, LogFileType logFileType, int da writer.write(jsonNode); } - /** - * Called to add or update the status of a data chunk. This implementation does nothing as the - * status is only logged when the data chunk is completed. 
- * - * @param status the status of the data chunk - */ - @Override - public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {} - /** * Called when a data chunk is completed. Logs the summary of the data chunk and closes the log * writers for that data chunk. diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java index 0c68d5e566..bf4d51bd49 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java @@ -5,7 +5,6 @@ import com.scalar.db.common.error.CoreError; import com.scalar.db.dataloader.core.DataLoaderObjectMapper; import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk; -import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow; import java.io.BufferedReader; import java.io.IOException; @@ -14,12 +13,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -50,60 +43,6 @@ public CsvImportProcessor(ImportProcessorParams params) { super(params); } - /** - * Processes the source data from the given import file. - * - *

This method reads data from the provided {@link BufferedReader}, processes it in chunks, and - * batches transactions according to the specified sizes. The method returns a list of {@link - * ImportDataChunkStatus} objects, each representing the status of a processed data chunk. - * - * @param dataChunkSize the number of records to include in each data chunk - * @param transactionBatchSize the number of records to include in each transaction batch - * @param reader the {@link BufferedReader} used to read the source file - * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status of each - * data chunk - */ - @Override - public ConcurrentHashMap process( - int dataChunkSize, int transactionBatchSize, BufferedReader reader) { - ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); - BlockingQueue dataChunkQueue = - new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize()); - - try { - CompletableFuture readerFuture = - CompletableFuture.runAsync( - () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor); - - ConcurrentHashMap result = new ConcurrentHashMap<>(); - - while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) { - ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS); - if (dataChunk != null) { - ImportDataChunkStatus status = processDataChunk(dataChunk, transactionBatchSize); - result.put(status.getDataChunkId(), status); - } - } - readerFuture.join(); - return result; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException( - CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e); - } finally { - dataChunkExecutor.shutdown(); - try { - if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) { - dataChunkExecutor.shutdownNow(); - } - } catch (InterruptedException e) { - dataChunkExecutor.shutdownNow(); - Thread.currentThread().interrupt(); - } - notifyAllDataChunksCompleted(); - } - } - /** * Reads and processes CSV data in chunks from the provided reader. 
* @@ -122,7 +61,8 @@ public ConcurrentHashMap process( * @param dataChunkQueue the queue where data chunks are placed for processing * @throws RuntimeException if there are errors reading the file or if interrupted */ - private void readDataChunks( + @Override + protected void readDataChunks( BufferedReader reader, int dataChunkSize, BlockingQueue dataChunkQueue) { try { String delimiter = diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java index 8da8850430..a98db1145a 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataimport.processor; import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.common.error.CoreError; import com.scalar.db.dataloader.core.ScalarDbMode; import com.scalar.db.dataloader.core.dataimport.ImportEventListener; import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk; @@ -22,11 +23,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.RequiredArgsConstructor; import org.slf4j.Logger; @@ -56,11 +60,57 @@ public abstract class ImportProcessor { * @param transactionBatchSize the number of records to group together in a single transaction * (only used in transaction mode) * @param reader the {@link BufferedReader} used to read the source file - * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status and - * results of each data chunk */ - public abstract ConcurrentHashMap process( - int dataChunkSize, int transactionBatchSize, BufferedReader reader); + public void process(int dataChunkSize, int transactionBatchSize, BufferedReader reader) { + ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); + BlockingQueue dataChunkQueue = + new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize()); + + try { + CompletableFuture readerFuture = + CompletableFuture.runAsync( + () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor); + + while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) { + ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS); + if (dataChunk != null) { + processDataChunk(dataChunk, transactionBatchSize); + } + } + + readerFuture.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e); + } finally { + dataChunkExecutor.shutdown(); + try { + if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + dataChunkExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + dataChunkExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + notifyAllDataChunksCompleted(); + } + } 
+ + /** + * Reads and processes data in chunks from the provided reader. + * + *

This method should be implemented by each processor to handle the specific format of the + * input data. It reads data from the reader, converts it to the appropriate format, and enqueues + * it for processing. + * + * @param reader the BufferedReader containing the data + * @param dataChunkSize the number of rows to include in each chunk + * @param dataChunkQueue the queue where data chunks are placed for processing + * @throws RuntimeException if there are errors reading the file or if interrupted + */ + protected abstract void readDataChunks( + BufferedReader reader, int dataChunkSize, BlockingQueue dataChunkQueue); /** * Add import event listener to listener list @@ -100,7 +150,6 @@ protected void notifyStorageRecordCompleted(ImportTaskResult result) { protected void notifyDataChunkStarted(ImportDataChunkStatus status) { for (ImportEventListener listener : listeners) { listener.onDataChunkStarted(status); - listener.addOrUpdateDataChunkStatus(status); } } @@ -112,7 +161,6 @@ protected void notifyDataChunkStarted(ImportDataChunkStatus status) { protected void notifyDataChunkCompleted(ImportDataChunkStatus status) { for (ImportEventListener listener : listeners) { listener.onDataChunkCompleted(status); - listener.addOrUpdateDataChunkStatus(status); } } @@ -294,10 +342,8 @@ private ImportTaskResult processStorageRecord(ImportDataChunk dataChunk, ImportR * * @param dataChunk the data chunk to process * @param transactionBatchSize the size of transaction batches (used only in transaction mode) - * @return an {@link ImportDataChunkStatus} containing the complete processing results and metrics */ - protected ImportDataChunkStatus processDataChunk( - ImportDataChunk dataChunk, int transactionBatchSize) { + private void processDataChunk(ImportDataChunk dataChunk, int transactionBatchSize) { ImportDataChunkStatus status = ImportDataChunkStatus.builder() .dataChunkId(dataChunk.getDataChunkId()) @@ -312,7 +358,6 @@ protected ImportDataChunkStatus processDataChunk( importDataChunkStatus = processDataChunkWithoutTransactions(dataChunk); } notifyDataChunkCompleted(importDataChunkStatus); - return importDataChunkStatus; } /** diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java index 733a5afa96..44b26d1db6 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessor.java @@ -7,19 +7,12 @@ import com.scalar.db.common.error.CoreError; import com.scalar.db.dataloader.core.DataLoaderObjectMapper; import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk; -import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow; import java.io.BufferedReader; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -49,61 +42,6 @@ public JsonImportProcessor(ImportProcessorParams params) { 
super(params); } - /** - * Processes the source data from the given import file. - * - *

This method reads data from the provided {@link BufferedReader}, processes it in chunks, and - * batches transactions according to the specified sizes. The method returns a list of {@link - * ImportDataChunkStatus} objects, each representing the status of a processed data chunk. - * - * @param dataChunkSize the number of records to include in each data chunk - * @param transactionBatchSize the number of records to include in each transaction batch - * @param reader the {@link BufferedReader} used to read the source file - * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status of each - * data chunk - */ - @Override - public ConcurrentHashMap process( - int dataChunkSize, int transactionBatchSize, BufferedReader reader) { - ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); - BlockingQueue dataChunkQueue = - new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize()); - - try { - CompletableFuture readerFuture = - CompletableFuture.runAsync( - () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor); - - ConcurrentHashMap result = new ConcurrentHashMap<>(); - - while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) { - ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS); - if (dataChunk != null) { - ImportDataChunkStatus status = processDataChunk(dataChunk, transactionBatchSize); - result.put(status.getDataChunkId(), status); - } - } - - readerFuture.join(); - return result; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException( - CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e); - } finally { - dataChunkExecutor.shutdown(); - try { - if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) { - dataChunkExecutor.shutdownNow(); - } - } catch (InterruptedException e) { - dataChunkExecutor.shutdownNow(); - Thread.currentThread().interrupt(); - } - notifyAllDataChunksCompleted(); - } - } - /** * Reads data chunks from the JSON file and adds them to the processing queue. 
* @@ -119,7 +57,8 @@ public ConcurrentHashMap process( * @throws RuntimeException if there is an error reading the JSON file or if the thread is * interrupted */ - private void readDataChunks( + @Override + protected void readDataChunks( BufferedReader reader, int dataChunkSize, BlockingQueue dataChunkQueue) { try (JsonParser jsonParser = new JsonFactory().createParser(reader)) { if (jsonParser.nextToken() != JsonToken.START_ARRAY) { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java index a121a106a5..7d7ab8777f 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessor.java @@ -4,19 +4,12 @@ import com.scalar.db.common.error.CoreError; import com.scalar.db.dataloader.core.DataLoaderObjectMapper; import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk; -import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow; import java.io.BufferedReader; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -48,61 +41,6 @@ public JsonLinesImportProcessor(ImportProcessorParams params) { super(params); } - /** - * Processes the source data from the given import file. - * - *

This method reads data from the provided {@link BufferedReader}, processes it in chunks, and - * batches transactions according to the specified sizes. The method returns a list of {@link - * ImportDataChunkStatus} objects, each representing the status of a processed data chunk. - * - * @param dataChunkSize the number of records to include in each data chunk - * @param transactionBatchSize the number of records to include in each transaction batch - * @param reader the {@link BufferedReader} used to read the source file - * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status of each - * data chunk - */ - @Override - public ConcurrentHashMap process( - int dataChunkSize, int transactionBatchSize, BufferedReader reader) { - ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor(); - BlockingQueue dataChunkQueue = - new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize()); - - try { - CompletableFuture readerFuture = - CompletableFuture.runAsync( - () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor); - - ConcurrentHashMap result = new ConcurrentHashMap<>(); - - while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) { - ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS); - if (dataChunk != null) { - ImportDataChunkStatus status = processDataChunk(dataChunk, transactionBatchSize); - result.put(status.getDataChunkId(), status); - } - } - - readerFuture.join(); - return result; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException( - CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e); - } finally { - dataChunkExecutor.shutdown(); - try { - if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) { - dataChunkExecutor.shutdownNow(); - } - } catch (InterruptedException e) { - dataChunkExecutor.shutdownNow(); - Thread.currentThread().interrupt(); - } - notifyAllDataChunksCompleted(); - } - } - /** * Reads data from the input file and creates data chunks for processing. 
* @@ -115,7 +53,8 @@ public ConcurrentHashMap process( * @param dataChunkQueue the queue where data chunks are placed for processing * @throws RuntimeException if there is an error reading the file or if the thread is interrupted */ - private void readDataChunks( + @Override + protected void readDataChunks( BufferedReader reader, int dataChunkSize, BlockingQueue dataChunkQueue) { try { List currentDataChunk = new ArrayList<>(); diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java index ff57e42bac..5457bc51d1 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java @@ -15,7 +15,6 @@ import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException; -import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; import com.scalar.db.dataloader.core.dataimport.log.LogMode; import com.scalar.db.exception.transaction.TransactionException; import java.util.HashMap; @@ -91,10 +90,10 @@ void test_importProcessWithStorage() { .tableMetadataByTableName(tableMetadataByTableName) .build(); csvImportProcessor = new CsvImportProcessor(params); - Map statusList = - csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); - assert statusList != null; - Assertions.assertEquals(1, statusList.size()); + Assertions.assertDoesNotThrow( + () -> { + csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); + }); } @Test @@ -110,9 +109,9 @@ void test_importProcessWithTransaction() { .tableMetadataByTableName(tableMetadataByTableName) .build(); csvImportProcessor = new CsvImportProcessor(params); - Map statusList = - csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); - assert statusList != null; - Assertions.assertEquals(1, statusList.size()); + Assertions.assertDoesNotThrow( + () -> { + csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); + }); } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java index 44c57874c2..a5705d3684 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java @@ -15,7 +15,6 @@ import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException; -import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; import com.scalar.db.dataloader.core.dataimport.log.LogMode; import com.scalar.db.exception.transaction.TransactionException; import java.util.HashMap; @@ -91,10 +90,10 @@ void test_importProcessWithStorage() { .tableMetadataByTableName(tableMetadataByTableName) .build(); jsonImportProcessor = new JsonImportProcessor(params); - Map statusList = - jsonImportProcessor.process(5, 1, 
UnitTestUtils.getJsonReader()); - assert statusList != null; - Assertions.assertEquals(1, statusList.size()); + Assertions.assertDoesNotThrow( + () -> { + jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader()); + }); } @Test @@ -110,9 +109,9 @@ void test_importProcessWithTransaction() { .tableMetadataByTableName(tableMetadataByTableName) .build(); jsonImportProcessor = new JsonImportProcessor(params); - Map statusList = - jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader()); - assert statusList != null; - Assertions.assertEquals(1, statusList.size()); + Assertions.assertDoesNotThrow( + () -> { + jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader()); + }); } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java index 4c0e755aac..30992f1d35 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java @@ -15,7 +15,6 @@ import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException; -import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; import com.scalar.db.dataloader.core.dataimport.log.LogMode; import com.scalar.db.exception.transaction.TransactionException; import java.util.HashMap; @@ -91,10 +90,10 @@ void test_importProcessWithStorage() { .tableMetadataByTableName(tableMetadataByTableName) .build(); jsonLinesImportProcessor = new JsonLinesImportProcessor(params); - Map statusList = - jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader()); - assert statusList != null; - Assertions.assertEquals(1, statusList.size()); + Assertions.assertDoesNotThrow( + () -> { + jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader()); + }); } @Test @@ -110,9 +109,9 @@ void test_importProcessWithTransaction() { .tableMetadataByTableName(tableMetadataByTableName) .build(); jsonLinesImportProcessor = new JsonLinesImportProcessor(params); - Map statusList = - jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader()); - assert statusList != null; - Assertions.assertEquals(1, statusList.size()); + Assertions.assertDoesNotThrow( + () -> { + jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader()); + }); } } From f0ce024ef2c629bb1b86a8b19ace51ee9e848b30 Mon Sep 17 00:00:00 2001 From: thongdk8 Date: Thu, 29 May 2025 10:58:37 +0900 Subject: [PATCH 2/2] Passing datachunk id only instead of datachunk --- .../dataimport/processor/ImportProcessor.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java index a98db1145a..50877873f2 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -226,16 +226,16 @@ private List splitIntoTransactionBatches( 
* all records in the batch, and commits or aborts the transaction based on the success of all * operations. * - * @param dataChunk the parent data chunk containing this batch + * @param dataChunkId the parent data chunk id of the chunk containing this batch * @param transactionBatch the batch of records to process in a single transaction * @return an {@link ImportTransactionBatchResult} containing the processing results and any * errors */ private ImportTransactionBatchResult processTransactionBatch( - ImportDataChunk dataChunk, ImportTransactionBatch transactionBatch) { + int dataChunkId, ImportTransactionBatch transactionBatch) { ImportTransactionBatchStatus status = ImportTransactionBatchStatus.builder() - .dataChunkId(dataChunk.getDataChunkId()) + .dataChunkId(dataChunkId) .transactionBatchId(transactionBatch.getTransactionBatchId()) .build(); notifyTransactionBatchStarted(status); @@ -252,7 +252,7 @@ private ImportTransactionBatchResult processTransactionBatch( ImportTaskParams taskParams = ImportTaskParams.builder() .sourceRecord(importRow.getSourceData()) - .dataChunkId(dataChunk.getDataChunkId()) + .dataChunkId(dataChunkId) .rowNumber(importRow.getRowNumber()) .importOptions(params.getImportOptions()) .tableColumnDataTypes(params.getTableColumnDataTypes()) @@ -295,7 +295,7 @@ private ImportTransactionBatchResult processTransactionBatch( ImportTransactionBatchResult.builder() .transactionBatchId(transactionBatch.getTransactionBatchId()) .success(isSuccess) - .dataChunkId(dataChunk.getDataChunkId()) + .dataChunkId(dataChunkId) .records(importRecordResult) .errors(Collections.singletonList(error)) .build(); @@ -307,15 +307,15 @@ private ImportTransactionBatchResult processTransactionBatch( * Processes a single record in storage mode (non-transactional). Each record is processed * independently without transaction guarantees. 
* - * @param dataChunk the parent data chunk containing this record + * @param dataChunkId the parent data chunk id of the chunk containing this record * @param importRow the record to process * @return an {@link ImportTaskResult} containing the processing result for the record */ - private ImportTaskResult processStorageRecord(ImportDataChunk dataChunk, ImportRow importRow) { + private ImportTaskResult processStorageRecord(int dataChunkId, ImportRow importRow) { ImportTaskParams taskParams = ImportTaskParams.builder() .sourceRecord(importRow.getSourceData()) - .dataChunkId(dataChunk.getDataChunkId()) + .dataChunkId(dataChunkId) .rowNumber(importRow.getRowNumber()) .importOptions(params.getImportOptions()) .tableColumnDataTypes(params.getTableColumnDataTypes()) @@ -330,7 +330,7 @@ private ImportTaskResult processStorageRecord(ImportDataChunk dataChunk, ImportR .rowNumber(importRecordResult.getRowNumber()) .rawRecord(importRecordResult.getRawRecord()) .targets(importRecordResult.getTargets()) - .dataChunkId(dataChunk.getDataChunkId()) + .dataChunkId(dataChunkId) .build(); notifyStorageRecordCompleted(modifiedTaskResult); return modifiedTaskResult; @@ -382,7 +382,7 @@ private ImportDataChunkStatus processDataChunkWithTransactions( for (ImportTransactionBatch transactionBatch : transactionBatches) { Future transactionBatchFuture = transactionBatchExecutor.submit( - () -> processTransactionBatch(dataChunk, transactionBatch)); + () -> processTransactionBatch(dataChunk.getDataChunkId(), transactionBatch)); transactionBatchFutures.add(transactionBatchFuture); } @@ -446,7 +446,8 @@ private ImportDataChunkStatus processDataChunkWithoutTransactions(ImportDataChun try { for (ImportRow importRow : dataChunk.getSourceData()) { Future recordFuture = - recordExecutor.submit(() -> processStorageRecord(dataChunk, importRow)); + recordExecutor.submit( + () -> processStorageRecord(dataChunk.getDataChunkId(), importRow)); recordFutures.add(recordFuture); } waitForFuturesToComplete(recordFutures);
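To illustrate the shape of the refactoring in PATCH 1, here is a minimal, self-contained sketch (not part of the diff) of the template-method structure it introduces: the base class now owns a concrete process() loop that drains a queue of data chunks, while each format-specific subclass only implements readDataChunks(). The types DataChunk, ChunkedProcessor, LineChunkedProcessor, and ProcessorRefactorSketch are simplified stand-ins for ImportDataChunk, ImportProcessor, JsonLinesImportProcessor, and a caller; the sketch omits the reader thread, timeout polling, transaction batching, listeners, and error handling that the real process() method keeps.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-in for ImportDataChunk; not the real ScalarDB class.
final class DataChunk {
  final int id;
  final List<String> rows;

  DataChunk(int id, List<String> rows) {
    this.id = id;
    this.rows = rows;
  }
}

// Mirrors the refactored ImportProcessor: the base class owns the processing loop.
abstract class ChunkedProcessor {

  // Shared driver, analogous to ImportProcessor.process() after this patch.
  // The real code fills the queue from a separate reader thread and polls with a
  // timeout; a sequential version keeps the sketch short.
  public void process(int dataChunkSize, BufferedReader reader) throws IOException {
    BlockingQueue<DataChunk> queue = new LinkedBlockingQueue<>();
    readDataChunks(reader, dataChunkSize, queue);
    DataChunk chunk;
    while ((chunk = queue.poll()) != null) {
      processDataChunk(chunk);
    }
    onAllDataChunksCompleted();
  }

  // Format-specific parsing: the only method each subclass implements now.
  protected abstract void readDataChunks(
      BufferedReader reader, int dataChunkSize, BlockingQueue<DataChunk> queue) throws IOException;

  private void processDataChunk(DataChunk chunk) {
    System.out.println("chunk " + chunk.id + ": " + chunk.rows.size() + " rows");
  }

  private void onAllDataChunksCompleted() {
    System.out.println("all data chunks completed");
  }
}

// A line-oriented subclass, loosely mirroring JsonLinesImportProcessor.readDataChunks().
class LineChunkedProcessor extends ChunkedProcessor {
  @Override
  protected void readDataChunks(
      BufferedReader reader, int dataChunkSize, BlockingQueue<DataChunk> queue) throws IOException {
    List<String> current = new ArrayList<>();
    int chunkId = 0;
    String line;
    while ((line = reader.readLine()) != null) {
      current.add(line);
      if (current.size() == dataChunkSize) {
        queue.add(new DataChunk(chunkId++, current));
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) {
      queue.add(new DataChunk(chunkId, current));
    }
  }
}

public class ProcessorRefactorSketch {
  public static void main(String[] args) throws IOException {
    new LineChunkedProcessor().process(2, new BufferedReader(new StringReader("a\nb\nc")));
  }
}

With this arrangement, CsvImportProcessor, JsonImportProcessor, and JsonLinesImportProcessor no longer each duplicate the executor/queue loop that the patch deletes from them; only the parsing differs per format.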
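The other half of the change is that startImport() and process() no longer return a ConcurrentHashMap of ImportDataChunkStatus objects, and addOrUpdateDataChunkStatus() is gone from ImportEventListener and the loggers; progress is now observed only through the remaining per-chunk callbacks. A caller that still wants a chunk-id-to-status map can rebuild one in its own listener. The sketch below shows that consumer-side pattern under simplified, hypothetical stand-ins (ChunkStatus, ChunkEventListener, StatusTrackingListener) rather than the real ImportDataChunkStatus and ImportEventListener types, and it does not claim any particular registration API beyond what the patch shows (listeners are registered with the processor/manager and events are forwarded to them).

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for ImportDataChunkStatus; not the real ScalarDB class.
final class ChunkStatus {
  final int dataChunkId;
  final boolean completed;

  ChunkStatus(int dataChunkId, boolean completed) {
    this.dataChunkId = dataChunkId;
    this.completed = completed;
  }
}

// Minimal listener interface mirroring the two per-chunk callbacks the patch keeps.
interface ChunkEventListener {
  void onDataChunkStarted(ChunkStatus status);

  void onDataChunkCompleted(ChunkStatus status);
}

// A caller-side listener that rebuilds the chunk-id-to-status map the removed
// importDataChunkStatusMap / addOrUpdateDataChunkStatus pair used to maintain.
final class StatusTrackingListener implements ChunkEventListener {
  private final Map<Integer, ChunkStatus> statusByChunkId = new ConcurrentHashMap<>();

  @Override
  public void onDataChunkStarted(ChunkStatus status) {
    statusByChunkId.put(status.dataChunkId, status);
  }

  @Override
  public void onDataChunkCompleted(ChunkStatus status) {
    statusByChunkId.put(status.dataChunkId, status);
  }

  Map<Integer, ChunkStatus> snapshot() {
    return statusByChunkId;
  }
}

public class StatusTrackingExample {
  public static void main(String[] args) {
    StatusTrackingListener listener = new StatusTrackingListener();
    listener.onDataChunkStarted(new ChunkStatus(0, false));
    listener.onDataChunkCompleted(new ChunkStatus(0, true));
    System.out.println("tracked chunks: " + listener.snapshot().size());
  }
}

This also explains the updated tests: since process() is now void, they assert that processing completes without throwing instead of checking the size of a returned status map.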