Skip to content

Commit 83fdfd0

Browse files
committed
CDM-69 fixing handling of null values in target primary key
1 parent 4528363 commit 83fdfd0

File tree

11 files changed

+103
-43
lines changed

11 files changed

+103
-43
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
#!/bin/bash -e

# Smoke test for the 01_basic_kvp scenario: runs the Migrate job, then the
# DiffData validation job, against the same properties file.

# banner <label>: prints a visually loud section header for the test log.
banner() {
cat <<EOF
!!!!!!!!
!!!!!!!! Testing $1
!!!!!!!!
EOF
}

# run_job <main-class>: submits the CDM jar locally with the scenario
# properties. Centralized so both jobs are guaranteed identical settings.
run_job() {
  spark-submit \
    --properties-file /smoke/01_basic_kvp/migrate.properties \
    --master "local[*]" \
    --class "$1" /local/cassandra-data-migrator.jar
}

banner "Migrate"
# Reset/prepare the environment before migrating.
/local/cdm.sh -c
# NOTE(review): this package (datastax.astra.migrate.*) differs from the
# com.datastax.cdm.job.* names used in cdm.txt — confirm which is current.
run_job datastax.astra.migrate.Migrate

banner "DiffData"
run_job datastax.astra.migrate.DiffData
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
migrateData com.datastax.cdm.job.Migrate migrate.properties
validateData com.datastax.cdm.job.DiffData migrate.properties
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
#!/bin/bash -e

# Runs every scenario listed in cdm.txt (first whitespace-delimited column)
# from the supplied working directory.
#
# Usage: <script> <workingDir>

workingDir="$1"
if [ -z "$workingDir" ]; then
  echo "Usage: $0 <workingDir>" >&2
  exit 1
fi
cd "$workingDir"

# awk reads cdm.txt directly (no useless 'cat | awk'); the scenario name is
# quoted so word-splitting/globbing cannot mangle it.
while read -r scenario; do
  [ -n "$scenario" ] || continue   # tolerate blank lines in cdm.txt
  /local/cdm.sh -f cdm.txt -s "$scenario" -d "$workingDir"
done < <(awk '{print $1}' cdm.txt)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
SELECT * FROM target.regression_null_ts_in_pk;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
 key  | ts                              | value
------+---------------------------------+--------
 key1 | 2023-06-01 00:00:00.000000+0000 | valueA
 key2 | 2023-06-02 12:00:00.000000+0000 | valueB

(2 rows)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
spark.cdm.origin.connect.host cdm-sit-cass
2+
spark.cdm.target.connect.host cdm-sit-cass
3+
4+
spark.cdm.schema.origin.keyspaceTable origin.regression_null_ts_in_pk
5+
spark.cdm.schema.target.keyspaceTable target.regression_null_ts_in_pk
6+
spark.cdm.perfops.numParts 1
7+
8+
spark.cdm.autocorrect.missing true
9+
spark.cdm.autocorrect.mismatch true
10+
11+
spark.cdm.transform.missing.key.ts.replace.value 1685577600000
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
DROP TABLE IF EXISTS origin.regression_null_ts_in_pk;
CREATE TABLE origin.regression_null_ts_in_pk(key text, ts timestamp, value text, PRIMARY KEY (key));
INSERT INTO origin.regression_null_ts_in_pk(key,value) VALUES ('key1','valueA');
INSERT INTO origin.regression_null_ts_in_pk(key,ts,value) VALUES ('key2','2023-06-02 12:00:00','valueB');

DROP TABLE IF EXISTS target.regression_null_ts_in_pk;
CREATE TABLE target.regression_null_ts_in_pk(key text, ts timestamp, value text, PRIMARY KEY (key, ts));

src/main/java/com/datastax/cdm/data/EnhancedPK.java

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -90,32 +90,8 @@ private void validate() {
9090
if (null != value) continue;
9191
if (i==factory.getExplodeMapTargetPKIndex()) continue; // this is an unexploded PK
9292

93-
// This bit of code addresses the fact we cannot currently insert a NULL value
94-
// into a primary key column. So we replace it with an alternate value, or
95-
// mark the PK as invalid.
96-
this.messages = new ArrayList<>();
97-
Class c = classes.get(i);
98-
if (Objects.equals(c, String.class)) {
99-
values.set(i, factory.getDefaultForMissingString());
100-
messages.add(String.format("WARN: Defaulting null string value to the empty string for position %d", i));
101-
warningState = true;
102-
}
103-
else if (Objects.equals(c, Instant.class)) {
104-
Long tsReplaceVal = factory.getDefaultForMissingTimestamp();
105-
if (null != tsReplaceVal) {
106-
values.set(i, Instant.ofEpochSecond(tsReplaceVal).toString());
107-
messages.add(String.format("WARN: Defaulting null timestamp to %d for position %d", tsReplaceVal, i));
108-
warningState = true;
109-
}
110-
else {
111-
messages.add(String.format("ERROR: Null value for position %d, consider setting %s", i, KnownProperties.TRANSFORM_REPLACE_MISSING_TS));
112-
errorState = true;
113-
}
114-
}
115-
else {
116-
messages.add(String.format("ERROR: Null value for position %d", i));
117-
errorState = true;
118-
}
93+
messages.add(String.format("ERROR: Null value for position %d", i));
94+
errorState = true;
11995
}
12096
}
12197

src/main/java/com/datastax/cdm/data/PKFactory.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@ public enum Side {
4545
private final Integer explodeMapTargetPKIndex;
4646
private final ExplodeMap explodeMapFeature;
4747

48-
// These defaults address the problem where we cannot insert null values into a PK column
49-
private final Long defaultForMissingTimestamp;
50-
private final String defaultForMissingString;
5148

5249
public PKFactory(PropertyHelper propertyHelper, CqlTable originTable, CqlTable targetTable) {
5350

@@ -67,9 +64,6 @@ public PKFactory(PropertyHelper propertyHelper, CqlTable originTable, CqlTable t
6764
originPKLookupMethods.add(null);
6865
}
6966

70-
this.defaultForMissingTimestamp = propertyHelper.getLong(KnownProperties.TRANSFORM_REPLACE_MISSING_TS);
71-
this.defaultForMissingString = "";
72-
7367
setOriginColumnLookupMethod(propertyHelper);
7468
setConstantColumns();
7569

@@ -220,14 +214,6 @@ public List<Record> toValidRecordList(Record record) {
220214
return recordSet;
221215
}
222216

223-
protected Long getDefaultForMissingTimestamp() {
224-
return defaultForMissingTimestamp;
225-
}
226-
227-
protected String getDefaultForMissingString() {
228-
return defaultForMissingString;
229-
}
230-
231217
public Integer getExplodeMapTargetPKIndex() {return explodeMapTargetPKIndex;}
232218

233219
private List<Object> getTargetPKValuesFromOriginColumnLookupMethod(Row originRow, List<Object> defaultValues) {

src/main/java/com/datastax/cdm/schema/CqlTable.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
1414
import com.datastax.oss.driver.api.core.type.*;
1515
import com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException;
16-
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
1716
import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
1817
import com.datastax.cdm.data.CqlData;
1918
import com.datastax.cdm.data.CqlConversion;
@@ -24,6 +23,7 @@
2423
import org.slf4j.Logger;
2524
import org.slf4j.LoggerFactory;
2625

26+
import java.time.Instant;
2727
import java.util.*;
2828
import java.util.stream.Collectors;
2929
import java.util.stream.IntStream;
@@ -38,6 +38,7 @@ public class CqlTable extends BaseTable {
3838
private final List<String> partitionKeyNames;
3939
private final List<String> pkNames;
4040
private final List<Class> pkClasses;
41+
private final List<Integer> pkIndexes;
4142
private boolean isCounterTable;
4243
private final ConsistencyLevel readConsistencyLevel;
4344
private final ConsistencyLevel writeConsistencyLevel;
@@ -54,6 +55,10 @@ public class CqlTable extends BaseTable {
5455
private final List<Integer> counterIndexes;
5556
protected Map<Featureset, Feature> featureMap;
5657

58+
// These defaults address the problem where we cannot insert null values into a PK column
59+
private final Long defaultForMissingTimestamp;
60+
private final String defaultForMissingString;
61+
5762
public CqlTable(PropertyHelper propertyHelper, boolean isOrigin, CqlSession session) {
5863
super(propertyHelper, isOrigin);
5964
this.keyspaceName = unFormatName(keyspaceName);
@@ -85,6 +90,9 @@ public CqlTable(PropertyHelper propertyHelper, boolean isOrigin, CqlSession sess
8590
this.pkClasses = pkTypes.stream()
8691
.map(CqlData::getBindClass)
8792
.collect(Collectors.toList());
93+
this.pkIndexes = pkNames.stream()
94+
.map(columnNames::indexOf)
95+
.collect(Collectors.toList());
8896

8997
this.counterIndexes = IntStream.range(0, columnCqlTypes.size())
9098
.filter(i -> columnCqlTypes.get(i).equals(DataTypes.COUNTER))
@@ -96,6 +104,9 @@ public CqlTable(PropertyHelper propertyHelper, boolean isOrigin, CqlSession sess
96104
this.writeConsistencyLevel = mapToConsistencyLevel(propertyHelper.getString(KnownProperties.WRITE_CL));
97105

98106
this.featureMap = new HashMap<>();
107+
108+
this.defaultForMissingTimestamp = propertyHelper.getLong(KnownProperties.TRANSFORM_REPLACE_MISSING_TS);
109+
this.defaultForMissingString = "";
99110
}
100111

101112
@Override
@@ -248,6 +259,9 @@ public int byteCount(int index, Object object) {
248259

249260
public Object getAndConvertData(int index, Row row) {
250261
Object thisObject = getData(index, row);
262+
if (null==thisObject) {
263+
return convertNull(index);
264+
}
251265
CqlConversion cqlConversion = this.cqlConversions.get(index);
252266
if (null==cqlConversion) {
253267
if (logTrace) logger.trace("{} Index:{} not converting:{}",isOrigin?"origin":"target",index,thisObject);
@@ -259,6 +273,29 @@ public Object getAndConvertData(int index, Row row) {
259273
}
260274
}
261275

276+
public Object convertNull(int thisIndex) {
277+
// We do not need to convert nulls for non-PK columns
278+
int otherIndex = this.getCorrespondingIndex(thisIndex);
279+
if (!getOtherCqlTable().pkIndexes.contains(otherIndex))
280+
return null;
281+
282+
Class c = getOtherCqlTable().bindClasses.get(otherIndex);
283+
if (Objects.equals(c, String.class)) {
284+
return defaultForMissingString;
285+
}
286+
else if (Objects.equals(c, Instant.class)) {
287+
if (null != defaultForMissingTimestamp) {
288+
return Instant.ofEpochMilli(defaultForMissingTimestamp);
289+
} else {
290+
logger.error("This index {} corresponds to That index {}, which is a primary key column and cannot be null. Consider setting {}.", thisIndex, otherIndex, KnownProperties.TRANSFORM_REPLACE_MISSING_TS);
291+
return null;
292+
}
293+
}
294+
295+
logger.error("This index {} corresponds to That index {}, which is a primary key column and cannot be null.", thisIndex, otherIndex);
296+
return null;
297+
}
298+
262299
public Integer getCorrespondingIndex(int index) {
263300
if (index <= 0) return index;
264301
return this.correspondingIndexes.get(index);

0 commit comments

Comments
 (0)