Commit 39ea048
CDM-39 initial working with larger batchSize
1 parent aa8fa16

27 files changed: +294 -225 lines

SIT/regression/03_performance/fix.properties
Lines changed: 1 addition & 1 deletion

@@ -9,7 +9,7 @@ spark.cdm.schema.origin.column.partition.names pk_id
 spark.cdm.schema.origin.column.types 1,0,0
 spark.cdm.schema.target.column.id.names pk_id,ck_id

-spark.cdm.perfops.numParts 1
+spark.cdm.perfops.numParts 32
 spark.cdm.perfops.batchSize 1

 spark.cdm.autocorrect.missing true

SIT/regression/03_performance/migrate.properties
Lines changed: 1 addition & 1 deletion

@@ -9,5 +9,5 @@ spark.cdm.schema.origin.column.partition.names pk_id
 spark.cdm.schema.origin.column.types 1,0,0
 spark.cdm.schema.target.column.id.names pk_id,ck_id

-spark.cdm.perfops.numParts 1
+spark.cdm.perfops.numParts 32
 spark.cdm.perfops.batchSize 1
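Both performance-test configs make the same change: spark.cdm.perfops.numParts rises from 1 to 32 while spark.cdm.perfops.batchSize stays at 1. Assuming numParts means what its name suggests (the number of token-range splits the job processes), the run now covers the same data in more, smaller parts; whether writes are actually batched is still decided at runtime by CqlHelper.getBatchSize(), shown in the next file.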

src/main/java/datastax/cdm/cql/CqlHelper.java
Lines changed: 26 additions & 45 deletions

@@ -25,20 +25,15 @@
 public class CqlHelper {
     private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());

-    private CqlSession originSession;
-    private CqlSession targetSession;
+    private CqlSession originSessionInit;
+    private CqlSession targetSessionInit;

     private ConsistencyLevel readConsistencyLevel;
     private ConsistencyLevel writeConsistencyLevel;

     public final PropertyHelper propertyHelper;
     private final Map<Featureset, Feature> featureMap = new HashMap<>(Featureset.values().length);
     private PKFactory pkFactory;
-    private OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement;
-    private OriginSelectByPKStatement originSelectByPKStatement;
-    private TargetInsertStatement targetInsertStatement;
-    private TargetUpdateStatement targetUpdateStatement;
-    private TargetSelectByPKStatement targetSelectByPKStatement;

     private final Map<Codecset, TypeCodec<?>> codecMap = new HashMap<>(Codecset.values().length);

@@ -71,22 +66,19 @@ public boolean initialize() {
             feature.alterProperties(this.propertyHelper, this.pkFactory);
         }

-        registerTargetCodecs();
+        registerCodecs(targetSessionInit);

-        originSelectByPartitionRangeStatement = new OriginSelectByPartitionRangeStatement(propertyHelper,this);
-        originSelectByPKStatement = new OriginSelectByPKStatement(propertyHelper,this);
-        targetInsertStatement = new TargetInsertStatement(propertyHelper,this);
-        targetUpdateStatement = new TargetUpdateStatement(propertyHelper,this);
-        targetSelectByPKStatement = new TargetSelectByPKStatement(propertyHelper,this);
+        OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement = new OriginSelectByPartitionRangeStatement(propertyHelper,this, null);
+        TargetInsertStatement targetInsertStatement = new TargetInsertStatement(propertyHelper,this, null);
+        TargetUpdateStatement targetUpdateStatement = new TargetUpdateStatement(propertyHelper,this, null);
+        TargetSelectByPKStatement targetSelectByPKStatement = new TargetSelectByPKStatement(propertyHelper,this, null);

         logger.info("PARAM -- Read Consistency: {}", readConsistencyLevel);
         logger.info("PARAM -- Write Consistency: {}", writeConsistencyLevel);
-        logger.info("PARAM -- Write Batch Size: {}", getBatchSize());
+        logger.info("PARAM -- Write Batch Size: {}", getBatchSize(originSelectByPartitionRangeStatement));
         logger.info("PARAM -- Read Fetch Size: {}", getFetchSizeInRows());
         logger.info("PARAM -- Origin Keyspace Table: {}", ColumnsKeysTypes.getOriginKeyspaceTable(propertyHelper));
         logger.info("PARAM -- Target Keyspace Table: {}", ColumnsKeysTypes.getTargetKeyspaceTable(propertyHelper));
-        logger.info("PARAM -- TTLCols: {}", getTtlCols());
-        logger.info("PARAM -- WriteTimestampCols: {}", getWriteTimeStampCols());
         logger.info("PARAM -- ORIGIN SELECT Query used: {}", originSelectByPartitionRangeStatement.getCQL());
         logger.info("PARAM -- TARGET INSERT Query used: {}", targetInsertStatement.getCQL());
         logger.info("PARAM -- TARGET UPDATE Query used: {}", targetUpdateStatement.getCQL());
@@ -108,17 +100,17 @@ public Boolean isFeatureEnabled(Featureset featureEnum) {
     }

     public PKFactory getPKFactory() {return pkFactory;}
-    public OriginSelectByPartitionRangeStatement getOriginSelectByPartitionRangeStatement() {return originSelectByPartitionRangeStatement;}
-    public OriginSelectByPKStatement getOriginSelectByPKStatement() {return originSelectByPKStatement;}
-    public TargetInsertStatement getTargetInsertStatement() {return targetInsertStatement;}
-    public TargetUpdateStatement getTargetUpdateStatement() {return targetUpdateStatement;}
-    public TargetSelectByPKStatement getTargetSelectByPKStatement() {return targetSelectByPKStatement;}
+    public OriginSelectByPartitionRangeStatement getOriginSelectByPartitionRangeStatement(CqlSession session) {return new OriginSelectByPartitionRangeStatement(propertyHelper,this, session);}
+    public OriginSelectByPKStatement getOriginSelectByPKStatement(CqlSession session) {return new OriginSelectByPKStatement(propertyHelper,this, session);}
+    public TargetInsertStatement getTargetInsertStatement(CqlSession session) {return new TargetInsertStatement(propertyHelper,this, session);}
+    public TargetUpdateStatement getTargetUpdateStatement(CqlSession session) {return new TargetUpdateStatement(propertyHelper,this, session);}
+    public TargetSelectByPKStatement getTargetSelectByPKStatement(CqlSession session) {return new TargetSelectByPKStatement(propertyHelper,this, session);}

     // ----------------- Codec Functions --------------
-    private void registerTargetCodecs() {
+    public void registerCodecs(CqlSession session) {
         List<String> codecList = propertyHelper.getStringList(KnownProperties.TRANSFORM_CODECS);
         if (null!=codecList && !codecList.isEmpty()) {
-            MutableCodecRegistry registry = getCodecRegistry();
+            MutableCodecRegistry registry = (MutableCodecRegistry) session.getContext().getCodecRegistry();

             StringBuilder sb = new StringBuilder("PARAM -- Codecs Enabled: ");
             for (String codecString : codecList) {
@@ -137,24 +129,22 @@ public boolean isCodecRegistered(Codecset codecEnum) {
     }

     public MutableCodecRegistry getCodecRegistry() {
-        return (MutableCodecRegistry) targetSession.getContext().getCodecRegistry();
+        return (MutableCodecRegistry) targetSessionInit.getContext().getCodecRegistry();
     }

     // --------------- Session and Performance -------------------------
-    public void setOriginSession(CqlSession originSession) {
-        this.originSession = originSession;
+    // TODO: these should only be used when initializing the system, and should be refactored as part of moving schema definition to its own class
+    public void setOriginSessionInit(CqlSession originSessionInit) {
+        this.originSessionInit = originSessionInit;
     }
-
-    public void setTargetSession(CqlSession targetSession) {
-        this.targetSession = targetSession;
+    public void setTargetSessionInit(CqlSession targetSessionInit) {
+        this.targetSessionInit = targetSessionInit;
     }
-
-    public CqlSession getOriginSession() {
-        return originSession;
+    public CqlSession getOriginSessionInit() {
+        return originSessionInit;
     }
-
-    public CqlSession getTargetSession() {
-        return targetSession;
+    public CqlSession getTargetSessionInit() {
+        return targetSessionInit;
     }

     public ConsistencyLevel getReadConsistencyLevel() {
@@ -168,7 +158,7 @@ public Integer getFetchSizeInRows() {
         return propertyHelper.getInteger(KnownProperties.PERF_FETCH_SIZE);
     }

-    public Integer getBatchSize() {
+    public Integer getBatchSize(OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement) {
         // cannot do batching if the writeFilter is greater than 0 or maxWriteTimeStampFilter is less than max long
         // do not batch for counters as it adds latency & increases chance of discrepancy
         if (originSelectByPartitionRangeStatement.hasWriteTimestampFilter() || isCounterTable())
@@ -189,15 +179,6 @@ public boolean isCounterTable() {
         return (null != rtn && rtn.size() > 0);
     }

-    //--------------- TTL & Writetime Feature ---------------
-    public List<Integer> getTtlCols() {
-        return propertyHelper.getIntegerList(KnownProperties.ORIGIN_TTL_INDEXES);
-    }
-
-    public List<Integer> getWriteTimeStampCols() {
-        return propertyHelper.getIntegerList(KnownProperties.ORIGIN_WRITETIME_INDEXES);
-    }
-
     //----------- General Utilities --------------
     public Object getData(MigrateDataType dataType, int index, Row row) {
         if (dataType.getTypeClass() == Map.class) {
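The net effect in CqlHelper: statement objects are no longer cached singleton fields tied to a single session; each getter builds a fresh statement bound to whatever CqlSession the caller passes in. A minimal sketch of the new call pattern, assuming a caller that already holds its own origin and target sessions (the wiring around the sessions is illustrative, not part of this commit):

import com.datastax.oss.driver.api.core.CqlSession;
import datastax.cdm.cql.CqlHelper;
import datastax.cdm.cql.statement.OriginSelectByPartitionRangeStatement;
import datastax.cdm.cql.statement.TargetInsertStatement;

public class PerSessionStatementsSketch {
    // Each worker supplies its own sessions and receives statement instances bound to them.
    static void processRange(CqlHelper cqlHelper, CqlSession originSession, CqlSession targetSession) {
        OriginSelectByPartitionRangeStatement originSelect =
                cqlHelper.getOriginSelectByPartitionRangeStatement(originSession); // new instance per call
        TargetInsertStatement targetInsert =
                cqlHelper.getTargetInsertStatement(targetSession);                 // new instance per call
        // ... read the worker's token range via originSelect, write via targetInsert ...
    }
}

Note also that initialize() now constructs throwaway statements with a null session purely to log the generated CQL, and getBatchSize() takes the origin select statement as a parameter instead of reading a cached field.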

src/main/java/datastax/cdm/cql/statement/AbstractOriginSelectStatement.java
Lines changed: 26 additions & 43 deletions

@@ -1,8 +1,11 @@
 package datastax.cdm.cql.statement;

+import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.ResultSet;
 import com.datastax.oss.driver.api.core.cql.Row;
+import datastax.cdm.feature.Featureset;
+import datastax.cdm.feature.WritetimeTTLColumn;
 import datastax.cdm.job.MigrateDataType;
 import datastax.cdm.cql.CqlHelper;
 import datastax.cdm.data.Record;
@@ -18,26 +21,25 @@
 public abstract class AbstractOriginSelectStatement extends BaseCdmStatement {
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());

-    protected final List<Integer> writeTimestampIndexes = new ArrayList<>();
-    protected final List<Integer> ttlIndexes = new ArrayList<>();
+    private final WritetimeTTLColumn writetimeTTLColumnFeature;

     private final Boolean writeTimestampFilterEnabled;
     private final Long minWriteTimeStampFilter;
     private final Long maxWriteTimeStampFilter;
-    private final Long customWritetime;

     private final Boolean filterColumnEnabled;
     private final Integer filterColumnIndex;
     private final MigrateDataType filterColumnType;
     private final String filterColumnString;

-    public AbstractOriginSelectStatement(PropertyHelper propertyHelper, CqlHelper cqlHelper) {
-        super(propertyHelper, cqlHelper);
-        this.session = cqlHelper.getOriginSession();
+    public AbstractOriginSelectStatement(PropertyHelper propertyHelper, CqlHelper cqlHelper, CqlSession session) {
+        super(propertyHelper, cqlHelper, session);

         resultColumns.addAll(ColumnsKeysTypes.getOriginColumnNames(propertyHelper));
         resultTypes.addAll(ColumnsKeysTypes.getOriginColumnTypes(propertyHelper));

+        writetimeTTLColumnFeature = (WritetimeTTLColumn) cqlHelper.getFeature(Featureset.WRITETIME_TTL_COLUMN);
+
         minWriteTimeStampFilter = getMinWriteTimeStampFilter();
         maxWriteTimeStampFilter = getMaxWriteTimeStampFilter();
         writeTimestampFilterEnabled = hasWriteTimestampFilter();
@@ -48,11 +50,6 @@ public AbstractOriginSelectStatement(PropertyHelper propertyHelper, CqlHelper cq
                     Instant.ofEpochMilli(getMaxWriteTimeStampFilter() / 1000));
         }

-        customWritetime = getCustomWritetime();
-        if (customWritetime > 0)
-            logger.info("PARAM -- {}: {} datetime is {} ", KnownProperties.TRANSFORM_CUSTOM_WRITETIME, customWritetime,
-                    Instant.ofEpochMilli(customWritetime / 1000));
-
         filterColumnString = getFilterColumnString();
         filterColumnIndex = getFilterColumnIndex();
         filterColumnType = getFilterColumnType();
@@ -113,45 +110,34 @@ public boolean shouldFilterRecord(Record record) {
         return false;
     }

-    public Long getLargestWriteTimeStamp(Row row) {
-        if (customWritetime > 0) return customWritetime;
-        if (null==writeTimestampIndexes || writeTimestampIndexes.isEmpty()) return null;
-        OptionalLong max = writeTimestampIndexes.stream()
-                .mapToLong(row::getLong)
-                .filter(Objects::nonNull)
-                .max();
-        return max.isPresent() ? max.getAsLong() : null;
-    }
+    private String ttlAndWritetimeCols() {
+        if (!writetimeTTLColumnFeature.isEnabled()) {
+            return "";
+        }

-    public Integer getLargestTTL(Row row) {
-        if (null==ttlIndexes || ttlIndexes.isEmpty()) return null;
-        OptionalInt max = ttlIndexes.stream()
-                .mapToInt(row::getInt)
-                .filter(Objects::nonNull)
-                .max();
-        return max.isPresent() ? max.getAsInt() : null;
-    }
+        List<Integer> ttlCols = writetimeTTLColumnFeature.getIntegerList(WritetimeTTLColumn.Property.TTL_INDEXES);
+        List<Integer> wtCols = writetimeTTLColumnFeature.getIntegerList(WritetimeTTLColumn.Property.WRITETIME_INDEXES);

-    private String ttlAndWritetimeCols() {
         StringBuilder sb = new StringBuilder();
-        if (null != cqlHelper.getTtlCols()) {
-            cqlHelper.getTtlCols().forEach(col -> {
+        if (null != ttlCols) {
+            ttlCols.forEach(col -> {
                 String ttlCol = resultColumns.get(col);
                 String cleanCol = ttlCol.replaceAll("-", "_").replace("\"", "");
                 sb.append(",TTL(").append(ttlCol).append(") as ttl_").append(cleanCol);
                 resultColumns.add("ttl_" + cleanCol);
                 resultTypes.add(new MigrateDataType("1")); // int
-                ttlIndexes.add(resultColumns.size()-1);
+                writetimeTTLColumnFeature.addTTLSelectColumnIndex(resultColumns.size()-1);
             });
         }
-        if (null != cqlHelper.getWriteTimeStampCols()) {
-            cqlHelper.getWriteTimeStampCols().forEach(col -> {
+
+        if (null != wtCols) {
+            wtCols.forEach(col -> {
                 String wtCol = resultColumns.get(col);
                 String cleanCol = wtCol.replaceAll("-", "_").replace("\"", "");
                 sb.append(",WRITETIME(").append(wtCol).append(") as writetime_").append(cleanCol);
                 resultColumns.add("writetime_" + cleanCol);
                 resultTypes.add(new MigrateDataType("2")); // application using as <long>, though Cassandra uses <timestamp>
-                writeTimestampIndexes.add(resultColumns.size()-1);
+                writetimeTTLColumnFeature.addWritetimeSelectColumnIndex(resultColumns.size()-1);
             });
         }
         return sb.toString();
@@ -172,17 +158,14 @@ private Long getMaxWriteTimeStampFilter() {
         return propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX);
     }
     public boolean hasWriteTimestampFilter() {
+        if (!writetimeTTLColumnFeature.isEnabled()) return false;
+        if (writetimeTTLColumnFeature.getCustomWritetime() <= 0L &&
+                null == writetimeTTLColumnFeature.getIntegerList(WritetimeTTLColumn.Property.WRITETIME_INDEXES)) return false;
+
         Long min = getMinWriteTimeStampFilter();
         Long max = getMaxWriteTimeStampFilter();
-        List<Integer> writetimeCols = cqlHelper.getWriteTimeStampCols();
         return (null != min && null != max &&
-                min > 0 && max > 0 && min < max &&
-                null != writetimeCols && !writetimeCols.isEmpty());
-    }
-
-    private Long getCustomWritetime() {
-        Long rtn = propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME);
-        return (null==rtn || rtn < 0) ? 0L : rtn;
+                min > 0 && max > 0 && min < max);
     }

     private String getFilterColumnString() {
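The string-building in ttlAndWritetimeCols() is unchanged in shape; only the index bookkeeping moved onto the WritetimeTTLColumn feature. For reference, a self-contained illustration of the selector text the loops above append, using a hypothetical origin column named val:

public class TtlWritetimeSelectorDemo {
    public static void main(String[] args) {
        String col = "val"; // hypothetical column name
        String cleanCol = col.replaceAll("-", "_").replace("\"", "");
        StringBuilder sb = new StringBuilder();
        sb.append(",TTL(").append(col).append(") as ttl_").append(cleanCol);
        sb.append(",WRITETIME(").append(col).append(") as writetime_").append(cleanCol);
        System.out.println(sb); // ,TTL(val) as ttl_val,WRITETIME(val) as writetime_val
    }
}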

src/main/java/datastax/cdm/cql/statement/AbstractTargetUpsertStatement.java
Lines changed: 17 additions & 12 deletions

@@ -1,5 +1,6 @@
 package datastax.cdm.cql.statement;

+import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.*;
 import datastax.cdm.job.MigrateDataType;
 import datastax.cdm.cql.CqlHelper;
@@ -40,15 +41,13 @@ public abstract class AbstractTargetUpsertStatement extends BaseCdmStatement {
     protected abstract String buildStatement();
     protected abstract BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long writeTime, Object explodeMapKey, Object explodeMapValue);

-    public AbstractTargetUpsertStatement(PropertyHelper propertyHelper, CqlHelper cqlHelper) {
-        super(propertyHelper, cqlHelper);
-
-        this.session = cqlHelper.getTargetSession();
+    public AbstractTargetUpsertStatement(PropertyHelper propertyHelper, CqlHelper cqlHelper, CqlSession session) {
+        super(propertyHelper, cqlHelper, session);

         constantColumnFeature = cqlHelper.getFeature(Featureset.CONSTANT_COLUMNS);
         explodeMapFeature = cqlHelper.getFeature(Featureset.EXPLODE_MAP);

-        setTTLAndWriteTimeNames();
+        setTTLAndWriteTimeBooleans();
         setNamesAndTypes();
         setConstantColumns();
         setExplodeMapIndexes();
@@ -115,13 +114,19 @@ private void setConstantColumns() {
         }
     }

-    private void setTTLAndWriteTimeNames() {
-        List<Integer> ttlColumnNames = propertyHelper.getIntegerList(KnownProperties.ORIGIN_TTL_INDEXES);
-        usingTTL = null!= ttlColumnNames && !ttlColumnNames.isEmpty();
-        List<Integer> writeTimeColumnNames = propertyHelper.getIntegerList(KnownProperties.ORIGIN_WRITETIME_INDEXES);
-        Long customWritetime = propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME);
-        usingWriteTime = (null!= writeTimeColumnNames && !writeTimeColumnNames.isEmpty()
-                || null != customWritetime && customWritetime > 0);
+    private void setTTLAndWriteTimeBooleans() {
+        usingTTL = false;
+        usingWriteTime = false;
+        WritetimeTTLColumn feature = (WritetimeTTLColumn) cqlHelper.getFeature(Featureset.WRITETIME_TTL_COLUMN);
+
+        if (FeatureFactory.isEnabled(feature)) {
+            if (null != feature.getNumberList(WritetimeTTLColumn.Property.TTL_INDEXES))
+                usingTTL = true;
+
+            if (feature.getCustomWritetime() > 0 ||
+                    null != feature.getNumberList(WritetimeTTLColumn.Property.WRITETIME_INDEXES))
+                usingWriteTime = true;
+        }
     }

     private void setCounterIndexes() {
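Condensed, the new flag logic reads as two expressions (a restatement of setTTLAndWriteTimeBooleans() above; the feature lookup is as in the diff, with cqlHelper assumed in scope):

WritetimeTTLColumn feature = (WritetimeTTLColumn) cqlHelper.getFeature(Featureset.WRITETIME_TTL_COLUMN);

// TTL is used only when the feature is enabled and TTL indexes are configured.
boolean usingTTL = FeatureFactory.isEnabled(feature)
        && null != feature.getNumberList(WritetimeTTLColumn.Property.TTL_INDEXES);

// Writetime is used when the feature is enabled and either a custom writetime
// is set or writetime indexes are configured.
boolean usingWriteTime = FeatureFactory.isEnabled(feature)
        && (feature.getCustomWritetime() > 0
            || null != feature.getNumberList(WritetimeTTLColumn.Property.WRITETIME_INDEXES));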

src/main/java/datastax/cdm/cql/statement/BaseCdmStatement.java
Lines changed: 4 additions & 3 deletions

@@ -27,16 +27,17 @@ public class BaseCdmStatement {
     protected UDTMapper udtMapper;
     protected boolean udtMappingEnabled;

-    public BaseCdmStatement(PropertyHelper propertyHelper, CqlHelper cqlHelper) {
+    public BaseCdmStatement(PropertyHelper propertyHelper, CqlHelper cqlHelper, CqlSession session) {
         this.propertyHelper = propertyHelper;
         this.cqlHelper = cqlHelper;
         this.udtMapper = (UDTMapper) cqlHelper.getFeature(Featureset.UDT_MAPPER);
         this.udtMappingEnabled = FeatureFactory.isEnabled(this.udtMapper);
+        this.session = session;
     }

     public PreparedStatement prepareStatement() {
-        if (null == session || session.isClosed())
-            throw new RuntimeException("Session is not ready for use, session=" + session);
+        if (null==session)
+            throw new RuntimeException("Session is not set");
         if (null == statement || statement.isEmpty())
             throw new RuntimeException("Statement is not set");
         return session.prepare(statement);
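One behavioral nuance: the old guard also rejected a closed session (session.isClosed()), while the new one rejects only a null session. With sessions now injected at construction time, a closed-but-non-null session would presumably surface as a driver error at session.prepare() rather than as this RuntimeException.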

src/main/java/datastax/cdm/cql/statement/OriginSelectByPKStatement.java
Lines changed: 3 additions & 2 deletions

@@ -1,5 +1,6 @@
 package datastax.cdm.cql.statement;

+import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.ResultSet;
 import com.datastax.oss.driver.api.core.cql.Row;

@@ -19,8 +20,8 @@ public class OriginSelectByPKStatement extends AbstractOriginSelectStatement {

     private final List<MigrateDataType> originPKTypes;

-    public OriginSelectByPKStatement(PropertyHelper propertyHelper, CqlHelper cqlHelper) {
-        super(propertyHelper, cqlHelper);
+    public OriginSelectByPKStatement(PropertyHelper propertyHelper, CqlHelper cqlHelper, CqlSession session) {
+        super(propertyHelper, cqlHelper, session);
         originPKTypes = ColumnsKeysTypes.getOriginPKTypes(propertyHelper);
     }
