Skip to content

Commit 2f41c11

Browse files
mieslepmsmygit
authored andcommitted
CDM-51 complex customer config now working again
1 parent 7923d7a commit 2f41c11

File tree

7 files changed

+171
-148
lines changed

7 files changed

+171
-148
lines changed

src/main/java/com/datastax/cdm/data/CqlConversion.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public Object convert(Object inputData) {
6969
if (null==conversionTypeList || conversionTypeList.isEmpty())
7070
return inputData;
7171

72+
if (logger.isTraceEnabled()) logger.trace("convert() - inputData: {}, converter: {}",inputData,this);
73+
7274
// The first element on the conversionTypeList tells us what conversion the top-level object requires
7375
Type conversionType = conversionTypeList.get(0);
7476
switch (conversionType) {
@@ -118,7 +120,7 @@ public static List<CqlConversion> getConversions(CqlTable fromTable, CqlTable to
118120
}
119121
else {
120122
cqlConversions.add(new CqlConversion(fromDataType, toDataType, fromTable.getCodecRegistry()));
121-
if (logger.isTraceEnabled()) logger.trace("At fromIndex {}, have added {}",i, cqlConversions.get(cqlConversions.size()-1));
123+
if (logger.isTraceEnabled()) logger.trace("At fromIndex {} (correspondingIndex {}), have added {}",i, correspondingIndex, cqlConversions.get(cqlConversions.size()-1));
122124
}
123125
}
124126

src/main/java/com/datastax/cdm/data/EnhancedPK.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
package com.datastax.cdm.data;
2+
import com.datastax.cdm.feature.ExplodeMap;
23
import com.datastax.cdm.properties.KnownProperties;
34
import org.slf4j.Logger;
45
import org.slf4j.LoggerFactory;
@@ -49,11 +50,11 @@ public EnhancedPK(PKFactory factory, List<Object> values, List<Class> classes, I
4950
this.explodeMap = explodeMap;
5051
}
5152

52-
public List<EnhancedPK> explode() {
53+
public List<EnhancedPK> explode(ExplodeMap explodeMapFeature) {
5354
if (null == explodeMap || explodeMap.isEmpty()) {
5455
return Collections.singletonList(this);
5556
}
56-
return explodeMap.entrySet().stream()
57+
return explodeMapFeature.explode(explodeMap).stream()
5758
.map(entry -> new EnhancedPK(factory, values, classes, ttl, writeTimestamp, entry.getKey(), entry.getValue()))
5859
.collect(Collectors.toList());
5960
}

src/main/java/com/datastax/cdm/data/PKFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public enum Side {
4343
private final Integer explodeMapOriginColumnIndex;
4444
private final Integer explodeMapTargetKeyColumnIndex;
4545
private final Integer explodeMapTargetPKIndex;
46+
private final ExplodeMap explodeMapFeature;
4647

4748
// These defaults address the problem where we cannot insert null values into a PK column
4849
private final Long defaultForMissingTimestamp;
@@ -75,6 +76,7 @@ public PKFactory(PropertyHelper propertyHelper, CqlTable originTable, CqlTable t
7576
this.explodeMapTargetKeyColumnIndex = setExplodeMapMethods_getTargetKeyColumnIndex();
7677
this.explodeMapOriginColumnIndex = getExplodeMapOriginColumnIndex();
7778
this.explodeMapTargetPKIndex = targetPKLookupMethods.indexOf(LookupMethod.EXPLODE_MAP);
79+
this.explodeMapFeature = (ExplodeMap) targetTable.getFeature(Featureset.EXPLODE_MAP);
7880

7981
// These need to be set once all the features have been processed
8082
this.targetPKIndexesToBind = getIndexesToBind(Side.TARGET);
@@ -208,7 +210,7 @@ public List<Record> toValidRecordList(Record record) {
208210

209211
List<Record> recordSet;
210212
if (record.getPk().canExplode()) {
211-
recordSet = record.getPk().explode().stream()
213+
recordSet = record.getPk().explode(explodeMapFeature).stream()
212214
.filter(pk -> !pk.isError())
213215
.map(pk -> new Record(pk, record.getOriginRow(), record.getTargetRow()))
214216
.collect(Collectors.toList());

src/main/java/com/datastax/cdm/feature/ExplodeMap.java

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package com.datastax.cdm.feature;
22

3+
import com.datastax.cdm.data.CqlConversion;
34
import com.datastax.cdm.data.CqlData;
45
import com.datastax.cdm.properties.IPropertyHelper;
56
import com.datastax.cdm.properties.KnownProperties;
67
import com.datastax.cdm.schema.CqlTable;
8+
import com.datastax.oss.driver.api.core.type.DataType;
79
import org.slf4j.Logger;
810
import org.slf4j.LoggerFactory;
911

10-
import java.util.Arrays;
11-
import java.util.Collections;
12-
import java.util.List;
12+
import java.util.*;
13+
import java.util.stream.Collectors;
1314

1415
public class ExplodeMap extends AbstractFeature {
1516
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
@@ -23,6 +24,9 @@ public class ExplodeMap extends AbstractFeature {
2324
private String valueColumnName = "";
2425
private Integer valueColumnIndex = -1;
2526

27+
protected CqlConversion keyConversion = null;
28+
protected CqlConversion valueConversion = null;
29+
2630
@Override
2731
public boolean loadProperties(IPropertyHelper helper) {
2832
if (null == helper) { throw new IllegalArgumentException("helper is null");}
@@ -114,11 +118,52 @@ public boolean initializeAndValidate(CqlTable originTable, CqlTable targetTable)
114118
this.valueColumnIndex = targetTable.indexOf(valueColumnName);
115119
}
116120

121+
DataType originDataType = originTable.getDataType(originColumnName);
122+
if (CqlData.toType(originDataType) != CqlData.Type.MAP) {
123+
logger.error("Origin column {} is not a map, it is {}", originColumnName, originDataType);
124+
isValid = false;
125+
}
126+
127+
if (isValid) {
128+
// Compute the target conversions, as these columns will not have a corresponding index
129+
List<DataType> originMapTypes = CqlData.extractDataTypesFromCollection(originDataType);
130+
131+
DataType keyDataType = targetTable.getDataType(keyColumnName);
132+
keyConversion = new CqlConversion(originMapTypes.get(0), keyDataType, targetTable.getCodecRegistry());
133+
if (logger.isTraceEnabled()) logger.trace("Key conversion is {}",keyConversion);
134+
135+
DataType valueDataType = targetTable.getDataType(valueColumnName);
136+
valueConversion = new CqlConversion(originMapTypes.get(1), valueDataType, targetTable.getCodecRegistry());
137+
if (logger.isTraceEnabled()) logger.trace("Value conversion is {}",valueConversion);
138+
}
139+
117140
if (!isValid) isEnabled = false;
118141
logger.info("Feature {} is {}", this.getClass().getSimpleName(), isEnabled?"enabled":"disabled");
119142
return isValid;
120143
}
121144

145+
public Set<Map.Entry<Object, Object>> explode(Map<Object, Object> map) {
146+
if (map == null) { return null; }
147+
return map.entrySet().stream()
148+
.map(this::applyConversions)
149+
.collect(Collectors.toSet());
150+
}
151+
152+
private Map.Entry<Object, Object> applyConversions(Map.Entry<Object, Object> entry) {
153+
Object key = entry.getKey();
154+
Object value = entry.getValue();
155+
156+
if (keyConversion != null) {
157+
key = keyConversion.convert(key);
158+
}
159+
160+
if (valueConversion != null) {
161+
value = valueConversion.convert(value);
162+
}
163+
164+
return new AbstractMap.SimpleEntry<>(key, value);
165+
}
166+
122167
public String getOriginColumnName() { return isEnabled ? originColumnName : ""; }
123168
public Integer getOriginColumnIndex() { return isEnabled ? originColumnIndex : -1; }
124169

@@ -145,5 +190,4 @@ public static String getValueColumnName(IPropertyHelper helper) {
145190
String columnName = CqlTable.unFormatName(helper.getString(KnownProperties.EXPLODE_MAP_TARGET_VALUE_COLUMN_NAME));
146191
return (null == columnName) ? "" : columnName;
147192
}
148-
149193
}

src/main/java/com/datastax/cdm/schema/CqlTable.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public class CqlTable extends BaseTable {
5353

5454
public CqlTable(PropertyHelper propertyHelper, boolean isOrigin, CqlSession session) {
5555
super(propertyHelper, isOrigin);
56+
this.keyspaceName = unFormatName(keyspaceName);
57+
this.tableName = unFormatName(tableName);
58+
5659
this.cqlSession = session;
5760

5861
// setCqlMetadata(session) will set:
@@ -92,6 +95,11 @@ public CqlTable(PropertyHelper propertyHelper, boolean isOrigin, CqlSession sess
9295
this.featureMap = new HashMap<>();
9396
}
9497

98+
@Override
99+
public String getKeyspaceTable() {
100+
return formatName(this.keyspaceName) + "." + formatName(this.tableName);
101+
}
102+
95103
public void setFeatureMap(Map<Featureset, Feature> featureMap) { this.featureMap = featureMap; }
96104
public Feature getFeature(Featureset featureEnum) { return featureMap.get(featureEnum); }
97105

@@ -274,13 +282,13 @@ private void setCqlMetadata(CqlSession cqlSession) {
274282
else
275283
this.hasRandomPartitioner = false;
276284

277-
Optional<KeyspaceMetadata> keyspaceMetadataOpt = metadata.getKeyspace(this.keyspaceName);
285+
Optional<KeyspaceMetadata> keyspaceMetadataOpt = metadata.getKeyspace(formatName(this.keyspaceName));
278286
if (!keyspaceMetadataOpt.isPresent()) {
279287
throw new IllegalArgumentException("Keyspace not found: " + this.keyspaceName);
280288
}
281289
KeyspaceMetadata keyspaceMetadata = keyspaceMetadataOpt.get();
282290

283-
Optional<TableMetadata> tableMetadataOpt = keyspaceMetadata.getTable(this.tableName);
291+
Optional<TableMetadata> tableMetadataOpt = keyspaceMetadata.getTable(formatName(this.tableName));
284292
if (!tableMetadataOpt.isPresent()) {
285293
throw new IllegalArgumentException("Table not found: " + tableName);
286294
}

src/test/java/com/datastax/cdm/cql/CommonMocks.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package com.datastax.cdm.cql;
22

3-
import com.datastax.cdm.data.CqlData;
4-
import com.datastax.cdm.data.EnhancedPK;
5-
import com.datastax.cdm.data.PKFactory;
6-
import com.datastax.cdm.data.Record;
3+
import com.datastax.cdm.data.*;
74
import com.datastax.cdm.feature.*;
85
import com.datastax.cdm.properties.IPropertyHelper;
96
import com.datastax.cdm.properties.KnownProperties;
@@ -51,13 +48,17 @@ public class CommonMocks {
5148
@Mock public ResultSet originResultSet;
5249
@Mock public MutableCodecRegistry originCodecRegistry;
5350
@Mock public TypeCodec originCodec;
51+
@Mock public List<CqlConversion> originConversionList;
52+
@Mock public CqlConversion originCqlConversion;
5453

5554
@Mock public EnhancedSession targetSession;
5655
@Mock public CqlSession targetCqlSession;
5756
@Mock public CqlTable targetTable;
5857
@Mock public ResultSet targetResultSet;
5958
@Mock public MutableCodecRegistry targetCodecRegistry;
6059
@Mock public TypeCodec targetCodec;
60+
@Mock public List<CqlConversion> targetConversionList;
61+
@Mock public CqlConversion targetCqlConversion;
6162

6263
@Mock public ConstantColumns constantColumnsFeature;
6364
@Mock public ExplodeMap explodeMapFeature;
@@ -284,7 +285,10 @@ public void setOriginTableWhens() {
284285
when(originTable.getCodecRegistry()).thenReturn(originCodecRegistry);
285286
when(originCodecRegistry.codecFor(any(DataType.class), any(Class.class))).thenReturn(originCodec);
286287
when(originCodecRegistry.codecFor(any(DataType.class))).thenReturn(originCodec);
287-
// when(originCodec.parse(anyString())).thenReturn(any());
288+
289+
when(originTable.getConversions()).thenReturn(originConversionList);
290+
when(originConversionList.get(anyInt())).thenReturn(originCqlConversion);
291+
when(originCqlConversion.convert(any())).thenAnswer(invocation -> invocation.getArgument(0));
288292
}
289293

290294
public void setTargetVariables() {
@@ -385,8 +389,12 @@ public void setTargetTableWhens() {
385389
when(targetTable.getFeature(Featureset.EXPLODE_MAP)).thenReturn(explodeMapFeature);
386390
if (hasExplodeMap) {
387391
when(explodeMapFeature.isEnabled()).thenReturn(true);
392+
when(explodeMapFeature.getOriginColumnIndex()).thenReturn(originColumnNames.indexOf(explodeMapColumn));
388393
when(explodeMapFeature.getKeyColumnIndex()).thenReturn(targetColumnNames.indexOf(explodeMapKey));
389394
when(explodeMapFeature.getValueColumnIndex()).thenReturn(targetColumnNames.indexOf(explodeMapValue));
395+
when(explodeMapFeature.getOriginColumnName()).thenReturn(explodeMapColumn);
396+
when(explodeMapFeature.getKeyColumnName()).thenReturn(explodeMapKey);
397+
when(explodeMapFeature.getValueColumnName()).thenReturn(explodeMapValue);
390398
} else {
391399
when(explodeMapFeature.isEnabled()).thenReturn(false);
392400
when(explodeMapFeature.getKeyColumnIndex()).thenReturn(-1);
@@ -399,7 +407,10 @@ public void setTargetTableWhens() {
399407
when(targetTable.getCodecRegistry()).thenReturn(targetCodecRegistry);
400408
when(targetCodecRegistry.codecFor(any(DataType.class), any(Class.class))).thenReturn(targetCodec);
401409
when(targetCodecRegistry.codecFor(any(DataType.class))).thenReturn(targetCodec);
402-
// when(targetCodec.parse(anyString())).thenReturn(anyString());
410+
411+
when(targetTable.getConversions()).thenReturn(targetConversionList);
412+
when(targetConversionList.get(anyInt())).thenReturn(targetCqlConversion);
413+
when(targetCqlConversion.convert(any())).thenAnswer(invocation -> invocation.getArgument(0));
403414
}
404415

405416
public void setSessionWhens(boolean isOrigin) {

0 commit comments

Comments
 (0)