
Commit c2cfa91

Refactor import processors in the data loader, and remove the unnecessary usage of the import result status map

1 parent 95b8379 commit c2cfa91
File tree: 11 files changed, +88 -278 lines changed

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportEventListener.java

Lines changed: 0 additions & 7 deletions
@@ -18,13 +18,6 @@ public interface ImportEventListener {
    */
   void onDataChunkStarted(ImportDataChunkStatus status);
 
-  /**
-   * Updates or adds new status information for a data chunk.
-   *
-   * @param status the updated status information for the data chunk
-   */
-  void addOrUpdateDataChunkStatus(ImportDataChunkStatus status);
-
   /**
    * Called when processing of a data chunk is completed.
    *
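
With the addOrUpdateDataChunkStatus callback removed from the interface, a consumer that still wants a per-chunk status map can rebuild one from the remaining callbacks. A minimal sketch under that assumption (StatusTrackingListener is a hypothetical class; the interface's other methods, not shown in this diff, are elided):

import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical listener, for illustration only: it reconstructs the
// per-chunk status map that the framework no longer maintains itself.
class StatusTrackingListener implements ImportEventListener {
  private final Map<Integer, ImportDataChunkStatus> statuses = new ConcurrentHashMap<>();

  @Override
  public void onDataChunkStarted(ImportDataChunkStatus status) {
    statuses.put(status.getDataChunkId(), status); // record the initial status
  }

  @Override
  public void onDataChunkCompleted(ImportDataChunkStatus status) {
    statuses.put(status.getDataChunkId(), status); // overwrite with the final status
  }

  Map<Integer, ImportDataChunkStatus> snapshot() {
    return statuses;
  }

  // ...implementations of the remaining ImportEventListener methods elided
}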

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java

Lines changed: 2 additions & 27 deletions
@@ -17,7 +17,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import lombok.AllArgsConstructor;
 import lombok.NonNull;
 
@@ -46,20 +45,15 @@ public class ImportManager implements ImportEventListener {
   private final ScalarDbMode scalarDbMode;
   private final DistributedStorage distributedStorage;
   private final DistributedTransactionManager distributedTransactionManager;
-  private final ConcurrentHashMap<Integer, ImportDataChunkStatus> importDataChunkStatusMap =
-      new ConcurrentHashMap<>();
 
   /**
    * Starts the import process using the configured parameters.
    *
    * <p>If the data chunk size in {@link ImportOptions} is set to 0, the entire file will be
    * processed as a single chunk. Otherwise, the file will be processed in chunks of the specified
    * size.
-   *
-   * @return a map of {@link ImportDataChunkStatus} objects containing the status of each processed
-   *     chunk
    */
-  public ConcurrentHashMap<Integer, ImportDataChunkStatus> startImport() {
+  public void startImport() {
     ImportProcessorParams params =
         ImportProcessorParams.builder()
             .scalarDbMode(scalarDbMode)
@@ -77,8 +71,7 @@ public ConcurrentHashMap<Integer, ImportDataChunkStatus> startImport() {
         importOptions.getDataChunkSize() == 0
             ? Integer.MAX_VALUE
             : importOptions.getDataChunkSize();
-    return processor.process(
-        dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader);
+    processor.process(dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader);
   }
 
   /**
@@ -108,15 +101,6 @@ public void onDataChunkStarted(ImportDataChunkStatus status) {
     }
   }
 
-  /**
-   * {@inheritDoc} Updates or adds the status of a data chunk in the status map. This method is
-   * thread-safe.
-   */
-  @Override
-  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {
-    importDataChunkStatusMap.put(status.getDataChunkId(), status);
-  }
-
   /** {@inheritDoc} Forwards the event to all registered listeners. */
   @Override
   public void onDataChunkCompleted(ImportDataChunkStatus status) {
@@ -157,15 +141,6 @@ public void onAllDataChunksCompleted() {
     }
   }
 
-  /**
-   * Returns the current map of import data chunk status objects.
-   *
-   * @return a map of {@link ImportDataChunkStatus} objects
-   */
-  public ConcurrentHashMap<Integer, ImportDataChunkStatus> getImportDataChunkStatus() {
-    return importDataChunkStatusMap;
-  }
-
   /**
    * Creates and returns a mapping of table column data types from the table metadata.
    *
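
Since startImport() now returns void and getImportDataChunkStatus() is gone, a caller that previously consumed the returned map would switch to observing progress through a registered listener. A rough sketch of the calling pattern under that assumption (only startImport() is confirmed by this diff; the registration method name is assumed for illustration):

// Hypothetical caller after this commit, reusing the StatusTrackingListener
// sketched earlier; the addListener(...) name is an assumption, not shown here.
void runImport(ImportManager manager, StatusTrackingListener listener) {
  manager.addListener(listener); // registration method name assumed
  manager.startImport();         // no longer returns a ConcurrentHashMap
  // Per-chunk statuses now arrive via the listener callbacks instead.
}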

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java

Lines changed: 0 additions & 9 deletions
@@ -70,15 +70,6 @@ public void onTaskComplete(ImportTaskResult taskResult) {
     }
   }
 
-  /**
-   * Called to add or update the status of a data chunk. This implementation does nothing as the
-   * status is only logged when the data chunk is completed.
-   *
-   * @param status the status of the data chunk
-   */
-  @Override
-  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {}
-
   /**
    * Called when a data chunk is completed. Logs the summary of the data chunk to the summary log
    * file.

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java

Lines changed: 0 additions & 9 deletions
@@ -112,15 +112,6 @@ private void writeLog(ImportTargetResult target, LogFileType logFileType, int da
       writer.write(jsonNode);
   }
 
-  /**
-   * Called to add or update the status of a data chunk. This implementation does nothing as the
-   * status is only logged when the data chunk is completed.
-   *
-   * @param status the status of the data chunk
-   */
-  @Override
-  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {}
-
   /**
    * Called when a data chunk is completed. Logs the summary of the data chunk and closes the log
    * writers for that data chunk.

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java

Lines changed: 2 additions & 62 deletions
@@ -5,7 +5,6 @@
 import com.scalar.db.common.error.CoreError;
 import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
 import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk;
-import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
 import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow;
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -14,12 +13,6 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -50,60 +43,6 @@ public CsvImportProcessor(ImportProcessorParams params) {
     super(params);
   }
 
-  /**
-   * Processes the source data from the given import file.
-   *
-   * <p>This method reads data from the provided {@link BufferedReader}, processes it in chunks, and
-   * batches transactions according to the specified sizes. The method returns a list of {@link
-   * ImportDataChunkStatus} objects, each representing the status of a processed data chunk.
-   *
-   * @param dataChunkSize the number of records to include in each data chunk
-   * @param transactionBatchSize the number of records to include in each transaction batch
-   * @param reader the {@link BufferedReader} used to read the source file
-   * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status of each
-   *     data chunk
-   */
-  @Override
-  public ConcurrentHashMap<Integer, ImportDataChunkStatus> process(
-      int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
-    ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor();
-    BlockingQueue<ImportDataChunk> dataChunkQueue =
-        new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize());
-
-    try {
-      CompletableFuture<Void> readerFuture =
-          CompletableFuture.runAsync(
-              () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor);
-
-      ConcurrentHashMap<Integer, ImportDataChunkStatus> result = new ConcurrentHashMap<>();
-
-      while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
-        ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);
-        if (dataChunk != null) {
-          ImportDataChunkStatus status = processDataChunk(dataChunk, transactionBatchSize);
-          result.put(status.getDataChunkId(), status);
-        }
-      }
-      readerFuture.join();
-      return result;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(
-          CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e);
-    } finally {
-      dataChunkExecutor.shutdown();
-      try {
-        if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
-          dataChunkExecutor.shutdownNow();
-        }
-      } catch (InterruptedException e) {
-        dataChunkExecutor.shutdownNow();
-        Thread.currentThread().interrupt();
-      }
-      notifyAllDataChunksCompleted();
-    }
-  }
-
   /**
    * Reads and processes CSV data in chunks from the provided reader.
    *
@@ -122,7 +61,8 @@ public ConcurrentHashMap<Integer, ImportDataChunkStatus> process(
    * @param dataChunkQueue the queue where data chunks are placed for processing
    * @throws RuntimeException if there are errors reading the file or if interrupted
    */
-  private void readDataChunks(
+  @Override
+  protected void readDataChunks(
       BufferedReader reader, int dataChunkSize, BlockingQueue<ImportDataChunk> dataChunkQueue) {
     try {
       String delimiter =
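
After this change a format-specific processor no longer duplicates the chunk-dispatch loop; it only supplies the reading half. The resulting shape of the class, as implied by the hunks above (the readDataChunks body is unchanged by this commit and is abbreviated to a comment):

import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk;
import java.io.BufferedReader;
import java.util.concurrent.BlockingQueue;

public class CsvImportProcessor extends ImportProcessor {

  public CsvImportProcessor(ImportProcessorParams params) {
    super(params);
  }

  @Override
  protected void readDataChunks(
      BufferedReader reader, int dataChunkSize, BlockingQueue<ImportDataChunk> dataChunkQueue) {
    // Parses CSV rows, groups them into ImportDataChunk instances of up to
    // dataChunkSize rows each, and enqueues them for the base-class loop
    // (body abbreviated; see the unchanged remainder of this method).
  }
}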

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java

Lines changed: 56 additions & 11 deletions
@@ -1,6 +1,7 @@
 package com.scalar.db.dataloader.core.dataimport.processor;
 
 import com.scalar.db.api.DistributedTransaction;
+import com.scalar.db.common.error.CoreError;
 import com.scalar.db.dataloader.core.ScalarDbMode;
 import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
 import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk;
@@ -22,11 +23,14 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.RequiredArgsConstructor;
 import org.slf4j.Logger;
@@ -56,11 +60,57 @@ public abstract class ImportProcessor {
    * @param transactionBatchSize the number of records to group together in a single transaction
    *     (only used in transaction mode)
    * @param reader the {@link BufferedReader} used to read the source file
-   * @return a map of {@link ImportDataChunkStatus} objects indicating the processing status and
-   *     results of each data chunk
    */
-  public abstract ConcurrentHashMap<Integer, ImportDataChunkStatus> process(
-      int dataChunkSize, int transactionBatchSize, BufferedReader reader);
+  public void process(int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
+    ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor();
+    BlockingQueue<ImportDataChunk> dataChunkQueue =
+        new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize());
+
+    try {
+      CompletableFuture<Void> readerFuture =
+          CompletableFuture.runAsync(
+              () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor);
+
+      while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
+        ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);
+        if (dataChunk != null) {
+          processDataChunk(dataChunk, transactionBatchSize);
+        }
+      }
+
+      readerFuture.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(
+          CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e);
+    } finally {
+      dataChunkExecutor.shutdown();
+      try {
+        if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+          dataChunkExecutor.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        dataChunkExecutor.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+      notifyAllDataChunksCompleted();
+    }
+  }
+
+  /**
+   * Reads and processes data in chunks from the provided reader.
+   *
+   * <p>This method should be implemented by each processor to handle the specific format of the
+   * input data. It reads data from the reader, converts it to the appropriate format, and enqueues
+   * it for processing.
+   *
+   * @param reader the BufferedReader containing the data
+   * @param dataChunkSize the number of rows to include in each chunk
+   * @param dataChunkQueue the queue where data chunks are placed for processing
+   * @throws RuntimeException if there are errors reading the file or if interrupted
+   */
+  protected abstract void readDataChunks(
+      BufferedReader reader, int dataChunkSize, BlockingQueue<ImportDataChunk> dataChunkQueue);
 
   /**
    * Add import event listener to listener list
@@ -100,7 +150,6 @@ protected void notifyStorageRecordCompleted(ImportTaskResult result) {
   protected void notifyDataChunkStarted(ImportDataChunkStatus status) {
     for (ImportEventListener listener : listeners) {
       listener.onDataChunkStarted(status);
-      listener.addOrUpdateDataChunkStatus(status);
     }
   }
 
@@ -112,7 +161,6 @@ protected void notifyDataChunkStarted(ImportDataChunkStatus status) {
   protected void notifyDataChunkCompleted(ImportDataChunkStatus status) {
     for (ImportEventListener listener : listeners) {
       listener.onDataChunkCompleted(status);
-      listener.addOrUpdateDataChunkStatus(status);
     }
   }
 
@@ -294,10 +342,8 @@ private ImportTaskResult processStorageRecord(ImportDataChunk dataChunk, ImportR
    *
    * @param dataChunk the data chunk to process
    * @param transactionBatchSize the size of transaction batches (used only in transaction mode)
-   * @return an {@link ImportDataChunkStatus} containing the complete processing results and metrics
   */
-  protected ImportDataChunkStatus processDataChunk(
-      ImportDataChunk dataChunk, int transactionBatchSize) {
+  private void processDataChunk(ImportDataChunk dataChunk, int transactionBatchSize) {
     ImportDataChunkStatus status =
         ImportDataChunkStatus.builder()
             .dataChunkId(dataChunk.getDataChunkId())
@@ -312,7 +358,6 @@ protected ImportDataChunkStatus processDataChunk(
       importDataChunkStatus = processDataChunkWithoutTransactions(dataChunk);
     }
     notifyDataChunkCompleted(importDataChunkStatus);
-    return importDataChunkStatus;
   }
 
   /**
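
The process() method hoisted into the base class is a small producer-consumer template: a single-thread executor runs readDataChunks() as the producer while the calling thread drains the bounded queue. The loop condition is the subtle part; distilled to a self-contained skeleton (illustrative names, not the real classes):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Skeleton of the consumer loop above, with illustrative names.
abstract class ChunkConsumer<T> {
  void drain(BlockingQueue<T> queue, CompletableFuture<Void> producer)
      throws InterruptedException {
    // Exit only once the producer has finished AND the queue is empty:
    // checking isDone() alone could abandon chunks still queued, while
    // checking isEmpty() alone could exit while the reader is mid-file.
    while (!(queue.isEmpty() && producer.isDone())) {
      T chunk = queue.poll(100, TimeUnit.MILLISECONDS); // timed poll avoids busy-spinning
      if (chunk != null) {
        consume(chunk);
      }
    }
  }

  abstract void consume(T chunk);
}

The bounded LinkedBlockingQueue sized by getDataChunkQueueSize() also applies back-pressure: the reader blocks once the queue is full, so memory stays bounded regardless of input file size.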
