
Commit ed79962

CDM-17 : replacing hard-coded strings with references to KnownProperties statics
1 parent 7452d6d commit ed79962
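
The change follows a single pattern across the files below: every Spark property key that was previously passed as a string literal is now referenced through a KnownProperties constant, so each key is defined in exactly one place. A minimal, self-contained sketch of the idea (the SPARK_BATCH_SIZE constant and its "spark.batchSize" value are inferred from the literals replaced in this diff; the class and main method here are illustrative only):

    import org.apache.spark.SparkConf;

    final class KnownPropertiesSketch {
        // Assumed mapping, inferred from the literal removed in AbstractJobSession below
        static final String SPARK_BATCH_SIZE = "spark.batchSize";

        public static void main(String[] args) {
            SparkConf sc = new SparkConf();
            // Before this commit: the key is a hard-coded literal, repeated at every call site
            int before = Integer.parseInt(sc.get("spark.batchSize", "5"));
            // After this commit: the key comes from a shared constant, so a typo fails to compile
            int after = Integer.parseInt(sc.get(SPARK_BATCH_SIZE, "5"));
            System.out.println(before + " " + after);
        }
    }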

7 files changed: +96 −89 lines changed


src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 27 additions & 26 deletions
@@ -5,6 +5,7 @@
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
+import datastax.astra.migrate.properties.KnownProperties;
 import org.apache.commons.lang.StringUtils;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
@@ -34,54 +35,54 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         this.originSessionSession = originSession;
         this.targetSession = targetSession;
 
-        batchSize = new Integer(Util.getSparkPropOr(sc, "spark.batchSize", "5"));
-        fetchSizeInRows = new Integer(Util.getSparkPropOr(sc, "spark.read.fetch.sizeInRows", "1000"));
-        printStatsAfter = new Integer(Util.getSparkPropOr(sc, "spark.printStatsAfter", "100000"));
+        batchSize = new Integer(Util.getSparkPropOr(sc, KnownProperties.SPARK_BATCH_SIZE, "5"));
+        fetchSizeInRows = new Integer(Util.getSparkPropOr(sc, KnownProperties.READ_FETCH_SIZE, "1000"));
+        printStatsAfter = new Integer(Util.getSparkPropOr(sc, KnownProperties.SPARK_STATS_AFTER, "100000"));
         if (printStatsAfter < 1) {
             printStatsAfter = 100000;
         }
 
-        readLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.readRateLimit", "20000")));
-        writeLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.writeRateLimit", "40000")));
-        maxRetries = Integer.parseInt(sc.get("spark.maxRetries", "0"));
+        readLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, KnownProperties.SPARK_LIMIT_READ, "20000")));
+        writeLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, KnownProperties.SPARK_LIMIT_WRITE, "40000")));
+        maxRetries = Integer.parseInt(sc.get(KnownProperties.SPARK_MAX_RETRIES, "0"));
 
-        originKeyspaceTable = Util.getSparkProp(sc, "spark.origin.keyspaceTable");
-        targetKeyspaceTable = Util.getSparkProp(sc, "spark.target.keyspaceTable");
+        originKeyspaceTable = Util.getSparkProp(sc, KnownProperties.ORIGIN_KEYSPACE_TABLE);
+        targetKeyspaceTable = Util.getSparkProp(sc, KnownProperties.TARGET_KEYSPACE_TABLE);
 
-        String ttlColsStr = Util.getSparkPropOrEmpty(sc, "spark.query.ttl.cols");
+        String ttlColsStr = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_TTL_COLS);
         if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
             for (String ttlCol : ttlColsStr.split(",")) {
                 ttlCols.add(Integer.parseInt(ttlCol));
             }
         }
 
-        String writeTimestampColsStr = Util.getSparkPropOrEmpty(sc, "spark.query.writetime.cols");
+        String writeTimestampColsStr = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_WRITETIME_COLS);
         if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
             for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
                 writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
             }
         }
 
         writeTimeStampFilter = Boolean
-                .parseBoolean(Util.getSparkPropOr(sc, "spark.origin.writeTimeStampFilter", "false"));
+                .parseBoolean(Util.getSparkPropOr(sc, KnownProperties.ORIGIN_FILTER_WRITETS_ENABLED, "false"));
         // batchsize set to 1 if there is a writeFilter
         if (writeTimeStampFilter) {
             batchSize = 1;
         }
 
         String minWriteTimeStampFilterStr =
-                Util.getSparkPropOr(sc, "spark.origin.minWriteTimeStampFilter", "0");
+                Util.getSparkPropOr(sc, KnownProperties.ORIGIN_FILTER_WRITETS_MIN, "0");
         if (null != minWriteTimeStampFilterStr && minWriteTimeStampFilterStr.trim().length() > 1) {
             minWriteTimeStampFilter = Long.parseLong(minWriteTimeStampFilterStr);
         }
         String maxWriteTimeStampFilterStr =
-                Util.getSparkPropOr(sc, "spark.origin.maxWriteTimeStampFilter", "0");
+                Util.getSparkPropOr(sc, KnownProperties.ORIGIN_FILTER_WRITETS_MAX, "0");
         if (null != maxWriteTimeStampFilterStr && maxWriteTimeStampFilterStr.trim().length() > 1) {
             maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
         }
 
         String customWriteTimeStr =
-                Util.getSparkPropOr(sc, "spark.target.custom.writeTime", "0");
+                Util.getSparkPropOr(sc, KnownProperties.TARGET_CUSTOM_WRITETIME, "0");
         if (null != customWriteTimeStr && customWriteTimeStr.trim().length() > 1 && StringUtils.isNumeric(customWriteTimeStr.trim())) {
             customWritetime = Long.parseLong(customWriteTimeStr);
         }
@@ -105,9 +106,9 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
                     Instant.ofEpochMilli(maxWriteTimeStampFilter / 1000));
         }
 
-        String selectCols = Util.getSparkProp(sc, "spark.query.origin");
-        String partitionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
-        String originSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");
+        String selectCols = Util.getSparkProp(sc, KnownProperties.ORIGIN_COLUMN_NAMES);
+        String partitionKey = Util.getSparkProp(sc, KnownProperties.ORIGIN_PARTITION_KEY);
+        String originSelectCondition = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_FILTER_CONDITION);
         if (!originSelectCondition.isEmpty() && !originSelectCondition.trim().toUpperCase().startsWith("AND")) {
             originSelectCondition = " AND " + originSelectCondition;
         }
@@ -120,11 +121,11 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         writeTimeStampCols.forEach(col -> {
             selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")");
         });
-        selectColTypes = getTypes(Util.getSparkProp(sc, "spark.query.types"));
-        String idCols = Util.getSparkPropOrEmpty(sc, "spark.query.target.id");
+        selectColTypes = getTypes(Util.getSparkProp(sc, KnownProperties.ORIGIN_COLUMN_TYPES));
+        String idCols = Util.getSparkPropOrEmpty(sc, KnownProperties.TARGET_PRIMARY_KEY);
         idColTypes = selectColTypes.subList(0, idCols.split(",").length);
 
-        String insertCols = Util.getSparkPropOrEmpty(sc, "spark.query.target");
+        String insertCols = Util.getSparkPropOrEmpty(sc, KnownProperties.TARGET_COLUMN_NAMES);
         if (null == insertCols || insertCols.trim().isEmpty()) {
             insertCols = selectCols;
         }
@@ -152,15 +153,15 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
                 "select " + insertCols + " from " + targetKeyspaceTable
                         + " where " + insertBinds);
 
-        hasRandomPartitioner = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.origin.hasRandomPartitioner", "false"));
-        isCounterTable = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.counterTable", "false"));
+        hasRandomPartitioner = Boolean.parseBoolean(Util.getSparkPropOr(sc, KnownProperties.ORIGIN_HAS_RANDOM_PARTITIONER, "false"));
+        isCounterTable = Boolean.parseBoolean(Util.getSparkPropOr(sc, KnownProperties.ORIGIN_IS_COUNTER, "false"));
         if (isCounterTable) {
-            String updateSelectMappingStr = Util.getSparkPropOr(sc, "spark.counterTable.cql.index", "0");
+            String updateSelectMappingStr = Util.getSparkPropOr(sc, KnownProperties.ORIGIN_COUNTER_INDEXES, "0");
             for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
                 updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
             }
 
-            String counterTableUpdate = Util.getSparkProp(sc, "spark.counterTable.cql");
+            String counterTableUpdate = Util.getSparkProp(sc, KnownProperties.ORIGIN_COUNTER_CQL);
             targetInsertStatement = targetSession.prepare(counterTableUpdate);
         } else {
             insertBinds = "";
@@ -185,7 +186,7 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         }
 
         // Handle rows with blank values for 'timestamp' data-type in primary-key fields
-        tsReplaceValStr = Util.getSparkPropOr(sc, "spark.target.replace.blankTimestampKeyUsingEpoch", "");
+        tsReplaceValStr = Util.getSparkPropOr(sc, KnownProperties.TARGET_REPLACE_MISSING_TS, "");
         if (!tsReplaceValStr.isEmpty()) {
             tsReplaceVal = Long.parseLong(tsReplaceValStr);
         }
@@ -283,7 +284,7 @@ protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Cl
         if (index < idColTypes.size() && colData == null && dataType == Instant.class) {
             if (tsReplaceValStr.isEmpty()) {
                 logger.error("Skipping row with Key: {} as Timestamp primary-key column {} has invalid blank value. " +
-                        "Alternatively rerun the job with --conf spark.target.replace.blankTimestampKeyUsingEpoch=\"<fixed-epoch-value>\" " +
+                        "Alternatively rerun the job with --conf "+KnownProperties.TARGET_REPLACE_MISSING_TS+"\"<fixed-epoch-value>\" " +
                         "option to replace the blanks with a fixed timestamp value", getKey(originRow), allCols[index]);
                 return Optional.empty();
             }

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 3 additions & 2 deletions
@@ -5,6 +5,7 @@
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
+import datastax.astra.migrate.properties.KnownProperties;
 import org.apache.spark.SparkConf;
 
 import java.util.ArrayList;
@@ -64,8 +65,8 @@ public abstract class BaseJobSession {
     protected long tsReplaceVal;
 
     protected BaseJobSession(SparkConf sc) {
-        readConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.read"));
-        writeConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.write"));
+        readConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, KnownProperties.READ_CL));
+        writeConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, KnownProperties.WRITE_CL));
     }
 
     public String getKey(Row originRow) {

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 6 additions & 5 deletions
@@ -2,6 +2,7 @@
 
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.*;
+import datastax.astra.migrate.properties.KnownProperties;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -23,11 +24,11 @@ public class CopyJobSession extends AbstractJobSession {
 
     protected CopyJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
         super(originSession, targetSession, sc);
-        filterData = Boolean.parseBoolean(sc.get("spark.origin.FilterData", "false"));
-        filterColName = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumn");
-        filterColType = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumnType");
-        filterColIndex = Integer.parseInt(sc.get("spark.origin.FilterColumnIndex", "0"));
-        filterColValue = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumnValue");
+        filterData = Boolean.parseBoolean(sc.get(KnownProperties.ORIGIN_FILTER_COLUMN_ENABLED, "false"));
+        filterColName = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_FILTER_COLUMN_NAME);
+        filterColType = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_FILTER_COLUMN_TYPE);
+        filterColIndex = Integer.parseInt(sc.get(KnownProperties.ORIGIN_FILTER_COLUMN_INDEX, "0"));
+        filterColValue = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_FILTER_COLUMN_VALUE);
     }
 
     public static CopyJobSession getInstance(CqlSession originSession, CqlSession targetSession, SparkConf sc) {

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 3 additions & 2 deletions
@@ -6,6 +6,7 @@
 import com.datastax.oss.driver.api.core.cql.ResultSet;
 import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.api.core.data.UdtValue;
+import datastax.astra.migrate.properties.KnownProperties;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,10 +37,10 @@ public class DiffJobSession extends CopyJobSession {
     private DiffJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
         super(originSession, targetSession, sc);
 
-        autoCorrectMissing = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.target.autocorrect.missing", "false"));
+        autoCorrectMissing = Boolean.parseBoolean(Util.getSparkPropOr(sc, KnownProperties.TARGET_AUTOCORRECT_MISSING, "false"));
        logger.info("PARAM -- Autocorrect Missing: {}", autoCorrectMissing);
 
-        autoCorrectMismatch = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.target.autocorrect.mismatch", "false"));
+        autoCorrectMismatch = Boolean.parseBoolean(Util.getSparkPropOr(sc, KnownProperties.TARGET_AUTOCORRECT_MISMATCH, "false"));
         logger.info("PARAM -- Autocorrect Mismatch: {}", autoCorrectMismatch);
     }
 

src/main/java/datastax/astra/migrate/OriginCountJobSession.java

Lines changed: 20 additions & 18 deletions
@@ -3,6 +3,7 @@
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.*;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
+import datastax.astra.migrate.properties.KnownProperties;
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
@@ -29,35 +30,36 @@ public class OriginCountJobSession extends BaseJobSession {
     protected OriginCountJobSession(CqlSession originSession, SparkConf sc) {
         super(sc);
         this.originSessionSession = originSession;
-        batchSize = new Integer(sc.get("spark.batchSize", "1"));
-        printStatsAfter = new Integer(sc.get("spark.printStatsAfter", "100000"));
+        batchSize = new Integer(sc.get(KnownProperties.SPARK_BATCH_SIZE, "1"));
+        printStatsAfter = new Integer(sc.get(KnownProperties.SPARK_STATS_AFTER, "100000"));
         if (printStatsAfter < 1) {
             printStatsAfter = 100000;
         }
 
-        readLimiter = RateLimiter.create(new Integer(sc.get("spark.readRateLimit", "20000")));
-        originKeyspaceTable = sc.get("spark.origin.keyspaceTable");
+        readLimiter = RateLimiter.create(new Integer(sc.get(KnownProperties.SPARK_LIMIT_READ, "20000")));
+        originKeyspaceTable = sc.get(KnownProperties.ORIGIN_KEYSPACE_TABLE);
 
-        hasRandomPartitioner = Boolean.parseBoolean(sc.get("spark.origin.hasRandomPartitioner", "false"));
-        isCounterTable = Boolean.parseBoolean(sc.get("spark.counterTable", "false"));
+        hasRandomPartitioner = Boolean.parseBoolean(sc.get(KnownProperties.ORIGIN_HAS_RANDOM_PARTITIONER, "false"));
+        isCounterTable = Boolean.parseBoolean(sc.get(KnownProperties.ORIGIN_IS_COUNTER, "false"));
 
-        checkTableforColSize = Boolean.parseBoolean(sc.get("spark.origin.checkTableforColSize", "false"));
-        checkTableforselectCols = sc.get("spark.origin.checkTableforColSize.cols");
-        checkTableforColSizeTypes = getTypes(sc.get("spark.origin.checkTableforColSize.cols.types"));
-        filterColName = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumn");
-        filterColType = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumnType");
-        filterColIndex = Integer.parseInt(sc.get("spark.origin.FilterColumnIndex", "0"));
-        fieldGuardraillimitMB = Integer.parseInt(sc.get("spark.fieldGuardraillimitMB", "0"));
+        checkTableforColSize = Boolean.parseBoolean(sc.get(KnownProperties.ORIGIN_CHECK_COLSIZE_ENABLED, "false"));
+        checkTableforselectCols = sc.get(KnownProperties.ORIGIN_CHECK_COLSIZE_COLUMN_NAMES);
+        checkTableforColSizeTypes = getTypes(sc.get(KnownProperties.ORIGIN_CHECK_COLSIZE_COLUMN_TYPES));
+        filterColName = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_FILTER_COLUMN_NAME);
+        filterColType = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_FILTER_COLUMN_TYPE);
+        filterColIndex = Integer.parseInt(sc.get(KnownProperties.ORIGIN_FILTER_COLUMN_INDEX, "0"));
+        fieldGuardraillimitMB = Integer.parseInt(sc.get(KnownProperties.FIELD_GUARDRAIL_MB, "0"));
 
-        String partionKey = sc.get("spark.query.cols.partitionKey");
-        idColTypes = getTypes(sc.get("spark.query.cols.id.types"));
+        String partionKey = sc.get(KnownProperties.ORIGIN_PARTITION_KEY);
+        idColTypes = getTypes(sc.get(KnownProperties.TARGET_PRIMARY_KEY_TYPES));
 
-        String selectCols = sc.get("spark.query.cols.select");
-        String updateSelectMappingStr = sc.get("spark.counterTable.cql.index", "0");
+        String selectCols = sc.get(KnownProperties.ORIGIN_COLUMN_NAMES);
+        String updateSelectMappingStr = sc.get(KnownProperties.ORIGIN_COUNTER_INDEXES, "0");
         for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
             updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
         }
-        String originSelectCondition = sc.get("spark.query.cols.select.condition", "");
+        String originSelectCondition = sc.get(KnownProperties.ORIGIN_FILTER_CONDITION, "");
+        // TODO: AbstractJobSession has some checks to ensure AND is added to the condition
         originSelectStatement = originSession.prepare(
                 "select " + selectCols + " from " + originKeyspaceTable + " where token(" + partionKey.trim()
                         + ") >= ? and token(" + partionKey.trim() + ") <= ? " + originSelectCondition + " ALLOW FILTERING");
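
The TODO in the hunk above refers to a normalization that AbstractJobSession already performs (see its hunk earlier in this commit). A minimal sketch of that check, extracted into a hypothetical helper purely for illustration:

    // Hypothetical helper mirroring the AbstractJobSession logic shown above:
    // prefix a non-empty filter condition with AND unless it already starts with one.
    static String normalizeCondition(String originSelectCondition) {
        if (!originSelectCondition.isEmpty()
                && !originSelectCondition.trim().toUpperCase().startsWith("AND")) {
            originSelectCondition = " AND " + originSelectCondition;
        }
        return originSelectCondition;
    }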

src/main/java/datastax/astra/migrate/properties/KnownProperties.java

Lines changed: 3 additions & 3 deletions
@@ -174,7 +174,7 @@ public enum PropertyType {
     public static final String SPARK_MAX_RETRIES = "spark.maxRetries"; // 0
     public static final String READ_FETCH_SIZE = "spark.read.fetch.sizeInRows"; //1000
     public static final String SPARK_STATS_AFTER = "spark.printStatsAfter"; //100000
-    public static final String FIELD_GUARDRAIL = "spark.fieldGuardraillimitMB"; //10
+    public static final String FIELD_GUARDRAIL_MB = "spark.fieldGuardraillimitMB"; //10
     public static final String PARTITION_MIN = "spark.origin.minPartition"; // -9223372036854775808
     public static final String PARTITION_MAX = "spark.origin.maxPartition"; // 9223372036854775807
 
@@ -195,8 +195,8 @@ public enum PropertyType {
         defaults.put(READ_FETCH_SIZE, "1000");
         types.put(SPARK_STATS_AFTER, PropertyType.NUMBER);
         defaults.put(SPARK_STATS_AFTER, "100000");
-        types.put(FIELD_GUARDRAIL, PropertyType.NUMBER);
-        defaults.put(FIELD_GUARDRAIL, "10");
+        types.put(FIELD_GUARDRAIL_MB, PropertyType.NUMBER);
+        defaults.put(FIELD_GUARDRAIL_MB, "10");
         types.put(PARTITION_MIN, PropertyType.NUMBER);
         defaults.put(PARTITION_MIN, "-9223372036854775808");
         types.put(PARTITION_MAX, PropertyType.NUMBER);
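
For context, renaming FIELD_GUARDRAIL to FIELD_GUARDRAIL_MB touches both the constant declaration and its registration, because KnownProperties also records a type and a default for every key. A self-contained sketch of that registration pattern (the enum values, map names, key string, and default mirror the hunks above; the wrapper class itself is illustrative only):

    import java.util.HashMap;
    import java.util.Map;

    final class KnownPropertiesRegistrationSketch {
        enum PropertyType { NUMBER }

        static final Map<String, PropertyType> types = new HashMap<>();
        static final Map<String, String> defaults = new HashMap<>();

        // Constant renamed in this commit; its key string is unchanged
        static final String FIELD_GUARDRAIL_MB = "spark.fieldGuardraillimitMB";

        static {
            // The rename must be carried into every registration that uses the constant
            types.put(FIELD_GUARDRAIL_MB, PropertyType.NUMBER);
            defaults.put(FIELD_GUARDRAIL_MB, "10");
        }
    }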
