@@ -18,13 +18,6 @@ public interface ImportEventListener {
    */
   void onDataChunkStarted(ImportDataChunkStatus status);
 
-  /**
-   * Updates or adds new status information for a data chunk.
-   *
-   * @param status the updated status information for the data chunk
-   */
-  void addOrUpdateDataChunkStatus(ImportDataChunkStatus status);
-
   /**
    * Called when processing of a data chunk is completed.
    *
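With addOrUpdateDataChunkStatus removed, implementers of ImportEventListener only react to lifecycle events. A minimal sketch of what a listener might look like after this change — the ImportEventListener package path is assumed, and the real interface may declare further callbacks (onTaskComplete appears later in this diff) that an implementation would also need to override:

```java
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;

// Sketch of a listener after this change: the "status upsert" hook is gone,
// leaving only lifecycle callbacks. Any additional methods on the real
// interface would also need @Override implementations here.
public class ConsoleImportEventListener implements ImportEventListener {

  @Override
  public void onDataChunkStarted(ImportDataChunkStatus status) {
    System.out.printf("chunk %d started%n", status.getDataChunkId());
  }

  @Override
  public void onDataChunkCompleted(ImportDataChunkStatus status) {
    System.out.printf("chunk %d completed%n", status.getDataChunkId());
  }

  @Override
  public void onAllDataChunksCompleted() {
    System.out.println("all data chunks completed");
  }
}
```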
@@ -17,7 +17,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import lombok.AllArgsConstructor;
 import lombok.NonNull;
 
@@ -46,20 +45,15 @@ public class ImportManager implements ImportEventListener {
   private final ScalarDbMode scalarDbMode;
   private final DistributedStorage distributedStorage;
   private final DistributedTransactionManager distributedTransactionManager;
-  private final ConcurrentHashMap<Integer, ImportDataChunkStatus> importDataChunkStatusMap =
-      new ConcurrentHashMap<>();
 
   /**
    * Starts the import process using the configured parameters.
    *
    * <p>If the data chunk size in {@link ImportOptions} is set to 0, the entire file will be
    * processed as a single chunk. Otherwise, the file will be processed in chunks of the specified
    * size.
-   *
-   * @return a map of {@link ImportDataChunkStatus} objects containing the status of each processed
-   *     chunk
    */
-  public ConcurrentHashMap<Integer, ImportDataChunkStatus> startImport() {
+  public void startImport() {
     ImportProcessorParams params =
         ImportProcessorParams.builder()
             .scalarDbMode(scalarDbMode)
@@ -77,8 +71,7 @@ public ConcurrentHashMap<Integer, ImportDataChunkStatus> startImport() {
         importOptions.getDataChunkSize() == 0
             ? Integer.MAX_VALUE
             : importOptions.getDataChunkSize();
-    return processor.process(
-        dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader);
+    processor.process(dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader);
   }
 
   /**
@@ -108,15 +101,6 @@ public void onDataChunkStarted(ImportDataChunkStatus status) {
     }
   }
 
-  /**
-   * {@inheritDoc} Updates or adds the status of a data chunk in the status map. This method is
-   * thread-safe.
-   */
-  @Override
-  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {
-    importDataChunkStatusMap.put(status.getDataChunkId(), status);
-  }
-
   /** {@inheritDoc} Forwards the event to all registered listeners. */
   @Override
   public void onDataChunkCompleted(ImportDataChunkStatus status) {
@@ -157,15 +141,6 @@ public void onAllDataChunksCompleted() {
     }
   }
 
-  /**
-   * Returns the current map of import data chunk status objects.
-   *
-   * @return a map of {@link ImportDataChunkStatus} objects
-   */
-  public ConcurrentHashMap<Integer, ImportDataChunkStatus> getImportDataChunkStatus() {
-    return importDataChunkStatusMap;
-  }
-
   /**
    * Creates and returns a mapping of table column data types from the table metadata.
    *
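Since startImport() now returns void and getImportDataChunkStatus() is gone, callers that still want per-chunk status can maintain their own map from listener callbacks. A sketch under those assumptions — ImportManager's construction and listener-registration API are not shown in this diff, so the addListener method below is hypothetical, inferred from the "registered listeners" Javadoc wording:

```java
// Caller-side status tracking that replaces the ConcurrentHashMap
// ImportManager used to maintain internally.
ConcurrentHashMap<Integer, ImportDataChunkStatus> statusByChunkId = new ConcurrentHashMap<>();

importManager.addListener( // hypothetical registration method
    new ImportEventListener() {
      @Override
      public void onDataChunkStarted(ImportDataChunkStatus status) {
        statusByChunkId.put(status.getDataChunkId(), status);
      }

      @Override
      public void onDataChunkCompleted(ImportDataChunkStatus status) {
        statusByChunkId.put(status.getDataChunkId(), status);
      }
      // Remaining ImportEventListener callbacks would also be overridden here.
    });

importManager.startImport(); // now returns void; progress arrives via events
```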
@@ -70,15 +70,6 @@ public void onTaskComplete(ImportTaskResult taskResult) {
     }
   }
 
-  /**
-   * Called to add or update the status of a data chunk. This implementation does nothing as the
-   * status is only logged when the data chunk is completed.
-   *
-   * @param status the status of the data chunk
-   */
-  @Override
-  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {}
-
   /**
    * Called when a data chunk is completed. Logs the summary of the data chunk to the summary log
    * file.
@@ -112,15 +112,6 @@ private void writeLog(ImportTargetResult target, LogFileType logFileType, int da
     writer.write(jsonNode);
   }
 
-  /**
-   * Called to add or update the status of a data chunk. This implementation does nothing as the
-   * status is only logged when the data chunk is completed.
-   *
-   * @param status the status of the data chunk
-   */
-  @Override
-  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {}
-
   /**
    * Called when a data chunk is completed. Logs the summary of the data chunk and closes the log
    * writers for that data chunk.
@@ -5,7 +5,6 @@
 import com.scalar.db.common.error.CoreError;
 import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
 import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk;
-import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
 import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow;
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -14,12 +13,6 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -50,60 +43,6 @@ public CsvImportProcessor(ImportProcessorParams params) {
     super(params);
   }
 
-  /**
-   * Processes the source data from the given import file.
-   *
-   * <p>This method reads data from the provided {@link BufferedReader}, processes it in chunks, and
-   * batches transactions according to the specified sizes. The method returns a list of {@link
-   * ImportDataChunkStatus} objects, each representing the status of a processed data chunk.
-   *
-   * @param dataChunkSize the number of records to include in each data chunk
-   * @param transactionBatchSize the number of records to include in each transaction batch
-   * @param reader the {@link BufferedReader} used to read the source file
-   * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status of each
-   *     data chunk
-   */
-  @Override
-  public ConcurrentHashMap<Integer, ImportDataChunkStatus> process(
-      int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
-    ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor();
-    BlockingQueue<ImportDataChunk> dataChunkQueue =
-        new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize());
-
-    try {
-      CompletableFuture<Void> readerFuture =
-          CompletableFuture.runAsync(
-              () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor);
-
-      ConcurrentHashMap<Integer, ImportDataChunkStatus> result = new ConcurrentHashMap<>();
-
-      while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
-        ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);
-        if (dataChunk != null) {
-          ImportDataChunkStatus status = processDataChunk(dataChunk, transactionBatchSize);
-          result.put(status.getDataChunkId(), status);
-        }
-      }
-      readerFuture.join();
-      return result;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(
-          CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e);
-    } finally {
-      dataChunkExecutor.shutdown();
-      try {
-        if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
-          dataChunkExecutor.shutdownNow();
-        }
-      } catch (InterruptedException e) {
-        dataChunkExecutor.shutdownNow();
-        Thread.currentThread().interrupt();
-      }
-      notifyAllDataChunksCompleted();
-    }
-  }
-
   /**
    * Reads and processes CSV data in chunks from the provided reader.
    *
@@ -122,7 +61,8 @@ public ConcurrentHashMap<Integer, ImportDataChunkStatus> process(
    * @param dataChunkQueue the queue where data chunks are placed for processing
    * @throws RuntimeException if there are errors reading the file or if interrupted
    */
-  private void readDataChunks(
+  @Override
+  protected void readDataChunks(
       BufferedReader reader, int dataChunkSize, BlockingQueue<ImportDataChunk> dataChunkQueue) {
     try {
       String delimiter =
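The net effect on the processor hierarchy: the concurrent chunk-draining loop deleted from CsvImportProcessor.process appears to have been hoisted into the ImportProcessor base class, since readDataChunks is now an @Override of a protected method. The base class itself is not shown in this diff, so the following is a simplified, hypothetical sketch of that template-method split, with stand-in names rather than the real API:

```java
import java.io.BufferedReader;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

abstract class ChunkProcessor {
  // The base class owns threading and queue draining, and returns nothing:
  // progress is reported through listener events rather than a result map.
  public void process(int chunkSize, BufferedReader reader) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(64);
    try {
      CompletableFuture<Void> readerFuture =
          CompletableFuture.runAsync(() -> readChunks(reader, chunkSize, queue), executor);
      // Drain chunks as the reader thread produces them.
      while (!(queue.isEmpty() && readerFuture.isDone())) {
        String chunk = queue.poll(100, TimeUnit.MILLISECONDS);
        if (chunk != null) {
          handleChunk(chunk); // e.g., fire onDataChunkStarted/Completed events
        }
      }
      readerFuture.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("data chunk processing interrupted", e);
    } finally {
      executor.shutdown();
    }
  }

  // Subclasses such as CsvImportProcessor supply only the format-specific read.
  protected abstract void readChunks(
      BufferedReader reader, int chunkSize, BlockingQueue<String> queue);

  protected abstract void handleChunk(String chunk);
}
```

This shape explains both halves of the diff: process and its concurrency imports disappear from the CSV subclass, while readDataChunks widens from private to protected so the base class can call it polymorphically.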