Skip to content

Commit 1599d8c

Browse files
committed
CDM-34: enable column renaming; allow column names to be encapsulated in quotes
1 parent 5d59325 commit 1599d8c

21 files changed

+466
-269
lines changed

src/main/java/datastax/cdm/cql/CqlHelper.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class CqlHelper {
2626
private ConsistencyLevel readConsistencyLevel;
2727
private ConsistencyLevel writeConsistencyLevel;
2828

29-
private final PropertyHelper propertyHelper;
29+
public final PropertyHelper propertyHelper;
3030
private final Map<Featureset, Feature> featureMap = new HashMap<>(Featureset.values().length);
3131
private PKFactory pkFactory;
3232
private OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement;
@@ -55,14 +55,15 @@ public boolean initialize() {
5555
featureMap.put(f, feature);
5656
}
5757

58+
pkFactory = new PKFactory(propertyHelper, this);
59+
5860
for (Featureset f : Featureset.values()) {
5961
if (f.toString().startsWith("TEST_")) continue; // Skip test features
6062
Feature feature = getFeature(f);
6163
if (isFeatureEnabled(f))
62-
feature.alterProperties(this.propertyHelper);
64+
feature.alterProperties(this.propertyHelper, this.pkFactory);
6365
}
6466

65-
pkFactory = new PKFactory(propertyHelper, this);
6667
originSelectByPartitionRangeStatement = new OriginSelectByPartitionRangeStatement(propertyHelper,this);
6768
originSelectByPKStatement = new OriginSelectByPKStatement(propertyHelper,this);
6869
targetInsertStatement = new TargetInsertStatement(propertyHelper,this);
@@ -149,10 +150,12 @@ private String getOriginKeyspaceTable() {
149150
return propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE);
150151
}
151152

153+
152154
private String getTargetKeyspaceTable() {
153155
return propertyHelper.getString(KnownProperties.TARGET_KEYSPACE_TABLE);
154156
}
155157

158+
156159
public boolean hasRandomPartitioner() {
157160
return propertyHelper.getBoolean(KnownProperties.ORIGIN_HAS_RANDOM_PARTITIONER);
158161
}
@@ -171,9 +174,6 @@ public List<Integer> getWriteTimeStampCols() {
171174
return propertyHelper.getIntegerList(KnownProperties.ORIGIN_WRITETIME_INDEXES);
172175
}
173176

174-
//-------------------- Java Filters --------------------
175-
176-
177177
//----------- General Utilities --------------
178178
public Object getData(MigrateDataType dataType, int index, Row row) {
179179
if (dataType.getTypeClass() == Map.class) {

src/main/java/datastax/cdm/cql/statement/AbstractOriginSelectStatement.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,16 +128,20 @@ private String ttlAndWritetimeCols() {
128128
StringBuilder sb = new StringBuilder();
129129
if (null != cqlHelper.getTtlCols()) {
130130
cqlHelper.getTtlCols().forEach(col -> {
131-
sb.append(",TTL(").append(resultColumns.get(col)).append(") as ttl_").append(resultColumns.get(col));
132-
resultColumns.add("ttl_" + resultColumns.get(col));
131+
String ttlCol = resultColumns.get(col);
132+
String cleanCol = ttlCol.replaceAll("-", "_").replace("\"", "");
133+
sb.append(",TTL(").append(ttlCol).append(") as ttl_").append(cleanCol);
134+
resultColumns.add("ttl_" + cleanCol);
133135
resultTypes.add(new MigrateDataType("1")); // int
134136
ttlIndexes.add(resultColumns.size()-1);
135137
});
136138
}
137139
if (null != cqlHelper.getWriteTimeStampCols()) {
138140
cqlHelper.getWriteTimeStampCols().forEach(col -> {
139-
sb.append(",WRITETIME(").append(resultColumns.get(col)).append(") as writetime_").append(resultColumns.get(col));
140-
resultColumns.add("writetime_" + resultColumns.get(col));
141+
String wtCol = resultColumns.get(col);
142+
String cleanCol = wtCol.replaceAll("-", "_").replace("\"", "");
143+
sb.append(",WRITETIME(").append(wtCol).append(") as writetime_").append(cleanCol);
144+
resultColumns.add("writetime_" + cleanCol);
141145
resultTypes.add(new MigrateDataType("2")); // application using as <long>, though Cassandra uses <timestamp>
142146
writeTimestampIndexes.add(resultColumns.size()-1);
143147
});

src/main/java/datastax/cdm/cql/statement/AbstractTargetUpsertStatement.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ public abstract class AbstractTargetUpsertStatement extends BaseCdmStatement {
3131
protected Feature explodeMapFeature;
3232

3333
protected int bindIndex = 0;
34-
protected int ttlBindIndex = -1;
35-
protected int writeTimeBindIndex = -1;
3634
protected int explodeMapKeyIndex = -1;
3735
protected int explodeMapValueIndex = -1;
3836

@@ -101,10 +99,10 @@ protected String usingTTLTimestamp() {
10199
*/
102100
private void setAndAlignNamesAndTypes() {
103101

104-
originColumnNames.addAll(propertyHelper.getStringList(KnownProperties.ORIGIN_COLUMN_NAMES));
105-
originColumnTypes.addAll(propertyHelper.getMigrationTypeList(KnownProperties.ORIGIN_COLUMN_TYPES));
106-
targetColumnNames.addAll(propertyHelper.getStringList(KnownProperties.TARGET_COLUMN_NAMES));
107-
targetColumnTypes.addAll(propertyHelper.getMigrationTypeList(KnownProperties.TARGET_COLUMN_TYPES));
102+
originColumnNames.addAll(propertyHelper.getOriginColumnNames());
103+
originColumnTypes.addAll(propertyHelper.getOriginColumnTypes());
104+
targetColumnNames.addAll(propertyHelper.getTargetColumnNames());
105+
targetColumnTypes.addAll(propertyHelper.getTargetColumnTypes());
108106

109107
// is this because of the explode map feature? in which case, we will insert a extra column
110108
if (originColumnNames.size() != targetColumnNames.size() && FeatureFactory.isEnabled(explodeMapFeature)) {
@@ -134,11 +132,6 @@ else if (!originColumnNames.get(mapColumnIndex+1).equals("")) {
134132
originColumnTypes.add(mapColumnIndex+1, new MigrateDataType());
135133
}
136134
}
137-
138-
// at the end, we want to assure that the origin and column names and types are of the right size
139-
if (originColumnNames.size() != targetColumnNames.size() || originColumnTypes.size() != targetColumnTypes.size()) {
140-
throw new RuntimeException("Origin and target column name and/or type lists are not the same size, and failed to make them the same size.");
141-
}
142135
}
143136

144137
private void setConstantColumns() {

src/main/java/datastax/cdm/cql/statement/OriginSelectByPKStatement.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public class OriginSelectByPKStatement extends AbstractOriginSelectStatement {
2020

2121
public OriginSelectByPKStatement(PropertyHelper propertyHelper, CqlHelper cqlHelper) {
2222
super(propertyHelper, cqlHelper);
23-
originPKTypes = cqlHelper.getPKFactory().getPKTypes(PKFactory.Side.ORIGIN);
23+
originPKTypes = propertyHelper.getOriginPKTypes();
2424
}
2525

2626
public Record getRecord(EnhancedPK pk) {
@@ -48,16 +48,9 @@ public BoundStatement bind(Object... binds) {
4848
throw new RuntimeException("Expected 1 nullable bind of type EnhancedPK, got " + binds.length);
4949

5050
EnhancedPK pk = (EnhancedPK) binds[0];
51-
if (pk.isError() || pk.getPKValues().size() != originPKTypes.size())
52-
throw new RuntimeException("PK is in Error state, or the number of values does not match the number of bind types");
5351

5452
BoundStatement boundStatement = prepareStatement().bind();
55-
for (int i = 0; i< originPKTypes.size(); i++) {
56-
if (originPKTypes.get(i).getTypeClass() != pk.getPKValues().get(i).getClass())
57-
throw new RuntimeException("PK value at index " + i + " does not match the expected type");
58-
59-
boundStatement = boundStatement.set(i,pk.getPKValues().get(i), originPKTypes.get(i).getTypeClass());
60-
}
53+
boundStatement = cqlHelper.getPKFactory().bindWhereClause(PKFactory.Side.ORIGIN, pk, boundStatement, 0);
6154

6255
return boundStatement
6356
.setConsistencyLevel(cqlHelper.getReadConsistencyLevel())

src/main/java/datastax/cdm/cql/statement/TargetInsertStatement.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.slf4j.Logger;
1313
import org.slf4j.LoggerFactory;
1414

15+
import java.util.List;
1516
import java.time.Duration;
1617

1718
public class TargetInsertStatement extends AbstractTargetUpsertStatement {
@@ -79,7 +80,7 @@ protected String buildStatement() {
7980
}
8081

8182
targetUpdateCQL = "INSERT INTO " + propertyHelper.getAsString(KnownProperties.TARGET_KEYSPACE_TABLE) +
82-
" (" + propertyHelper.getAsString(KnownProperties.TARGET_COLUMN_NAMES) +
83+
" (" + PropertyHelper.asString(propertyHelper.getTargetColumnNames(), KnownProperties.PropertyType.STRING_LIST) +
8384
(FeatureFactory.isEnabled(constantColumnFeature) ? "," + constantColumnFeature.getAsString(ConstantColumns.Property.COLUMN_NAMES) : "") +
8485
") VALUES (" + valuesList + ")";
8586

src/main/java/datastax/cdm/cql/statement/TargetSelectByPKStatement.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,10 @@
2020
public class TargetSelectByPKStatement extends BaseCdmStatement {
2121
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
2222

23-
private final List<MigrateDataType> bindTypes;
24-
private final List<Integer> bindIndexes;
25-
2623
public TargetSelectByPKStatement(PropertyHelper propertyHelper, CqlHelper cqlHelper) {
2724
super(propertyHelper, cqlHelper);
2825

2926
this.session = cqlHelper.getTargetSession();
30-
this.bindTypes = cqlHelper.getPKFactory().getPKTypes(PKFactory.Side.TARGET);
31-
this.bindIndexes = cqlHelper.getPKFactory().getPKIndexesToBind(PKFactory.Side.TARGET);
3227

3328
this.statement = buildStatement();
3429

@@ -60,17 +55,13 @@ public CompletionStage<AsyncResultSet> getAsyncResult(EnhancedPK pk) {
6055
private BoundStatement bind(EnhancedPK pk) {
6156
BoundStatement boundStatement = prepareStatement().bind()
6257
.setConsistencyLevel(cqlHelper.getReadConsistencyLevel());
63-
int bindIndex = 0;
64-
for (Integer pkIndexToBind : bindIndexes) {
65-
MigrateDataType type = bindTypes.get(pkIndexToBind);
66-
Object value = pk.getPKValues().get(pkIndexToBind);
67-
boundStatement = boundStatement.set(bindIndex++, value, type.getTypeClass());
68-
}
58+
59+
boundStatement = cqlHelper.getPKFactory().bindWhereClause(PKFactory.Side.TARGET, pk, boundStatement, 0);
6960
return boundStatement;
7061
}
7162

7263
private String buildStatement() {
73-
return "SELECT " + propertyHelper.getAsString(KnownProperties.TARGET_COLUMN_NAMES)
64+
return "SELECT " + PropertyHelper.asString(propertyHelper.getTargetColumnNames(), KnownProperties.PropertyType.STRING_LIST)
7465
+ " FROM " + propertyHelper.getString(KnownProperties.TARGET_KEYSPACE_TABLE)
7566
+ " WHERE " + cqlHelper.getPKFactory().getWhereClause(PKFactory.Side.TARGET);
7667
}

src/main/java/datastax/cdm/cql/statement/TargetUpdateStatement.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.datastax.oss.driver.api.core.cql.BoundStatement;
44
import com.datastax.oss.driver.api.core.cql.Row;
5+
import datastax.cdm.data.EnhancedPK;
56
import datastax.cdm.job.MigrateDataType;
67
import datastax.cdm.cql.CqlHelper;
78
import datastax.cdm.data.PKFactory;
@@ -60,11 +61,8 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
6061
}
6162

6263
PKFactory pkFactory = cqlHelper.getPKFactory();
63-
for (int index : pkFactory.getPKIndexesToBind(PKFactory.Side.TARGET)) {
64-
MigrateDataType dataType = targetColumnTypes.get(index);
65-
Object bindValue = cqlHelper.getData(dataType, index, originRow);
66-
boundStatement = boundStatement.set(currentBindIndex++, bindValue, dataType.getTypeClass());
67-
}
64+
EnhancedPK pk = pkFactory.getTargetPK(originRow);
65+
boundStatement = pkFactory.bindWhereClause(PKFactory.Side.TARGET, pk, boundStatement, currentBindIndex);
6866

6967
return boundStatement
7068
.setConsistencyLevel(cqlHelper.getWriteConsistencyLevel())
@@ -103,7 +101,6 @@ protected String buildStatement() {
103101
}
104102

105103
private void setExplodeMapColumnsAndColumnIndexesToBind() {
106-
PKFactory pkFactory = cqlHelper.getPKFactory();
107104
int currentColumn = 0;
108105
for (String key : targetColumnNames) {
109106
if (FeatureFactory.isEnabled(explodeMapFeature)) {
@@ -113,7 +110,7 @@ else if (key.equals(explodeMapFeature.getString(ExplodeMap.Property.VALUE_COLUMN
113110
explodeMapValueIndex = currentColumn;
114111
}
115112

116-
if (!pkFactory.getPKNames(PKFactory.Side.TARGET).contains(key)) {
113+
if (!propertyHelper.getTargetPKNames().contains(key)) {
117114
columnIndexesToBind.add(currentColumn);
118115
}
119116
currentColumn++;

0 commit comments

Comments
 (0)