
Commit a5e6e0c

Backport to branch(3) : Refactor import logic and remove redundant status mapping (#2725)
Co-authored-by: Pham Ba Thong <[email protected]>
1 parent 7aced6a commit a5e6e0c

File tree: 11 files changed, +100 -289 lines changed


data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportEventListener.java

Lines changed: 0 additions & 7 deletions
@@ -18,13 +18,6 @@ public interface ImportEventListener {
    */
   void onDataChunkStarted(ImportDataChunkStatus status);
 
-  /**
-   * Updates or adds new status information for a data chunk.
-   *
-   * @param status the updated status information for the data chunk
-   */
-  void addOrUpdateDataChunkStatus(ImportDataChunkStatus status);
-
   /**
    * Called when processing of a data chunk is completed.
    *
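With addOrUpdateDataChunkStatus removed from ImportEventListener, listeners only react to the remaining lifecycle callbacks. Below is a minimal, self-contained sketch of that narrowed contract; ChunkStatus and ChunkListener are local stand-ins for the real ImportDataChunkStatus and ImportEventListener types, which contain more members than this diff shows.

// Sketch only: local stand-ins, not the real com.scalar.db.dataloader.core classes.
public class ListenerSketch {

  // Stand-in for ImportDataChunkStatus.
  static class ChunkStatus {
    final int dataChunkId;
    ChunkStatus(int dataChunkId) { this.dataChunkId = dataChunkId; }
  }

  // Stand-in for the slimmed ImportEventListener: addOrUpdateDataChunkStatus is gone,
  // so listeners only handle the start/completion callbacks shown in this diff.
  interface ChunkListener {
    void onDataChunkStarted(ChunkStatus status);
    void onDataChunkCompleted(ChunkStatus status);
  }

  public static void main(String[] args) {
    ChunkListener listener = new ChunkListener() {
      @Override
      public void onDataChunkStarted(ChunkStatus status) {
        System.out.println("chunk " + status.dataChunkId + " started");
      }

      @Override
      public void onDataChunkCompleted(ChunkStatus status) {
        System.out.println("chunk " + status.dataChunkId + " completed");
      }
    };

    listener.onDataChunkStarted(new ChunkStatus(1));
    listener.onDataChunkCompleted(new ChunkStatus(1));
  }
}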

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java

Lines changed: 2 additions & 27 deletions
@@ -17,7 +17,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import lombok.AllArgsConstructor;
 import lombok.NonNull;

@@ -46,20 +45,15 @@ public class ImportManager implements ImportEventListener {
   private final ScalarDbMode scalarDbMode;
   private final DistributedStorage distributedStorage;
   private final DistributedTransactionManager distributedTransactionManager;
-  private final ConcurrentHashMap<Integer, ImportDataChunkStatus> importDataChunkStatusMap =
-      new ConcurrentHashMap<>();
 
   /**
    * Starts the import process using the configured parameters.
    *
    * <p>If the data chunk size in {@link ImportOptions} is set to 0, the entire file will be
    * processed as a single chunk. Otherwise, the file will be processed in chunks of the specified
    * size.
-   *
-   * @return a map of {@link ImportDataChunkStatus} objects containing the status of each processed
-   *     chunk
    */
-  public ConcurrentHashMap<Integer, ImportDataChunkStatus> startImport() {
+  public void startImport() {
     ImportProcessorParams params =
         ImportProcessorParams.builder()
             .scalarDbMode(scalarDbMode)

@@ -77,8 +71,7 @@ public ConcurrentHashMap<Integer, ImportDataChunkStatus> startImport() {
         importOptions.getDataChunkSize() == 0
             ? Integer.MAX_VALUE
             : importOptions.getDataChunkSize();
-    return processor.process(
-        dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader);
+    processor.process(dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader);
   }
 
   /**

@@ -108,15 +101,6 @@ public void onDataChunkStarted(ImportDataChunkStatus status) {
     }
   }
 
-  /**
-   * {@inheritDoc} Updates or adds the status of a data chunk in the status map. This method is
-   * thread-safe.
-   */
-  @Override
-  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {
-    importDataChunkStatusMap.put(status.getDataChunkId(), status);
-  }
-
   /** {@inheritDoc} Forwards the event to all registered listeners. */
   @Override
   public void onDataChunkCompleted(ImportDataChunkStatus status) {

@@ -194,15 +178,6 @@ public void closeResources() {
     }
   }
 
-  /**
-   * Returns the current map of import data chunk status objects.
-   *
-   * @return a map of {@link ImportDataChunkStatus} objects
-   */
-  public ConcurrentHashMap<Integer, ImportDataChunkStatus> getImportDataChunkStatus() {
-    return importDataChunkStatusMap;
-  }
-
   /**
    * Creates and returns a mapping of table column data types from the table metadata.
    *
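Because startImport() now returns void and getImportDataChunkStatus() is gone, a caller that previously relied on the returned map has to track chunk statuses itself from listener events (the diff above notes that ImportManager forwards events to all registered listeners, though the exact registration API is not shown here). A rough sketch of that caller-side pattern, again using simplified stand-in types rather than the real data-loader classes:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: the caller owns the map that ImportManager used to maintain internally.
public class CallerSideStatusTracking {

  // Stand-in for ImportDataChunkStatus.
  static class ChunkStatus {
    final int dataChunkId;
    final boolean success;
    ChunkStatus(int dataChunkId, boolean success) {
      this.dataChunkId = dataChunkId;
      this.success = success;
    }
  }

  // Stand-in for the completion callback on ImportEventListener.
  interface ChunkListener {
    void onDataChunkCompleted(ChunkStatus status);
  }

  public static void main(String[] args) {
    // Caller-side status map, keyed by data chunk id.
    Map<Integer, ChunkStatus> statuses = new ConcurrentHashMap<>();
    ChunkListener listener = status -> statuses.put(status.dataChunkId, status);

    // In the real API this listener would be registered with ImportManager before
    // calling startImport(); here we only simulate two completion events.
    listener.onDataChunkCompleted(new ChunkStatus(1, true));
    listener.onDataChunkCompleted(new ChunkStatus(2, false));

    System.out.println("Tracked chunks: " + statuses.size());
  }
}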

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java

Lines changed: 0 additions & 9 deletions
@@ -70,15 +70,6 @@ public void onTaskComplete(ImportTaskResult taskResult) {
     }
   }
 
-  /**
-   * Called to add or update the status of a data chunk. This implementation does nothing as the
-   * status is only logged when the data chunk is completed.
-   *
-   * @param status the status of the data chunk
-   */
-  @Override
-  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {}
-
   /**
    * Called when a data chunk is completed. Logs the summary of the data chunk to the summary log
    * file.

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java

Lines changed: 0 additions & 9 deletions
@@ -112,15 +112,6 @@ private void writeLog(ImportTargetResult target, LogFileType logFileType, int da
     writer.write(jsonNode);
   }
 
-  /**
-   * Called to add or update the status of a data chunk. This implementation does nothing as the
-   * status is only logged when the data chunk is completed.
-   *
-   * @param status the status of the data chunk
-   */
-  @Override
-  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {}
-
   /**
    * Called when a data chunk is completed. Logs the summary of the data chunk and closes the log
    * writers for that data chunk.

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java

Lines changed: 2 additions & 62 deletions
@@ -5,7 +5,6 @@
 import com.scalar.db.common.error.CoreError;
 import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
 import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk;
-import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
 import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow;
 import java.io.BufferedReader;
 import java.io.IOException;

@@ -14,12 +13,6 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**

@@ -50,60 +43,6 @@ public CsvImportProcessor(ImportProcessorParams params) {
     super(params);
   }
 
-  /**
-   * Processes the source data from the given import file.
-   *
-   * <p>This method reads data from the provided {@link BufferedReader}, processes it in chunks, and
-   * batches transactions according to the specified sizes. The method returns a list of {@link
-   * ImportDataChunkStatus} objects, each representing the status of a processed data chunk.
-   *
-   * @param dataChunkSize the number of records to include in each data chunk
-   * @param transactionBatchSize the number of records to include in each transaction batch
-   * @param reader the {@link BufferedReader} used to read the source file
-   * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status of each
-   *     data chunk
-   */
-  @Override
-  public ConcurrentHashMap<Integer, ImportDataChunkStatus> process(
-      int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
-    ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor();
-    BlockingQueue<ImportDataChunk> dataChunkQueue =
-        new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize());
-
-    try {
-      CompletableFuture<Void> readerFuture =
-          CompletableFuture.runAsync(
-              () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor);
-
-      ConcurrentHashMap<Integer, ImportDataChunkStatus> result = new ConcurrentHashMap<>();
-
-      while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
-        ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);
-        if (dataChunk != null) {
-          ImportDataChunkStatus status = processDataChunk(dataChunk, transactionBatchSize);
-          result.put(status.getDataChunkId(), status);
-        }
-      }
-      readerFuture.join();
-      return result;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(
-          CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e);
-    } finally {
-      dataChunkExecutor.shutdown();
-      try {
-        if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
-          dataChunkExecutor.shutdownNow();
-        }
-      } catch (InterruptedException e) {
-        dataChunkExecutor.shutdownNow();
-        Thread.currentThread().interrupt();
-      }
-      notifyAllDataChunksCompleted();
-    }
-  }
-
   /**
    * Reads and processes CSV data in chunks from the provided reader.
    *

@@ -122,7 +61,8 @@ public ConcurrentHashMap<Integer, ImportDataChunkStatus> process(
    * @param dataChunkQueue the queue where data chunks are placed for processing
    * @throws RuntimeException if there are errors reading the file or if interrupted
    */
-  private void readDataChunks(
+  @Override
+  protected void readDataChunks(
       BufferedReader reader, int dataChunkSize, BlockingQueue<ImportDataChunk> dataChunkQueue) {
     try {
       String delimiter =
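Making readDataChunks a protected override means the chunk-draining loop that was removed above now lives in the shared base processor, and format-specific subclasses such as CsvImportProcessor only supply the reading logic. The following is a simplified, self-contained sketch of that template-method shape, modeled loosely on the removed loop; it uses local classes, not the actual ImportProcessor/CsvImportProcessor code.

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Sketch only: base class owns the processing loop, subclass overrides the read hook.
public class ProcessorTemplateSketch {

  abstract static class BaseProcessor {
    // The shared loop that was previously duplicated per format lives here.
    final void process(int dataChunkSize) throws InterruptedException {
      BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(16);
      CompletableFuture<Void> readerFuture =
          CompletableFuture.runAsync(() -> readDataChunks(dataChunkSize, queue));

      // Drain chunks until the reader is done and the queue is empty.
      while (!(queue.isEmpty() && readerFuture.isDone())) {
        List<String> chunk = queue.poll(100, TimeUnit.MILLISECONDS);
        if (chunk != null) {
          System.out.println("processing chunk of " + chunk.size() + " rows");
        }
      }
      readerFuture.join();
    }

    // Subclasses supply only the format-specific reading logic.
    protected abstract void readDataChunks(int dataChunkSize, BlockingQueue<List<String>> queue);
  }

  static class CsvProcessor extends BaseProcessor {
    @Override
    protected void readDataChunks(int dataChunkSize, BlockingQueue<List<String>> queue) {
      try {
        // A real implementation would parse CSV lines from a BufferedReader.
        queue.put(List.of("row1", "row2"));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new CsvProcessor().process(2);
  }
}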
