Skip to content

Commit ffe4f1f

Browse files
committed
CDM-23 adding ExplodeMap feature
1 parent 9010922 commit ffe4f1f

File tree

12 files changed

+620
-33
lines changed

12 files changed

+620
-33
lines changed

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
9595
targetRow = targetResultSet.one();
9696
}
9797

98-
BoundStatement bInsert = cqlHelper.bindInsert(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, targetRow);
98+
BoundStatement bInsert = cqlHelper.bindInsertOneRow(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, targetRow);
9999
if (null == bInsert) {
100100
skipCnt++;
101101
continue;
@@ -128,7 +128,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
128128
}
129129

130130
writeLimiter.acquire(1);
131-
BoundStatement bInsert = cqlHelper.bindInsert(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, null);
131+
BoundStatement bInsert = cqlHelper.bindInsertOneRow(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, null);
132132
if (null == bInsert) {
133133
skipCnt++;
134134
continue;

src/main/java/datastax/astra/migrate/CopyPKJobSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void getRowAndInsert(List<SplitPartitions.PKRows> rowsList) {
5656
return;
5757
}
5858
ResultSet targetWriteResultSet = cqlHelper.getTargetSession()
59-
.execute(cqlHelper.bindInsert(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), pkRow, null));
59+
.execute(cqlHelper.bindInsertOneRow(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), pkRow, null));
6060
writeCounter.incrementAndGet();
6161
if (readCounter.get() % printStatsAfter == 0) {
6262
printCounts(false);

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ private void diff(Row originRow, Row targetRow) {
144144
//correct data
145145

146146
if (autoCorrectMissing) {
147-
cqlHelper.getTargetSession().execute(cqlHelper.bindInsert(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, null));
147+
cqlHelper.getTargetSession().execute(cqlHelper.bindInsertOneRow(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, null));
148148
correctedMissingCounter.incrementAndGet();
149149
logger.error("Inserted missing row in target: {}", cqlHelper.getKey(originRow));
150150
}
@@ -159,9 +159,9 @@ private void diff(Row originRow, Row targetRow) {
159159

160160
if (autoCorrectMismatch) {
161161
if (cqlHelper.isCounterTable()) {
162-
cqlHelper.getTargetSession().execute(cqlHelper.bindInsert(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, targetRow));
162+
cqlHelper.getTargetSession().execute(cqlHelper.bindInsertOneRow(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, targetRow));
163163
} else {
164-
cqlHelper.getTargetSession().execute(cqlHelper.bindInsert(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, null));
164+
cqlHelper.getTargetSession().execute(cqlHelper.bindInsertOneRow(cqlHelper.getPreparedStatement(CqlHelper.CQL.TARGET_INSERT), originRow, null));
165165
}
166166
correctedMismatchCounter.incrementAndGet();
167167
logger.error("Updated mismatch row in target: {}", cqlHelper.getKey(originRow));
@@ -175,8 +175,8 @@ private void diff(Row originRow, Row targetRow) {
175175

176176
private String isDifferent(Row originRow, Row targetRow) {
177177
StringBuffer diffData = new StringBuffer();
178-
IntStream.range(0, cqlHelper.getSelectColTypes().size()).parallel().forEach(index -> {
179-
MigrateDataType dataTypeObj = cqlHelper.getSelectColTypes().get(index);
178+
IntStream.range(0, cqlHelper.getOriginColTypes().size()).parallel().forEach(index -> {
179+
MigrateDataType dataTypeObj = cqlHelper.getOriginColTypes().get(index);
180180
Object origin = cqlHelper.getData(dataTypeObj, index, originRow);
181181
if (index < cqlHelper.getIdColTypes().size()) {
182182
Optional<Object> optionalVal = cqlHelper.handleBlankInPrimaryKey(index, origin, dataTypeObj.typeClass, originRow, false);

src/main/java/datastax/astra/migrate/MigrateDataType.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ public class MigrateDataType {
1515
Class typeClass = Object.class;
1616
String dataTypeString = "";
1717
int type = -1;
18-
List<Class> subTypes = new ArrayList<Class>();
18+
List<Class> subTypeClasses = new ArrayList<Class>();
19+
List<MigrateDataType> subTypeTypes = new ArrayList<MigrateDataType>();
1920
private boolean isValid = false;
2021
private static int minType = 0;
2122
private static int maxType = 19;
@@ -30,7 +31,8 @@ public MigrateDataType(String dataType) {
3031
if (count == 1) {
3132
this.type = typeAsInt;
3233
} else {
33-
subTypes.add(getType(typeAsInt));
34+
subTypeClasses.add(getType(typeAsInt));
35+
subTypeTypes.add(new MigrateDataType(type));
3436
}
3537
count++;
3638
}
@@ -41,7 +43,7 @@ public MigrateDataType(String dataType) {
4143

4244
if ((this.type >= minType && this.type <= maxType) || this.type == UNKNOWN_TYPE) {
4345
isValid = true;
44-
for (Object o : subTypes) {
46+
for (Object o : subTypeClasses) {
4547
if (null == o || Object.class == o) {
4648
isValid = false;
4749
}
@@ -132,10 +134,12 @@ public Class getType() {
132134
return this.typeClass;
133135
}
134136

135-
public List<Class> getSubTypes() {
136-
return this.subTypes;
137+
public List<Class> getSubTypeClasses() {
138+
return this.subTypeClasses;
137139
}
138140

141+
/**
 * Returns the sub-types of this data type as {@link MigrateDataType} objects.
 *
 * @return the list stored in {@code subTypeTypes}; the same instance, not a copy
 */
public List<MigrateDataType> getSubTypeTypes() {
    return this.subTypeTypes;
}
142+
139143
public boolean isValid() {
140144
return isValid;
141145
}
@@ -146,7 +150,7 @@ public boolean equals(Object o) {
146150
if (o == null || getClass() != o.getClass()) return false;
147151
MigrateDataType that = (MigrateDataType) o;
148152
return type == that.type &&
149-
Objects.equals(subTypes, that.subTypes);
153+
Objects.equals(subTypeClasses, that.subTypeClasses);
150154
}
151155

152156
@Override

src/main/java/datastax/astra/migrate/cql/CqlHelper.java

Lines changed: 73 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -96,24 +96,43 @@ public boolean initialize() {
9696
return validInit;
9797
}
9898

99-
public BoundStatement bindInsert(PreparedStatement insertStatement, Row originRow, Row targetRow) {
99+
public BoundStatement bindInsertOneRow(PreparedStatement insertStatement, Row originRow, Row targetRow) {
100+
return bindInsertOneRow(insertStatement, originRow, targetRow, null, null);
101+
}
102+
103+
public BoundStatement bindInsertOneRow(PreparedStatement insertStatement, Row originRow, Row targetRow, Object mapKey, Object mapValue) {
100104
BoundStatement boundInsertStatement = insertStatement.bind().setConsistencyLevel(writeConsistencyLevel);
101105

102-
int selectColTypesSize = getSelectColTypes().size();
106+
int originColTypesSize = getOriginColTypes().size();
103107
if (isCounterTable()) {
104-
for (int index = 0; index < selectColTypesSize; index++) {
105-
MigrateDataType dataType = getSelectColTypes().get(getOriginColumnIndexes().get(index));
108+
for (int index = 0; index < originColTypesSize; index++) {
109+
MigrateDataType dataType = getOriginColTypes().get(getOriginColumnIndexes().get(index));
106110
// compute the counter delta if reading from target for the difference
107-
if (targetRow != null && index < (selectColTypesSize - getIdColTypes().size())) {
111+
if (targetRow != null && index < (originColTypesSize - getIdColTypes().size())) {
108112
boundInsertStatement = boundInsertStatement.set(index, (originRow.getLong(getOriginColumnIndexes().get(index)) - targetRow.getLong(getOriginColumnIndexes().get(index))), Long.class);
109113
} else {
110114
boundInsertStatement = boundInsertStatement.set(index, getData(dataType, getOriginColumnIndexes().get(index), originRow), dataType.getType());
111115
}
112116
}
113117
} else {
114118
int index = 0;
115-
for (index = 0; index < selectColTypesSize; index++) {
116-
boundInsertStatement = getBoundStatement(originRow, boundInsertStatement, index, getSelectColTypes());
119+
// This loops over the selected columns and binds each type to the boundInsertStatement
120+
Feature explodeMapFeature = getFeature(Featureset.EXPLODE_MAP);
121+
for (index = 0; index < originColTypesSize; index++) {
122+
if (mapKey != null && explodeMapFeature.isEnabled() && index == explodeMapFeature.getInteger(ExplodeMap.Property.MAP_COLUMN_INDEX)) {
123+
// This substitutes the map column with the key and value types of the map
124+
boundInsertStatement = boundInsertStatement.set(index, mapKey, explodeMapFeature.getMigrateDataType(ExplodeMap.Property.KEY_COLUMN_TYPE).getType());
125+
// Add an 'extra' column to the statement, which will also increase the loop limit
126+
index++;
127+
originColTypesSize++;
128+
// And then bind the map value to the next column
129+
boundInsertStatement = boundInsertStatement.set(index, mapValue, explodeMapFeature.getMigrateDataType(ExplodeMap.Property.VALUE_COLUMN_TYPE).getType());
130+
131+
}
132+
else {
133+
// This is the previous behaviour, and when on any column that is not the map to explode
134+
boundInsertStatement = getBoundStatement(originRow, boundInsertStatement, index, getOriginColTypes());
135+
}
117136
if (boundInsertStatement == null) return null;
118137
}
119138

@@ -231,7 +250,7 @@ private String cqlTargetSelectOriginByPK() {
231250

232251
public long getLargestWriteTimeStamp(Row row) {
233252
return IntStream.range(0, getWriteTimeStampCols().size())
234-
.mapToLong(i -> row.getLong(getSelectColTypes().size() + getTtlCols().size() + i)).max().getAsLong();
253+
.mapToLong(i -> row.getLong(getOriginColTypes().size() + getTtlCols().size() + i)).max().getAsLong();
235254
}
236255

237256
public Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row row, boolean logWarn) {
@@ -263,6 +282,35 @@ public Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class
263282
return Optional.of(colData);
264283
}
265284

285+
public Optional<Object> toValidTargetColData(String colName, Object colData, Class dataType, Row row, boolean logWarn) {
286+
if (null != colData || !isTargetPrimaryKeyColumn(colName)) return Optional.of(colData);
287+
String dataLabel = getKey(row);
288+
289+
if (dataType == String.class) {
290+
if (logWarn) {
291+
logger.warn("For {}, found String primary-key column {} with blank value", dataLabel, colName);
292+
}
293+
return Optional.of("");
294+
}
295+
296+
if (dataType == Instant.class) {
297+
Long tsReplaceVal = getReplaceMissingTs();
298+
if (null == tsReplaceVal) {
299+
logger.error("Skipping {} as Timestamp primary-key column {} has invalid blank value. " +
300+
"Alternatively rerun the job with --conf {}=\"<fixed-epoch-value>\" " +
301+
"option to replace the blanks with a fixed timestamp value", dataLabel, colName,KnownProperties.TARGET_REPLACE_MISSING_TS);
302+
return Optional.empty();
303+
}
304+
if (logWarn) {
305+
logger.warn("For {}, found Timestamp primary-key column {} with invalid blank value. " +
306+
"Using value {} instead", dataLabel, colName, Instant.ofEpochSecond(tsReplaceVal));
307+
}
308+
return Optional.of(Instant.ofEpochSecond(tsReplaceVal));
309+
}
310+
311+
return Optional.of(colData);
312+
}
313+
266314
public String getKey(Row row) {
267315
StringBuffer key = new StringBuffer();
268316
for (int index = 0; index < getIdColTypes().size(); index++) {
@@ -279,11 +327,11 @@ public String getKey(Row row) {
279327

280328
public Object getData(MigrateDataType dataType, int index, Row row) {
281329
if (dataType.getType() == Map.class) {
282-
return row.getMap(index, dataType.getSubTypes().get(0), dataType.getSubTypes().get(1));
330+
return row.getMap(index, dataType.getSubTypeClasses().get(0), dataType.getSubTypeClasses().get(1));
283331
} else if (dataType.getType() == List.class) {
284-
return row.getList(index, dataType.getSubTypes().get(0));
332+
return row.getList(index, dataType.getSubTypeClasses().get(0));
285333
} else if (dataType.getType() == Set.class) {
286-
return row.getSet(index, dataType.getSubTypes().get(0));
334+
return row.getSet(index, dataType.getSubTypeClasses().get(0));
287335
} else if (isCounterTable() && dataType.getType() == Long.class) {
288336
Object data = row.get(index, dataType.getType());
289337
if (data == null) {
@@ -313,7 +361,7 @@ private BoundStatement getBoundStatement(Row originRow, BoundStatement boundSele
313361

314362
private int getLargestTTL(Row row) {
315363
return IntStream.range(0, getTtlCols().size())
316-
.map(i -> row.getInt(getSelectColTypes().size() + i)).max().getAsInt();
364+
.map(i -> row.getInt(getOriginColTypes().size() + i)).max().getAsInt();
317365
}
318366

319367
private Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row row) {
@@ -426,10 +474,18 @@ public Long getMaxWriteTimeStampFilter() {
426474
return propertyHelper.getLong(KnownProperties.ORIGIN_FILTER_WRITETS_MAX);
427475
}
428476

429-
public List<MigrateDataType> getSelectColTypes() {
477+
public List<MigrateDataType> getOriginColTypes() {
430478
return propertyHelper.getMigrationTypeList(KnownProperties.ORIGIN_COLUMN_TYPES);
431479
}
432480

481+
public List<String> getTargetColNames() {
482+
return propertyHelper.getStringList(KnownProperties.TARGET_COLUMN_NAMES);
483+
}
484+
485+
public List<MigrateDataType> getTargetColTypes() {
486+
return propertyHelper.getMigrationTypeList(KnownProperties.TARGET_COLUMN_TYPES);
487+
}
488+
433489
public List<MigrateDataType> getIdColTypes() {
434490
return propertyHelper.getMigrationTypeList(KnownProperties.TARGET_PRIMARY_KEY_TYPES);
435491
}
@@ -473,6 +529,10 @@ private void abendIfSessionsNotSet() {
473529
}
474530
}
475531

532+
private boolean isTargetPrimaryKeyColumn(String colName) {
533+
return propertyHelper.getStringList(KnownProperties.TARGET_PRIMARY_KEY).contains(colName);
534+
}
535+
476536
public Feature getFeature(Featureset featureEnum) {
477537
return featureMap.get(featureEnum);
478538
}

0 commit comments

Comments
 (0)