
Commit e344116

feeblefakie, inv-jishnu, and ypeckstadt authored
Backport to branch(3.14): Add the import process implementation for data loader (#2587)
Co-authored-by: inv-jishnu <[email protected]>
Co-authored-by: Peckstadt Yves <[email protected]>
1 parent 96d4b24 commit e344116

26 files changed: +3215 −35 lines changed

core/src/main/java/com/scalar/db/common/error/CoreError.java

Lines changed: 38 additions & 0 deletions
@@ -776,6 +776,30 @@ public enum CoreError implements ScalarDbError {
       ""),
   DATA_LOADER_FILE_FORMAT_NOT_SUPPORTED(
       Category.USER_ERROR, "0178", "The provided file format is not supported : %s", "", ""),
+  DATA_LOADER_COULD_NOT_FIND_PARTITION_KEY(
+      Category.USER_ERROR, "0179", "Could not find the partition key", "", ""),
+  DATA_LOADER_UPSERT_INSERT_MISSING_COLUMNS(
+      Category.USER_ERROR,
+      "0180",
+      "The source record needs to contain all fields if the UPSERT turns into an INSERT",
+      "",
+      ""),
+  DATA_LOADER_DATA_ALREADY_EXISTS(Category.USER_ERROR, "0181", "Record already exists", "", ""),
+  DATA_LOADER_DATA_NOT_FOUND(Category.USER_ERROR, "0182", "Record was not found", "", ""),
+  DATA_LOADER_COULD_NOT_FIND_CLUSTERING_KEY(
+      Category.USER_ERROR, "0183", "Could not find the clustering key", "", ""),
+  DATA_LOADER_TABLE_METADATA_MISSING(
+      Category.USER_ERROR, "0184", "No table metadata found", "", ""),
+  DATA_LOADER_MISSING_SOURCE_FIELD(
+      Category.USER_ERROR,
+      "0185",
+      "The data mapping source field '%s' for table '%s' is missing in the json data record",
+      "",
+      ""),
+  DATA_LOADER_CSV_DATA_MISMATCH(
+      Category.USER_ERROR, "0186", "The CSV row: %s does not match header: %s.", "", ""),
+  DATA_LOADER_JSON_CONTENT_START_ERROR(
+      Category.USER_ERROR, "0187", "Expected JSON file content to be an array", "", ""),

  //
  // Errors for the concurrency error category
@@ -1033,6 +1057,20 @@ public enum CoreError implements ScalarDbError {
      "Something went wrong while scanning. Are you sure you are running in the correct transaction mode? Details: %s",
      "",
      ""),
+  DATA_LOADER_CSV_FILE_READ_FAILED(
+      Category.INTERNAL_ERROR, "0049", "Failed to read CSV file. Details: %s.", "", ""),
+  DATA_LOADER_CSV_FILE_HEADER_READ_FAILED(
+      Category.INTERNAL_ERROR, "0050", "Failed to read CSV header line. Details: %s.", "", ""),
+  DATA_LOADER_DATA_CHUNK_PROCESS_FAILED(
+      Category.INTERNAL_ERROR,
+      "0051",
+      "Data chunk processing was interrupted. Details: %s",
+      "",
+      ""),
+  DATA_LOADER_JSON_FILE_READ_FAILED(
+      Category.INTERNAL_ERROR, "0052", "Failed to read JSON file. Details: %s.", "", ""),
+  DATA_LOADER_JSONLINES_FILE_READ_FAILED(
+      Category.INTERNAL_ERROR, "0053", "Failed to read JSON Lines file. Details: %s.", "", ""),

  //
  // Errors for the unknown transaction status error category
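
For context, entries like these are consumed elsewhere in the codebase by formatting the message template with its arguments and throwing a matching exception. The following is a minimal sketch of that pattern, assuming the buildMessage(Object...) helper that ScalarDbError-style enums expose; the CsvRowValidator class and its method are illustrative and not part of this commit:

import com.scalar.db.common.error.CoreError;
import java.util.List;

final class CsvRowValidator {
  private CsvRowValidator() {}

  /**
   * Throws a user error built from the new DATA_LOADER_CSV_DATA_MISMATCH entry when a CSV row
   * and the header differ in column count.
   */
  static void checkRowMatchesHeader(List<String> header, List<String> row) {
    if (row.size() != header.size()) {
      // buildMessage formats "The CSV row: %s does not match header: %s." with the arguments
      throw new IllegalArgumentException(
          CoreError.DATA_LOADER_CSV_DATA_MISMATCH.buildMessage(row, header));
    }
  }
}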

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportEventListener.java

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
package com.scalar.db.dataloader.core.dataimport;

import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;

/**
 * Listener interface for monitoring import events during the data loading process. Implementations
 * can use this to track progress and handle various stages of the import process.
 */
public interface ImportEventListener {

  /**
   * Called when processing of a data chunk begins.
   *
   * @param status the current status of the data chunk being processed
   */
  void onDataChunkStarted(ImportDataChunkStatus status);

  /**
   * Updates or adds new status information for a data chunk.
   *
   * @param status the updated status information for the data chunk
   */
  void addOrUpdateDataChunkStatus(ImportDataChunkStatus status);

  /**
   * Called when processing of a data chunk is completed.
   *
   * @param status the final status of the completed data chunk
   */
  void onDataChunkCompleted(ImportDataChunkStatus status);

  /**
   * Called when all data chunks have been processed. This indicates that the entire chunked import
   * process is complete.
   */
  void onAllDataChunksCompleted();

  /**
   * Called when processing of a transaction batch begins.
   *
   * @param batchStatus the initial status of the transaction batch
   */
  void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus);

  /**
   * Called when processing of a transaction batch is completed.
   *
   * @param batchResult the result of the completed transaction batch
   */
  void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult);

  /**
   * Called when an import task is completed.
   *
   * @param taskResult the result of the completed import task
   */
  void onTaskComplete(ImportTaskResult taskResult);
}
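
To illustrate the contract, here is a minimal sketch (not part of this commit) of a listener that logs progress via SLF4J. The LoggingImportEventListener name is made up for the example; the only getter it relies on, ImportDataChunkStatus.getDataChunkId(), appears in the ImportManager code below, and everything else is logged through toString():

import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Example listener (illustrative only) that logs import progress. */
public class LoggingImportEventListener implements ImportEventListener {

  private static final Logger logger =
      LoggerFactory.getLogger(LoggingImportEventListener.class);

  @Override
  public void onDataChunkStarted(ImportDataChunkStatus status) {
    logger.info("Data chunk {} started", status.getDataChunkId());
  }

  @Override
  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {
    logger.debug("Data chunk {} status updated", status.getDataChunkId());
  }

  @Override
  public void onDataChunkCompleted(ImportDataChunkStatus status) {
    logger.info("Data chunk {} completed", status.getDataChunkId());
  }

  @Override
  public void onAllDataChunksCompleted() {
    logger.info("All data chunks completed");
  }

  @Override
  public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) {
    logger.info("Transaction batch started: {}", batchStatus);
  }

  @Override
  public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
    logger.info("Transaction batch completed: {}", batchResult);
  }

  @Override
  public void onTaskComplete(ImportTaskResult taskResult) {
    logger.debug("Task completed: {}", taskResult);
  }
}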

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java

Lines changed: 183 additions & 0 deletions
@@ -0,0 +1,183 @@
package com.scalar.db.dataloader.core.dataimport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDBMode;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessor;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorParams;
import com.scalar.db.dataloader.core.dataimport.processor.TableColumnDataTypes;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;
import java.io.BufferedReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.AllArgsConstructor;
import lombok.NonNull;

/**
 * Manages the data import process and coordinates event handling between the import processor and
 * listeners. This class implements {@link ImportEventListener} to receive events from the
 * processor and relay them to registered listeners.
 *
 * <p>The import process involves:
 *
 * <ul>
 *   <li>Reading data from an input file
 *   <li>Processing the data in configurable chunk sizes
 *   <li>Managing database transactions in batches
 *   <li>Notifying listeners of various import events
 * </ul>
 */
@AllArgsConstructor
public class ImportManager implements ImportEventListener {

  @NonNull private final Map<String, TableMetadata> tableMetadata;
  @NonNull private final BufferedReader importFileReader;
  @NonNull private final ImportOptions importOptions;
  private final ImportProcessorFactory importProcessorFactory;
  private final List<ImportEventListener> listeners = new ArrayList<>();
  private final ScalarDBMode scalarDBMode;
  private final DistributedStorage distributedStorage;
  private final DistributedTransactionManager distributedTransactionManager;
  private final ConcurrentHashMap<Integer, ImportDataChunkStatus> importDataChunkStatusMap =
      new ConcurrentHashMap<>();

  /**
   * Starts the import process using the configured parameters.
   *
   * <p>If the data chunk size in {@link ImportOptions} is set to 0, the entire file will be
   * processed as a single chunk. Otherwise, the file will be processed in chunks of the specified
   * size.
   *
   * @return a map of {@link ImportDataChunkStatus} objects containing the status of each processed
   *     chunk
   */
  public ConcurrentHashMap<Integer, ImportDataChunkStatus> startImport() {
    ImportProcessorParams params =
        ImportProcessorParams.builder()
            .scalarDBMode(scalarDBMode)
            .importOptions(importOptions)
            .tableMetadataByTableName(tableMetadata)
            .dao(new ScalarDBDao())
            .distributedTransactionManager(distributedTransactionManager)
            .distributedStorage(distributedStorage)
            .tableColumnDataTypes(getTableColumnDataTypes())
            .build();
    ImportProcessor processor = importProcessorFactory.createImportProcessor(params);
    processor.addListener(this);
    // If the data chunk size is 0, then process the entire file in a single data chunk
    int dataChunkSize =
        importOptions.getDataChunkSize() == 0
            ? Integer.MAX_VALUE
            : importOptions.getDataChunkSize();
    return processor.process(
        dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader);
  }

  /**
   * Registers a new listener to receive import events.
   *
   * @param listener the listener to add
   */
  public void addListener(ImportEventListener listener) {
    listeners.add(listener);
  }

  /**
   * Removes a previously registered listener.
   *
   * @param listener the listener to remove
   */
  public void removeListener(ImportEventListener listener) {
    listeners.remove(listener);
  }

  /** {@inheritDoc} Forwards the event to all registered listeners. */
  @Override
  public void onDataChunkStarted(ImportDataChunkStatus status) {
    for (ImportEventListener listener : listeners) {
      listener.onDataChunkStarted(status);
    }
  }

  /**
   * {@inheritDoc} Updates or adds the status of a data chunk in the status map. This method is
   * thread-safe.
   */
  @Override
  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {
    importDataChunkStatusMap.put(status.getDataChunkId(), status);
  }

  /** {@inheritDoc} Forwards the event to all registered listeners. */
  @Override
  public void onDataChunkCompleted(ImportDataChunkStatus status) {
    for (ImportEventListener listener : listeners) {
      listener.onDataChunkCompleted(status);
    }
  }

  /** {@inheritDoc} Forwards the event to all registered listeners. */
  @Override
  public void onTransactionBatchStarted(ImportTransactionBatchStatus status) {
    for (ImportEventListener listener : listeners) {
      listener.onTransactionBatchStarted(status);
    }
  }

  /** {@inheritDoc} Forwards the event to all registered listeners. */
  @Override
  public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
    for (ImportEventListener listener : listeners) {
      listener.onTransactionBatchCompleted(batchResult);
    }
  }

  /** {@inheritDoc} Forwards the event to all registered listeners. */
  @Override
  public void onTaskComplete(ImportTaskResult taskResult) {
    for (ImportEventListener listener : listeners) {
      listener.onTaskComplete(taskResult);
    }
  }

  /** {@inheritDoc} Forwards the event to all registered listeners. */
  @Override
  public void onAllDataChunksCompleted() {
    for (ImportEventListener listener : listeners) {
      listener.onAllDataChunksCompleted();
    }
  }

  /**
   * Returns the current map of import data chunk status objects.
   *
   * @return a map of {@link ImportDataChunkStatus} objects
   */
  public ConcurrentHashMap<Integer, ImportDataChunkStatus> getImportDataChunkStatus() {
    return importDataChunkStatusMap;
  }

  /**
   * Creates and returns a mapping of table column data types from the table metadata.
   *
   * @return a {@link TableColumnDataTypes} object containing the column data types for all tables
   */
  public TableColumnDataTypes getTableColumnDataTypes() {
    TableColumnDataTypes tableColumnDataTypes = new TableColumnDataTypes();
    tableMetadata.forEach(
        (name, metadata) ->
            metadata
                .getColumnDataTypes()
                .forEach((k, v) -> tableColumnDataTypes.addColumnDataType(name, k, v)));
    return tableColumnDataTypes;
  }
}
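
A minimal usage sketch (not from this commit) that wires the pieces together. It assumes the Lombok @AllArgsConstructor generates a constructor over the non-initialized fields in declaration order, and it reuses the hypothetical LoggingImportEventListener sketched earlier; adjust the constructor call if the actual generated signature differs:

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDBMode;
import com.scalar.db.dataloader.core.dataimport.ImportManager;
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
import java.io.BufferedReader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ImportExample {
  private ImportExample() {}

  /** Starts an import and returns the per-chunk status map reported by the processor. */
  static ConcurrentHashMap<Integer, ImportDataChunkStatus> runImport(
      Map<String, TableMetadata> tableMetadata,
      BufferedReader importFileReader,
      ImportOptions importOptions,
      ImportProcessorFactory processorFactory,
      ScalarDBMode mode,
      DistributedStorage storage,
      DistributedTransactionManager transactionManager) {
    // Constructor parameter order assumed from the field declaration order shown above
    ImportManager manager =
        new ImportManager(
            tableMetadata,
            importFileReader,
            importOptions,
            processorFactory,
            mode,
            storage,
            transactionManager);
    // Receive progress callbacks while the import runs
    manager.addListener(new LoggingImportEventListener());
    return manager.startImport();
  }
}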

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java

Lines changed: 1 addition & 0 deletions
@@ -35,4 +35,5 @@ public class ImportOptions {
   private final String tableName;
   private final int maxThreads;
   private final String customHeaderRow;
+  private final int dataChunkQueueSize;
 }
