Skip to content

Commit 0f5d3fc

Browse files
committed
CDM-38 and CDM-37 : getting scenarios working with local, more complex, test
1 parent 6fbdb4b commit 0f5d3fc

File tree

5 files changed

+116
-25
lines changed

5 files changed

+116
-25
lines changed

src/main/java/datastax/cdm/cql/statement/AbstractTargetUpsertStatement.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public abstract class AbstractTargetUpsertStatement extends BaseCdmStatement {
3434
protected int bindIndex = 0;
3535
protected int explodeMapKeyIndex = -1;
3636
protected int explodeMapValueIndex = -1;
37+
protected MigrateDataType explodeMapKeyDataType;
38+
protected MigrateDataType explodeMapValueDataType;
3739

3840
protected abstract String buildStatement();
3941
protected abstract BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long writeTime, Object explodeMapKey, Object explodeMapValue);
@@ -50,6 +52,7 @@ public AbstractTargetUpsertStatement(PropertyHelper propertyHelper, CqlHelper cq
5052
setNamesAndTypes();
5153
setConstantColumns();
5254
setExplodeMapIndexes();
55+
setExplodeMapTypes();
5356
setCounterIndexes();
5457

5558
this.statement = buildStatement();
@@ -176,4 +179,11 @@ else if (key.equals(explodeMapFeature.getString(ExplodeMap.Property.VALUE_COLUMN
176179
}
177180
}
178181

182+
private void setExplodeMapTypes() {
183+
if (FeatureFactory.isEnabled(explodeMapFeature)) {
184+
this.explodeMapKeyDataType = explodeMapFeature.getMigrateDataType(ExplodeMap.Property.KEY_COLUMN_TYPE);
185+
this.explodeMapValueDataType = explodeMapFeature.getMigrateDataType(ExplodeMap.Property.VALUE_COLUMN_TYPE);
186+
}
187+
}
188+
179189
}

src/main/java/datastax/cdm/cql/statement/TargetInsertStatement.java

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,43 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
3838
BoundStatement boundStatement = prepareStatement().bind();
3939

4040
int currentBindIndex = 0;
41-
Object bindValue;
42-
for (int index = 0; index < targetColumnTypes.size(); index++) {
43-
if (!bindColumnIndexes.contains(index)) {
41+
Object bindValue = null;
42+
43+
for (int targetIndex = 0; targetIndex < targetColumnTypes.size(); targetIndex++) {
44+
if (!bindColumnIndexes.contains(targetIndex)) {
4445
continue;
4546
}
4647

47-
MigrateDataType dataType = targetColumnTypes.get(index);
48-
49-
if (index==explodeMapKeyIndex) bindValue = explodeMapKey;
50-
else if (index==explodeMapValueIndex) bindValue = explodeMapValue;
51-
else if (dataType.hasUDT() && udtMappingEnabled) bindValue = udtMapper.convert(true, index, cqlHelper.getData(dataType, index, originRow));
52-
else if (index < originColumnTypes.size()) bindValue = cqlHelper.getData(dataType, index, originRow);
53-
else continue;
54-
55-
boundStatement = boundStatement.set(currentBindIndex++, bindValue, dataType.getTypeClass());
48+
MigrateDataType targetDataType = targetColumnTypes.get(targetIndex);
49+
Integer originIndex = ColumnsKeysTypes.getTargetToOriginColumnIndexes(propertyHelper).get(targetIndex);
50+
MigrateDataType originDataType = null;
51+
try {
52+
if (targetIndex==explodeMapKeyIndex) {
53+
bindValue = explodeMapKey;
54+
originDataType = explodeMapKeyDataType;
55+
}
56+
else if (targetIndex==explodeMapValueIndex) {
57+
bindValue = explodeMapValue;
58+
originDataType = explodeMapValueDataType;
59+
}
60+
else if (originIndex < 0) {
61+
continue;
62+
}
63+
else {
64+
originDataType = originColumnTypes.get(originIndex);
65+
Object originValue = cqlHelper.getData(originDataType, originIndex, originRow);
66+
if (targetDataType.hasUDT() && udtMappingEnabled) bindValue = udtMapper.convert(true, originIndex, originValue);
67+
else bindValue = originValue;
68+
}
69+
70+
bindValue = (targetDataType.equals(originDataType)) ? bindValue : MigrateDataType.convert(bindValue, originDataType, targetDataType, cqlHelper.getCodecRegistry());
71+
boundStatement = boundStatement.set(currentBindIndex++, bindValue, targetDataType.getTypeClass());
72+
}
73+
catch (Exception e) {
74+
logger.error("Error trying to bind value:" + bindValue + " to column:" + targetColumnNames.get(targetIndex) + " of targetDataType:" + targetDataType+ "/" + targetDataType.getTypeClass().getName() +
75+
" at column index:" + targetIndex + " with current bind index " + (currentBindIndex-1) + " from originIndex:" + originIndex + " " + originDataType + "/" + originDataType.getTypeClass().getName());
76+
throw e;
77+
}
5678
}
5779

5880
if (usingTTL) {

src/main/java/datastax/cdm/cql/statement/TargetUpdateStatement.java

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,42 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
4747
boundStatement = boundStatement.set(currentBindIndex++, writeTime, Long.class);
4848
}
4949

50-
for (int index : columnIndexesToBind) {
51-
MigrateDataType dataType = targetColumnTypes.get(index);
50+
for (int targetIndex : columnIndexesToBind) {
51+
MigrateDataType targetDataType = targetColumnTypes.get(targetIndex);
52+
MigrateDataType originDataType = null;
53+
Integer originIndex = ColumnsKeysTypes.getTargetToOriginColumnIndexes(propertyHelper).get(targetIndex);
5254
Object bindValue;
5355

54-
if(usingCounter && counterIndexes.contains(index)) bindValue = (originRow.getLong(index) - (null==targetRow ? 0 : targetRow.getLong(index)));
55-
else if (index==explodeMapKeyIndex) bindValue = explodeMapKey;
56-
else if (index==explodeMapValueIndex) bindValue = explodeMapValue;
57-
else if (dataType.hasUDT() && udtMappingEnabled) bindValue = udtMapper.convert(true, index, cqlHelper.getData(dataType, index, originRow));
58-
else bindValue = cqlHelper.getData(dataType, index, originRow);
56+
if(usingCounter && counterIndexes.contains(targetIndex)) {
57+
bindValue = (originRow.getLong(originIndex) - (null==targetRow ? 0 : targetRow.getLong(targetIndex)));
58+
originDataType = targetDataType;
59+
}
60+
else if (targetIndex==explodeMapKeyIndex) {
61+
bindValue = explodeMapKey;
62+
originDataType = explodeMapKeyDataType;
63+
}
64+
else if (targetIndex==explodeMapValueIndex) {
65+
bindValue = explodeMapValue;
66+
originDataType = explodeMapValueDataType;
67+
}
68+
else if (originIndex < 0) {
69+
bindValue = null;
70+
}
71+
else {
72+
originDataType = originColumnTypes.get(originIndex);
73+
Object originValue = cqlHelper.getData(originDataType, originIndex, originRow);
74+
if (targetDataType.hasUDT() && udtMappingEnabled) {
75+
bindValue = udtMapper.convert(true, targetIndex, originValue);
76+
}
77+
else {
78+
bindValue = originValue;
79+
}
5980

60-
boundStatement = boundStatement.set(currentBindIndex++, bindValue, dataType.getTypeClass());
81+
}
82+
if (null != bindValue) {
83+
bindValue = (targetDataType.equals(originDataType)) ? bindValue : MigrateDataType.convert(bindValue, originDataType, targetDataType, cqlHelper.getCodecRegistry());
84+
}
85+
boundStatement = boundStatement.set(currentBindIndex++, bindValue, targetDataType.getTypeClass());
6186
}
6287

6388
PKFactory pkFactory = cqlHelper.getPKFactory();

src/main/java/datastax/cdm/job/DiffJobSession.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public class DiffJobSession extends CopyJobSession {
4747
private final List<MigrateDataType> originColumnTypes;
4848
private final int explodeMapKeyIndex;
4949
private final int explodeMapValueIndex;
50+
protected final MigrateDataType explodeMapKeyDataType;
51+
protected final MigrateDataType explodeMapValueDataType;
5052
protected UDTMapper udtMapper;
5153
protected boolean udtMappingEnabled;
5254

@@ -73,10 +75,14 @@ private DiffJobSession(CqlSession originSession, CqlSession targetSession, Spark
7375
List<String> targetColumnNames = ColumnsKeysTypes.getTargetColumnNames(propertyHelper);
7476
this.explodeMapKeyIndex = targetColumnNames.indexOf(explodeMapFeature.getString(ExplodeMap.Property.KEY_COLUMN_NAME));
7577
this.explodeMapValueIndex = targetColumnNames.indexOf(explodeMapFeature.getString(ExplodeMap.Property.VALUE_COLUMN_NAME));
78+
this.explodeMapKeyDataType = explodeMapFeature.getMigrateDataType(ExplodeMap.Property.KEY_COLUMN_TYPE);
79+
this.explodeMapValueDataType = explodeMapFeature.getMigrateDataType(ExplodeMap.Property.VALUE_COLUMN_TYPE);
7680
}
7781
else {
7882
this.explodeMapKeyIndex = -1;
7983
this.explodeMapValueIndex = -1;
84+
this.explodeMapKeyDataType = null;
85+
this.explodeMapValueDataType = null;
8086
}
8187

8288
this.codecRegistry = cqlHelper.getCodecRegistry();
@@ -227,18 +233,33 @@ private String isDifferent(EnhancedPK pk, Row originRow, Row targetRow) {
227233

228234
Object origin;
229235
MigrateDataType originDataTypeObj = null;
230-
if (targetIndex == explodeMapKeyIndex) origin = pk.getExplodeMapKey();
231-
else if (targetIndex == explodeMapValueIndex) origin = pk.getExplodeMapValue();
236+
if (targetIndex == explodeMapKeyIndex) {
237+
origin = pk.getExplodeMapKey();
238+
originDataTypeObj = explodeMapKeyDataType;
239+
}
240+
else if (targetIndex == explodeMapValueIndex) {
241+
origin = pk.getExplodeMapValue();
242+
originDataTypeObj = explodeMapValueDataType;
243+
}
232244
else {
233245
int originIndex = targetToOriginColumnIndexes.get(targetIndex);
234246
if (originIndex < 0)
235247
origin = null;
236248
else {
237249
originDataTypeObj = originColumnTypes.get(originIndex);
238250
origin = cqlHelper.getData(originDataTypeObj, originIndex, originRow);
239-
if (!originDataTypeObj.equals(targetDataTypeObj)) {
240-
origin = MigrateDataType.convert(origin, originDataTypeObj, targetDataTypeObj, codecRegistry);
241-
}
251+
}
252+
}
253+
254+
if (null != originDataTypeObj && !originDataTypeObj.equals(targetDataTypeObj)) {
255+
Object originalOrigin = origin;
256+
try {
257+
origin = MigrateDataType.convert(origin, originDataTypeObj, targetDataTypeObj, codecRegistry);
258+
}
259+
catch (Exception e) {
260+
logger.error("Error converting data from {} ({}) to {} ({}) for key {} and targetColumn {}; exception: {}",
261+
getFormattedContent(originDataTypeObj, originalOrigin), originDataTypeObj.getTypeClass().getName(), getFormattedContent(targetDataTypeObj, target), targetDataTypeObj.getTypeClass().getName(), pk, targetColumnNames.get(targetIndex), e);
262+
throw e;
242263
}
243264
}
244265

src/main/java/datastax/cdm/job/MigrateDataType.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import com.datastax.oss.driver.api.core.type.DataTypes;
88
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
99
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
1012

1113
import java.math.BigDecimal;
1214
import java.math.BigInteger;
@@ -17,6 +19,8 @@
1719
import java.util.*;
1820

1921
public class MigrateDataType {
22+
public final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
23+
2024
Class typeClass = Object.class;
2125
String dataTypeString = "";
2226
int type = -1;
@@ -85,8 +89,10 @@ public boolean diff(Object obj1, Object obj2) {
8589
if (obj1 == null && obj2 == null) {
8690
return false;
8791
} else if (obj1 == null && obj2 != null) {
92+
logger.info("DEBUG: obj1 is null and obj2 is not null");
8893
return true;
8994
} else if (obj1 != null && obj2 == null) {
95+
logger.info("DEBUG: obj2 is null and obj1 is not null");
9096
return true;
9197
}
9298

@@ -95,10 +101,17 @@ public boolean diff(Object obj1, Object obj2) {
95101

96102
@SuppressWarnings("unchecked")
97103
public static Object convert(Object value, MigrateDataType fromDataType, MigrateDataType toDataType, CodecRegistry codecRegistry) {
104+
if (null==value) return null;
105+
if (null==fromDataType || null==toDataType || null==codecRegistry)
106+
throw new IllegalArgumentException("fromDataType, toDataType, and codecRegistry must not be null");
98107
Class<?> fromClass = fromDataType.getTypeClass();
99108
Class<?> toClass = toDataType.getTypeClass();
100109
DataType cqlType = toDataType.getCqlDataType();
101110

111+
if (!value.getClass().equals(fromClass)) {
112+
throw new IllegalArgumentException("Value is not of type " + fromClass.getName() + " but of type " + value.getClass().getName());
113+
}
114+
102115
TypeCodec<Object> fromCodec = (TypeCodec<Object>) codecRegistry.codecFor(cqlType, fromClass);
103116
if (fromCodec == null) {
104117
throw new IllegalArgumentException("No codec found in codecRegistry for Java type " + fromClass.getName() + " to CQL type " + toDataType);

0 commit comments

Comments
 (0)