Skip to content

Commit 113bea1

Browse files
Backport to branch (3.11): Add DTOs and other classes for task (#2534)
Co-authored-by: inv-jishnu <[email protected]>
1 parent 8c1f42c commit 113bea1

File tree

8 files changed

+379
-0
lines changed

8 files changed

+379
-0
lines changed

data-loader/core/src/main/java/com/scalar/db/dataloader/core/ErrorMessage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,9 @@ public class ErrorMessage {
3838
"The required clustering key '%s' is missing in the control file mapping for table '%s'";
3939
public static final String MULTIPLE_MAPPINGS_FOR_COLUMN_FOUND =
4040
"Duplicated data mappings found for column '%s' in table '%s'";
41+
public static final String MISSING_CLUSTERING_KEY_COLUMN =
42+
"Missing required field or column mapping for clustering key %s";
43+
public static final String MISSING_PARTITION_KEY_COLUMN =
44+
"Missing required field or column mapping for partition key %s";
45+
public static final String MISSING_COLUMN = "Missing field or column mapping for %s";
4146
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.scalar.db.dataloader.core.dataimport;
2+
3+
import com.scalar.db.dataloader.core.FileFormat;
4+
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
5+
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel;
6+
import com.scalar.db.dataloader.core.dataimport.log.LogMode;
7+
import lombok.Builder;
8+
import lombok.Data;
9+
10+
/** Import options to import data into one or more ScalarDB tables */
@Builder
@Data
public class ImportOptions {

  // Write strategy for imported records; defaults to UPSERT.
  @Builder.Default private final ImportMode importMode = ImportMode.UPSERT;
  // When true, presumably every table column must be present in each source record — confirm
  // against the validation task that consumes this flag.
  @Builder.Default private final boolean requireAllColumns = false;
  // Format of the import source file; defaults to JSON.
  @Builder.Default private final FileFormat fileFormat = FileFormat.JSON;
  @Builder.Default private final boolean prettyPrint = false;
  // NOTE(review): presumably skips null-valued source fields during import — verify with caller.
  @Builder.Default private final boolean ignoreNullValues = false;
  // Import logging mode; defaults to one log file per data chunk (see LogMode).
  @Builder.Default private final LogMode logMode = LogMode.SPLIT_BY_DATA_CHUNK;

  // How strictly the control file is validated; defaults to MAPPED.
  @Builder.Default
  private final ControlFileValidationLevel controlFileValidationLevel =
      ControlFileValidationLevel.MAPPED;

  // Field delimiter used for CSV-style sources.
  @Builder.Default private final char delimiter = ',';

  @Builder.Default private final boolean logSuccessRecords = false;
  @Builder.Default private final boolean logRawRecord = false;

  // No @Builder.Default: callers are expected to set these; primitives default to 0.
  private final int dataChunkSize;
  private final int transactionBatchSize;
  private final ControlFile controlFile;
  private final String namespace;
  private final String tableName;
  private final int maxThreads;
  // Custom header row to use instead of the file's own header — TODO confirm semantics.
  private final String customHeaderRow;
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.scalar.db.dataloader.core.dataimport.log;
2+
3+
/** Log modes available for import logging */
public enum LogMode {
  /** Write all import log entries to a single log file. */
  SINGLE_FILE,
  /** Write a separate log file per imported data chunk. */
  SPLIT_BY_DATA_CHUNK
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.scalar.db.dataloader.core.dataimport.task.mapping;
2+
3+
import com.fasterxml.jackson.databind.node.ObjectNode;
4+
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
5+
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;
6+
7+
public class ImportDataMapping {
8+
9+
/**
10+
* * Update the source data replace the source column name with the target column name according
11+
* to control file table data
12+
*
13+
* @param source source data
14+
* @param controlFileTable control file table to map source data
15+
*/
16+
public static void apply(ObjectNode source, ControlFileTable controlFileTable) {
17+
// Copy the source field data to the target column if missing
18+
for (ControlFileTableFieldMapping mapping : controlFileTable.getMappings()) {
19+
String sourceField = mapping.getSourceField();
20+
String targetColumn = mapping.getTargetColumn();
21+
22+
if (source.has(sourceField) && !source.has(targetColumn)) {
23+
source.set(targetColumn, source.get(sourceField));
24+
source.remove(sourceField);
25+
}
26+
}
27+
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.scalar.db.dataloader.core.dataimport.task.validation;
2+
3+
import java.util.ArrayList;
4+
import java.util.Collections;
5+
import java.util.HashSet;
6+
import java.util.List;
7+
import java.util.Set;
8+
import javax.annotation.concurrent.Immutable;
9+
10+
/**
 * The validation result for a data source record.
 *
 * <p>NOTE: the previous {@code @Immutable} annotation was removed — it was incorrect, since
 * {@link #addErrorMessage(String, String)} mutates internal state. The class is effectively
 * write-once-then-read, but it is not immutable and not thread-safe for concurrent writers.
 */
public final class ImportSourceRecordValidationResult {

  // Mutable backing collections; exposed only through unmodifiable views below.
  private final List<String> errorMessages;
  private final Set<String> columnsWithErrors;

  /** Creates an empty (valid) result. */
  public ImportSourceRecordValidationResult() {
    this.errorMessages = new ArrayList<>();
    this.columnsWithErrors = new HashSet<>();
  }

  /**
   * Add a validation error message for a column, also marking the column as containing an error.
   *
   * @param columnName column name
   * @param errorMessage error message
   */
  public void addErrorMessage(String columnName, String errorMessage) {
    this.columnsWithErrors.add(columnName);
    this.errorMessages.add(errorMessage);
  }

  /** @return Immutable list of validation error messages */
  public List<String> getErrorMessages() {
    return Collections.unmodifiableList(this.errorMessages);
  }

  /** @return Immutable set of columns that had errors */
  public Set<String> getColumnsWithErrors() {
    return Collections.unmodifiableSet(this.columnsWithErrors);
  }

  /** @return {@code true} when no validation errors were recorded */
  public boolean isValid() {
    return this.errorMessages.isEmpty();
  }
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package com.scalar.db.dataloader.core.dataimport.task.validation;
2+
3+
import static com.scalar.db.dataloader.core.ErrorMessage.MISSING_CLUSTERING_KEY_COLUMN;
4+
import static com.scalar.db.dataloader.core.ErrorMessage.MISSING_COLUMN;
5+
import static com.scalar.db.dataloader.core.ErrorMessage.MISSING_PARTITION_KEY_COLUMN;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import com.scalar.db.api.TableMetadata;
9+
import com.scalar.db.dataloader.core.DatabaseKeyType;
10+
import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils;
11+
import java.util.Set;
12+
import lombok.AccessLevel;
13+
import lombok.NoArgsConstructor;
14+
15+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
16+
public class ImportSourceRecordValidator {
17+
18+
/**
19+
* Create list for validation error messages. Validate everything and not return when one single
20+
* error is found. Avoiding trial and error imports where every time a new error appears
21+
*
22+
* @param partitionKeyNames List of partition keys in table
23+
* @param clusteringKeyNames List of clustering keys in table
24+
* @param columnNames List of all column names in table
25+
* @param sourceRecord source data
26+
* @param allColumnsRequired If true treat missing columns as an error
27+
* @return Source record validation result
28+
*/
29+
public static ImportSourceRecordValidationResult validateSourceRecord(
30+
Set<String> partitionKeyNames,
31+
Set<String> clusteringKeyNames,
32+
Set<String> columnNames,
33+
JsonNode sourceRecord,
34+
boolean allColumnsRequired,
35+
TableMetadata tableMetadata) {
36+
ImportSourceRecordValidationResult validationResult = new ImportSourceRecordValidationResult();
37+
38+
// check if partition keys are found
39+
checkMissingKeys(DatabaseKeyType.PARTITION, partitionKeyNames, sourceRecord, validationResult);
40+
41+
// check if clustering keys are found
42+
checkMissingKeys(
43+
DatabaseKeyType.CLUSTERING, clusteringKeyNames, sourceRecord, validationResult);
44+
45+
// Check if the record is missing any columns
46+
if (allColumnsRequired) {
47+
checkMissingColumns(
48+
sourceRecord,
49+
columnNames,
50+
validationResult,
51+
validationResult.getColumnsWithErrors(),
52+
tableMetadata);
53+
}
54+
55+
return validationResult;
56+
}
57+
58+
/**
59+
* Check if the required keys are found in the data file.
60+
*
61+
* @param keyType Type of key to validate
62+
* @param keyColumnNames List of required column names
63+
* @param sourceRecord source data
64+
* @param validationResult Source record validation result
65+
*/
66+
public static void checkMissingKeys(
67+
DatabaseKeyType keyType,
68+
Set<String> keyColumnNames,
69+
JsonNode sourceRecord,
70+
ImportSourceRecordValidationResult validationResult) {
71+
for (String columnName : keyColumnNames) {
72+
if (!sourceRecord.has(columnName)) {
73+
String errorMessageFormat =
74+
keyType == DatabaseKeyType.PARTITION
75+
? String.format(MISSING_PARTITION_KEY_COLUMN, columnName)
76+
: String.format(MISSING_CLUSTERING_KEY_COLUMN, columnName);
77+
validationResult.addErrorMessage(columnName, errorMessageFormat);
78+
}
79+
}
80+
}
81+
82+
/**
83+
* Make sure the json object is not missing any columns. Error added to validation errors lists
84+
*
85+
* @param sourceRecord Source json object
86+
* @param columnNames List of column names for a table
87+
* @param validationResult Source record validation result
88+
* @param ignoreColumns Columns that can be ignored in the check
89+
*/
90+
public static void checkMissingColumns(
91+
JsonNode sourceRecord,
92+
Set<String> columnNames,
93+
ImportSourceRecordValidationResult validationResult,
94+
Set<String> ignoreColumns,
95+
TableMetadata tableMetadata) {
96+
for (String columnName : columnNames) {
97+
// If the field is not a metadata column and is missing and should not be ignored
98+
if ((ignoreColumns == null || !ignoreColumns.contains(columnName))
99+
&& !ConsensusCommitUtils.isTransactionMetaColumn(columnName, tableMetadata)
100+
&& !sourceRecord.has(columnName)) {
101+
validationResult.addErrorMessage(columnName, String.format(MISSING_COLUMN, columnName));
102+
}
103+
}
104+
}
105+
106+
/**
107+
* Make sure the json object is not missing any columns. Error added to validation errors lists
108+
*
109+
* @param sourceRecord Source json object
110+
* @param columnNames List of column names for a table
111+
* @param validationResult Source record validation result
112+
*/
113+
public static void checkMissingColumns(
114+
JsonNode sourceRecord,
115+
Set<String> columnNames,
116+
ImportSourceRecordValidationResult validationResult,
117+
TableMetadata tableMetadata) {
118+
ImportSourceRecordValidator.checkMissingColumns(
119+
sourceRecord, columnNames, validationResult, null, tableMetadata);
120+
}
121+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.scalar.db.dataloader.core.dataimport.task.mapping;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.fasterxml.jackson.databind.node.ObjectNode;
6+
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
7+
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;
8+
import java.util.ArrayList;
9+
import org.junit.jupiter.api.Assertions;
10+
import org.junit.jupiter.api.BeforeEach;
11+
import org.junit.jupiter.api.Test;
12+
13+
class ImportDataMappingTest {
14+
15+
ControlFileTable controlFilTable;
16+
17+
@BeforeEach
18+
void setup() {
19+
controlFilTable = new ControlFileTable("namespace", "table");
20+
ControlFileTableFieldMapping m1 = new ControlFileTableFieldMapping("source_id", "target_id");
21+
ControlFileTableFieldMapping m2 =
22+
new ControlFileTableFieldMapping("source_name", "target_name");
23+
ControlFileTableFieldMapping m3 =
24+
new ControlFileTableFieldMapping("source_email", "target_email");
25+
ArrayList<ControlFileTableFieldMapping> mappingArrayList = new ArrayList<>();
26+
mappingArrayList.add(m1);
27+
mappingArrayList.add(m2);
28+
mappingArrayList.add(m3);
29+
controlFilTable.getMappings().addAll(mappingArrayList);
30+
}
31+
32+
@Test
33+
void apply_withValidData_shouldUpdateSourceData() throws JsonProcessingException {
34+
ObjectMapper objectMapper = new ObjectMapper();
35+
ObjectNode source = objectMapper.createObjectNode();
36+
source.put("source_id", "111");
37+
source.put("source_name", "abc");
38+
source.put("source_email", "[email protected]");
39+
ImportDataMapping.apply(source, controlFilTable);
40+
// Assert changes
41+
Assertions.assertEquals("111", source.get("target_id").asText());
42+
Assertions.assertEquals("abc", source.get("target_name").asText());
43+
Assertions.assertEquals("[email protected]", source.get("target_email").asText());
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package com.scalar.db.dataloader.core.dataimport.task.validation;
2+
3+
import static com.scalar.db.dataloader.core.ErrorMessage.MISSING_CLUSTERING_KEY_COLUMN;
4+
5+
import com.fasterxml.jackson.databind.JsonNode;
6+
import com.scalar.db.api.TableMetadata;
7+
import com.scalar.db.dataloader.core.UnitTestUtils;
8+
import java.util.HashSet;
9+
import java.util.Set;
10+
import org.junit.jupiter.api.Assertions;
11+
import org.junit.jupiter.api.Test;
12+
13+
class ImportSourceRecordValidatorTest {
14+
15+
TableMetadata mockMetadata = UnitTestUtils.createTestTableMetadata();
16+
17+
@Test
18+
void
19+
validateSourceRecord_withValidData_shouldReturnValidImportSourceRecordValidationResultWithoutErrors() {
20+
Set<String> partitionKeyNames = mockMetadata.getPartitionKeyNames();
21+
Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
22+
Set<String> columnNames = mockMetadata.getColumnNames();
23+
JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
24+
ImportSourceRecordValidationResult result =
25+
ImportSourceRecordValidator.validateSourceRecord(
26+
partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata);
27+
Assertions.assertTrue(result.getColumnsWithErrors().isEmpty());
28+
}
29+
30+
@Test
31+
void
32+
validateSourceRecord_withValidDataWithAllColumnsRequired_shouldReturnValidImportSourceRecordValidationResultWithoutErrors() {
33+
Set<String> partitionKeyNames = mockMetadata.getPartitionKeyNames();
34+
Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
35+
Set<String> columnNames = mockMetadata.getColumnNames();
36+
JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
37+
ImportSourceRecordValidationResult result =
38+
ImportSourceRecordValidator.validateSourceRecord(
39+
partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, true, mockMetadata);
40+
Assertions.assertTrue(result.getColumnsWithErrors().isEmpty());
41+
}
42+
43+
@Test
44+
void
45+
validateSourceRecord_withInValidPartitionKey_shouldReturnValidImportSourceRecordValidationResultWithErrors() {
46+
Set<String> partitionKeyNames = new HashSet<>();
47+
partitionKeyNames.add("id1");
48+
Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
49+
Set<String> columnNames = mockMetadata.getColumnNames();
50+
JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
51+
ImportSourceRecordValidationResult result =
52+
ImportSourceRecordValidator.validateSourceRecord(
53+
partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata);
54+
Assertions.assertFalse(result.getColumnsWithErrors().isEmpty());
55+
}
56+
57+
@Test
58+
void
59+
validateSourceRecord_withInValidPartitionKeyWithAllColumnsRequired_shouldReturnValidImportSourceRecordValidationResultWithErrors() {
60+
Set<String> partitionKeyNames = new HashSet<>();
61+
partitionKeyNames.add("id1");
62+
Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
63+
Set<String> columnNames = mockMetadata.getColumnNames();
64+
JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
65+
ImportSourceRecordValidationResult result =
66+
ImportSourceRecordValidator.validateSourceRecord(
67+
partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, true, mockMetadata);
68+
Assertions.assertFalse(result.getColumnsWithErrors().isEmpty());
69+
Assertions.assertEquals(1, result.getErrorMessages().size());
70+
}
71+
72+
@Test
73+
void
74+
validateSourceRecord_withInValidClusteringKey_shouldReturnValidImportSourceRecordValidationResultWithErrors() {
75+
Set<String> partitionKeyNames = mockMetadata.getPartitionKeyNames();
76+
Set<String> clusteringKeyNames = new HashSet<>();
77+
clusteringKeyNames.add("id1");
78+
Set<String> columnNames = mockMetadata.getColumnNames();
79+
JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
80+
ImportSourceRecordValidationResult result =
81+
ImportSourceRecordValidator.validateSourceRecord(
82+
partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata);
83+
Assertions.assertFalse(result.getColumnsWithErrors().isEmpty());
84+
Assertions.assertEquals(
85+
String.format(MISSING_CLUSTERING_KEY_COLUMN, "id1"), result.getErrorMessages().get(0));
86+
}
87+
}

0 commit comments

Comments
 (0)