
Commit d9df9ab

Merge remote-tracking branch 'origin/issue/CDM-18' into issue/97
2 parents: 42e2553 + d6d692d

4 files changed (+55, -77 lines)

src/main/java/datastax/astra/migrate/OriginCountJobSession.java

Lines changed: 22 additions & 21 deletions

@@ -48,41 +48,42 @@ public class OriginCountJobSession extends BaseJobSession {
     protected OriginCountJobSession(CqlSession originSession, SparkConf sc) {
         super(sc);
         // this.originSessionSession = originSession;
-        // batchSize = new Integer(sc.get(KnownProperties.SPARK_BATCH_SIZE, "1"));
-        // printStatsAfter = new Integer(sc.get(KnownProperties.SPARK_STATS_AFTER, "100000"));
-        // if (printStatsAfter < 1) {
-        //     printStatsAfter = 100000;
+        // batchSize = propertyHelper.getInteger(KnownProperties.SPARK_BATCH_SIZE);
+        // printStatsAfter = propertyHelper.getInteger(KnownProperties.SPARK_STATS_AFTER);
+        // if (!propertyHelper.meetsMinimum(KnownProperties.SPARK_STATS_AFTER, printStatsAfter, 1)) {
+        //     logger.warn(KnownProperties.SPARK_STATS_AFTER +" must be greater than 0. Setting to default value of " + KnownProperties.getDefaultAsString(KnownProperties.SPARK_STATS_AFTER));
+        //     propertyHelper.setProperty(KnownProperties.SPARK_STATS_AFTER, KnownProperties.getDefault(KnownProperties.SPARK_STATS_AFTER));
+        //     printStatsAfter = propertyHelper.getInteger(KnownProperties.SPARK_STATS_AFTER);
         // }
         //
-        // readLimiter = RateLimiter.create(new Integer(sc.get(KnownProperties.SPARK_LIMIT_READ, "20000")));
-        // originKeyspaceTable = sc.get(KnownProperties.ORIGIN_KEYSPACE_TABLE);
+        // readLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.SPARK_LIMIT_READ));
+        // originKeyspaceTable = propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE);
         //
-        // hasRandomPartitioner = Boolean.parseBoolean(sc.get(KnownProperties.ORIGIN_HAS_RANDOM_PARTITIONER, "false"));
-        // isCounterTable = Boolean.parseBoolean(sc.get(KnownProperties.ORIGIN_IS_COUNTER, "false"));
+        // hasRandomPartitioner = propertyHelper.getBoolean(KnownProperties.ORIGIN_HAS_RANDOM_PARTITIONER);
+        // isCounterTable = propertyHelper.getBoolean(KnownProperties.ORIGIN_IS_COUNTER);
         //
-        // checkTableforColSize = Boolean.parseBoolean(sc.get(KnownProperties.ORIGIN_CHECK_COLSIZE_ENABLED, "false"));
-        // checkTableforselectCols = sc.get(KnownProperties.ORIGIN_CHECK_COLSIZE_COLUMN_NAMES);
-        // checkTableforColSizeTypes = getTypes(sc.get(KnownProperties.ORIGIN_CHECK_COLSIZE_COLUMN_TYPES));
-        // filterColName = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_FILTER_COLUMN_NAME);
-        // filterColType = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_FILTER_COLUMN_TYPE);
-        // filterColIndex = Integer.parseInt(sc.get(KnownProperties.ORIGIN_FILTER_COLUMN_INDEX, "0"));
-        // fieldGuardraillimitMB = Integer.parseInt(sc.get(KnownProperties.FIELD_GUARDRAIL_MB, "0"));
+        // checkTableforColSize = propertyHelper.getBoolean(KnownProperties.ORIGIN_CHECK_COLSIZE_ENABLED);
+        // checkTableforselectCols = propertyHelper.getAsString(KnownProperties.ORIGIN_CHECK_COLSIZE_COLUMN_NAMES);
+        // checkTableforColSizeTypes = getTypes(propertyHelper.getAsString(KnownProperties.ORIGIN_CHECK_COLSIZE_COLUMN_TYPES));
+        // filterColName = propertyHelper.getAsString(KnownProperties.ORIGIN_FILTER_COLUMN_NAME);
+        // filterColType = propertyHelper.getAsString(KnownProperties.ORIGIN_FILTER_COLUMN_TYPE);
+        // filterColIndex = propertyHelper.getInteger(KnownProperties.ORIGIN_FILTER_COLUMN_INDEX);
+        // fieldGuardraillimitMB = propertyHelper.getInteger(KnownProperties.FIELD_GUARDRAIL_MB);
         //
-        // String partionKey = sc.get(KnownProperties.ORIGIN_PARTITION_KEY);
-        // idColTypes = getTypes(sc.get(KnownProperties.TARGET_PRIMARY_KEY_TYPES));
+        // String partionKey = propertyHelper.getAsString(KnownProperties.ORIGIN_PARTITION_KEY);
+        // idColTypes = getTypes(propertyHelper.getAsString(KnownProperties.TARGET_PRIMARY_KEY_TYPES));
         //
-        // String selectCols = sc.get(KnownProperties.ORIGIN_COLUMN_NAMES);
-        // String updateSelectMappingStr = sc.get(KnownProperties.ORIGIN_COUNTER_INDEXES, "0");
+        // String selectCols = propertyHelper.getAsString(KnownProperties.ORIGIN_COLUMN_NAMES);
+        // String updateSelectMappingStr = propertyHelper.getAsString(KnownProperties.ORIGIN_COUNTER_INDEXES);
         // for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
         //     updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
         // }
-        // String originSelectCondition = sc.get(KnownProperties.ORIGIN_FILTER_CONDITION, "");
+        // String originSelectCondition = propertyHelper.getAsString(KnownProperties.ORIGIN_FILTER_CONDITION);
         // // TODO: AbstractJobSession has some checks to ensure AND is added to the condition
         // originSelectStatement = originSession.prepare(
         //         "select " + selectCols + " from " + originKeyspaceTable + " where token(" + partionKey.trim()
         //                 + ") >= ? and token(" + partionKey.trim() + ") <= ? " + originSelectCondition + " ALLOW FILTERING");
     }
-    //

     // public static OriginCountJobSession getInstance(CqlSession originSession, SparkConf sparkConf) {
     //     if (originCountJobSession == null) {
     //         synchronized (OriginCountJobSession.class) {
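
Although this constructor body is still commented out, it shows the direction of the migration: raw sc.get(key, "default") string parsing is replaced by typed PropertyHelper accessors, and an out-of-range value is reset to the default registered in KnownProperties via the meetsMinimum check. Below is a minimal runnable sketch of that validate-or-reset pattern; the maps and the key string are hypothetical stand-ins, not the project's actual PropertyHelper or KnownProperties API.

    import java.util.HashMap;
    import java.util.Map;

    public class PropertyValidationSketch {
        // Hypothetical stand-ins for the KnownProperties key and defaults registry.
        static final String SPARK_STATS_AFTER = "spark.printStatsAfter";
        static final Map<String, String> DEFAULTS = new HashMap<>();
        static { DEFAULTS.put(SPARK_STATS_AFTER, "100000"); }

        // Explicitly-set values, e.g. parsed from SparkConf.
        final Map<String, Integer> properties = new HashMap<>();

        Integer getInteger(String key) {
            Integer val = properties.get(key);
            return (val != null) ? val : Integer.valueOf(DEFAULTS.get(key));
        }

        // Mirrors the three-argument meetsMinimum(key, value, min) call in the
        // diff above: true only when the current value satisfies the floor.
        boolean meetsMinimum(String key, Integer value, Integer min) {
            return value != null && value >= min;
        }

        public static void main(String[] args) {
            PropertyValidationSketch helper = new PropertyValidationSketch();
            helper.properties.put(SPARK_STATS_AFTER, 0); // invalid: below the minimum of 1

            Integer printStatsAfter = helper.getInteger(SPARK_STATS_AFTER);
            if (!helper.meetsMinimum(SPARK_STATS_AFTER, printStatsAfter, 1)) {
                System.err.println(SPARK_STATS_AFTER + " must be greater than 0. Setting to default value of "
                        + DEFAULTS.get(SPARK_STATS_AFTER));
                helper.properties.put(SPARK_STATS_AFTER, Integer.valueOf(DEFAULTS.get(SPARK_STATS_AFTER)));
                printStatsAfter = helper.getInteger(SPARK_STATS_AFTER);
            }
            System.out.println("printStatsAfter = " + printStatsAfter); // prints 100000
        }
    }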

src/main/java/datastax/astra/migrate/Util.java

Lines changed: 0 additions & 24 deletions

@@ -1,38 +1,14 @@
 package datastax.astra.migrate;

 import com.datastax.oss.driver.api.core.ConsistencyLevel;
-import datastax.astra.migrate.properties.KnownProperties;
-import datastax.astra.migrate.properties.PropertyHelper;
 import org.apache.commons.lang.StringUtils;
-import org.apache.spark.SparkConf;

 import java.io.BufferedReader;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
-import java.util.NoSuchElementException;

 public class Util {

-    public static String getSparkProp(SparkConf sc, String prop) {
-        String retVal = PropertyHelper.getInstance(sc).getAsString(prop);
-        if (StringUtils.isEmpty(retVal) && (prop.contains("origin") || prop.contains("target"))) {
-            retVal = PropertyHelper.getInstance(sc).getAsString(prop.replace("origin", "source").replace("target", "destination"));
-        }
-        if (!KnownProperties.isKnown(prop)) {
-            throw new IllegalArgumentException("Unknown property: " + prop + "; this is a bug in the code: the property is not configured in KnownProperties.java");
-        }
-        return retVal;
-    }
-
-    public static String getSparkPropOr(SparkConf sc, String prop, String defaultVal) {
-        String retVal = getSparkProp(sc,prop);
-        return StringUtils.isEmpty(retVal) ? defaultVal : retVal;
-    }
-
-    public static String getSparkPropOrEmpty(SparkConf sc, String prop) {
-        return getSparkPropOr(sc, prop, "");
-    }
-
     public static BufferedReader getfileReader(String fileName) {
         try {
             return new BufferedReader(new FileReader(fileName));
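
With these wrapper methods deleted, call-site defaults such as the "9042" port fallback have to live in the KnownProperties defaults registry instead, and callers query PropertyHelper directly. A small runnable sketch of that lookup order under those assumptions; the map fields and the key string are made up for illustration.

    import java.util.HashMap;
    import java.util.Map;

    public class LookupOrderSketch {
        public static void main(String[] args) {
            Map<String, String> settings = new HashMap<>(); // values from spark-submit --conf
            Map<String, String> defaults = new HashMap<>(); // registered once, KnownProperties-style
            defaults.put("spark.origin.connect.port", "9042"); // hypothetical key string

            // No explicit setting: the registered default wins.
            System.out.println(getAsString(settings, defaults, "spark.origin.connect.port")); // 9042

            // An explicit setting overrides the default.
            settings.put("spark.origin.connect.port", "9142");
            System.out.println(getAsString(settings, defaults, "spark.origin.connect.port")); // 9142
        }

        // Explicit setting first, then the registered default, then empty string.
        static String getAsString(Map<String, String> settings, Map<String, String> defaults, String key) {
            String explicit = settings.get(key);
            return (explicit != null) ? explicit : defaults.getOrDefault(key, "");
        }
    }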

src/main/java/datastax/astra/migrate/properties/KnownProperties.java

Lines changed: 1 addition & 1 deletion

@@ -196,7 +196,7 @@ public enum PropertyType {
         types.put(SPARK_STATS_AFTER, PropertyType.NUMBER);
         defaults.put(SPARK_STATS_AFTER, "100000");
         types.put(FIELD_GUARDRAIL_MB, PropertyType.NUMBER);
-        defaults.put(FIELD_GUARDRAIL_MB, "10");
+        defaults.put(FIELD_GUARDRAIL_MB, "0");
         types.put(PARTITION_MIN, PropertyType.NUMBER);
         defaults.put(PARTITION_MIN, "-9223372036854775808");
         types.put(PARTITION_MAX, PropertyType.NUMBER);
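
This one-line change brings the registered FIELD_GUARDRAIL_MB default in line with the "0" fallback the old OriginCountJobSession code used, so an unset property now reads as 0 (guardrail effectively off) rather than 10 MB. The sketch below illustrates the types/defaults registry pattern of this file; the getDefault and getDefaultAsString helpers are modeled on the calls seen in the OriginCountJobSession diff above, and the key string is hypothetical.

    import java.util.HashMap;
    import java.util.Map;

    public class DefaultsRegistrySketch {
        enum PropertyType { NUMBER, STRING, BOOLEAN }

        static final String FIELD_GUARDRAIL_MB = "spark.guardrail.colSizeInMB"; // hypothetical key string
        static final Map<String, PropertyType> types = new HashMap<>();
        static final Map<String, String> defaults = new HashMap<>();
        static {
            types.put(FIELD_GUARDRAIL_MB, PropertyType.NUMBER);
            defaults.put(FIELD_GUARDRAIL_MB, "0"); // 0 = guardrail check effectively off
        }

        // Raw string form of the default, as stored in the registry.
        static String getDefaultAsString(String key) {
            return defaults.get(key);
        }

        // Typed form of the default, converted according to the registered type.
        static Object getDefault(String key) {
            String raw = defaults.get(key);
            if (raw == null) return null;
            return (types.get(key) == PropertyType.NUMBER) ? Integer.valueOf(raw) : raw;
        }

        public static void main(String[] args) {
            System.out.println(getDefaultAsString(FIELD_GUARDRAIL_MB)); // "0"
            System.out.println(getDefault(FIELD_GUARDRAIL_MB));         // 0 (Integer)
        }
    }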

src/main/scala/datastax/astra/migrate/BaseJob.scala

Lines changed: 32 additions & 31 deletions

@@ -1,6 +1,6 @@
 package datastax.astra.migrate

-import datastax.astra.migrate.properties.KnownProperties
+import datastax.astra.migrate.properties.{KnownProperties, PropertyHelper}
 import org.apache.spark.sql.SparkSession
 import org.slf4j.LoggerFactory

@@ -18,40 +18,41 @@ class BaseJob extends App {

   val sContext = spark.sparkContext
   val sc = sContext.getConf
+  val propertyHelper = PropertyHelper.getInstance(sc);

-  val consistencyLevel = Util.getSparkPropOr(sc, KnownProperties.READ_CL, "LOCAL_QUORUM")
+  val consistencyLevel = propertyHelper.getString(KnownProperties.READ_CL)

-  val originScbPath = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_CONNECT_SCB)
-  val originHost = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_CONNECT_HOST)
-  val originPort = Util.getSparkPropOr(sc, KnownProperties.ORIGIN_CONNECT_PORT, "9042")
-  val originUsername = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_CONNECT_USERNAME)
-  val originPassword = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_CONNECT_PASSWORD)
-  val originSSLEnabled = Util.getSparkPropOr(sc, KnownProperties.ORIGIN_TLS_ENABLED, "false")
-  val originTrustStorePath = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_TLS_TRUSTSTORE_PATH)
-  val originTrustStorePassword = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_TLS_TRUSTSTORE_PASSWORD)
-  val originTrustStoreType = Util.getSparkPropOr(sc, KnownProperties.ORIGIN_TLS_TRUSTSTORE_TYPE, "JKS")
-  val originKeyStorePath = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_TLS_KEYSTORE_PATH)
-  val originKeyStorePassword = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_TLS_KEYSTORE_PASSWORD)
-  val originEnabledAlgorithms = Util.getSparkPropOrEmpty(sc, KnownProperties.ORIGIN_TLS_ALGORITHMS)
+  val originScbPath = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_SCB)
+  val originHost = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_HOST)
+  val originPort = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_PORT)
+  val originUsername = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_USERNAME)
+  val originPassword = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_PASSWORD)
+  val originSSLEnabled = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_ENABLED)
+  val originTrustStorePath = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_TRUSTSTORE_PATH)
+  val originTrustStorePassword = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_TRUSTSTORE_PASSWORD)
+  val originTrustStoreType = propertyHelper.getString(KnownProperties.ORIGIN_TLS_TRUSTSTORE_TYPE)
+  val originKeyStorePath = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_KEYSTORE_PATH)
+  val originKeyStorePassword = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_KEYSTORE_PASSWORD)
+  val originEnabledAlgorithms = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_ALGORITHMS)

-  val targetScbPath = Util.getSparkPropOrEmpty(sc, KnownProperties.TARGET_CONNECT_SCB)
-  val targetHost = Util.getSparkPropOrEmpty(sc, KnownProperties.TARGET_CONNECT_HOST)
-  val targetPort = Util.getSparkPropOr(sc, KnownProperties.TARGET_CONNECT_PORT, "9042")
-  val targetUsername = Util.getSparkProp(sc, KnownProperties.TARGET_CONNECT_USERNAME)
-  val targetPassword = Util.getSparkProp(sc, KnownProperties.TARGET_CONNECT_PASSWORD)
-  val targetSSLEnabled = Util.getSparkPropOr(sc, KnownProperties.TARGET_TLS_ENABLED, "false")
-  val targetTrustStorePath = Util.getSparkPropOrEmpty(sc, KnownProperties.TARGET_TLS_TRUSTSTORE_PATH)
-  val targetTrustStorePassword = Util.getSparkPropOrEmpty(sc, KnownProperties.TARGET_TLS_TRUSTSTORE_PASSWORD)
-  val targetTrustStoreType = Util.getSparkPropOr(sc, KnownProperties.TARGET_TLS_TRUSTSTORE_TYPE, "JKS")
-  val targetKeyStorePath = Util.getSparkPropOrEmpty(sc, KnownProperties.TARGET_TLS_KEYSTORE_PATH)
-  val targetKeyStorePassword = Util.getSparkPropOrEmpty(sc, KnownProperties.TARGET_TLS_KEYSTORE_PASSWORD)
-  val targetEnabledAlgorithms = Util.getSparkPropOrEmpty(sc, KnownProperties.TARGET_TLS_ALGORITHMS)
+  val targetScbPath = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_SCB)
+  val targetHost = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_HOST)
+  val targetPort = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_PORT)
+  val targetUsername = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_USERNAME)
+  val targetPassword = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_PASSWORD)
+  val targetSSLEnabled = propertyHelper.getAsString(KnownProperties.TARGET_TLS_ENABLED)
+  val targetTrustStorePath = propertyHelper.getAsString(KnownProperties.TARGET_TLS_TRUSTSTORE_PATH)
+  val targetTrustStorePassword = propertyHelper.getAsString(KnownProperties.TARGET_TLS_TRUSTSTORE_PASSWORD)
+  val targetTrustStoreType = propertyHelper.getString(KnownProperties.TARGET_TLS_TRUSTSTORE_TYPE)
+  val targetKeyStorePath = propertyHelper.getAsString(KnownProperties.TARGET_TLS_KEYSTORE_PATH)
+  val targetKeyStorePassword = propertyHelper.getAsString(KnownProperties.TARGET_TLS_KEYSTORE_PASSWORD)
+  val targetEnabledAlgorithms = propertyHelper.getAsString(KnownProperties.TARGET_TLS_ALGORITHMS)

-  val minPartition = new BigInteger(Util.getSparkPropOr(sc, KnownProperties.PARTITION_MIN, "-9223372036854775808"))
-  val maxPartition = new BigInteger(Util.getSparkPropOr(sc, KnownProperties.PARTITION_MAX, "9223372036854775807"))
-  val coveragePercent = Util.getSparkPropOr(sc, KnownProperties.ORIGIN_COVERAGE_PERCENT, "100")
-  val splitSizeBackwardCompatibility = Util.getSparkPropOr(sc, KnownProperties.DEPRECATED_SPARK_SPLIT_SIZE, "10000")
-  val numSplits = Integer.parseInt(Util.getSparkPropOr(sc, KnownProperties.SPARK_NUM_SPLITS, splitSizeBackwardCompatibility))
+  val minPartition = new BigInteger(propertyHelper.getAsString(KnownProperties.PARTITION_MIN))
+  val maxPartition = new BigInteger(propertyHelper.getAsString(KnownProperties.PARTITION_MAX))
+  val coveragePercent = propertyHelper.getAsString(KnownProperties.ORIGIN_COVERAGE_PERCENT)
+  val numSplitsFromProperty = propertyHelper.getInteger(KnownProperties.SPARK_NUM_SPLITS)
+  val numSplits = if (null!=numSplitsFromProperty) numSplitsFromProperty else propertyHelper.getInteger(KnownProperties.DEPRECATED_SPARK_SPLIT_SIZE)

   protected def exitSpark() = {
     spark.stop()
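
The last hunk replaces the old string-level chaining, where the deprecated split-size value was passed in as the default when reading SPARK_NUM_SPLITS, with an explicit null check on the typed getter: prefer the new property, fall back to the deprecated one. The same pattern sketched in Java (to match the other examples here), with hypothetical key strings and a nullable getInteger; the final 10000 fallback mirrors the old hard-coded default.

    import java.util.HashMap;
    import java.util.Map;

    public class DeprecatedFallbackSketch {
        static final Map<String, Integer> conf = new HashMap<>();

        // Returns null when the property was never set, like the typed
        // propertyHelper.getInteger lookup the Scala code relies on.
        static Integer getInteger(String key) {
            return conf.get(key);
        }

        public static void main(String[] args) {
            conf.put("spark.splitSize", 10000); // only the deprecated key is set

            Integer fromNewKey = getInteger("spark.numSplits");
            Integer fromDeprecated = getInteger("spark.splitSize");
            int numSplits = (fromNewKey != null) ? fromNewKey
                    : (fromDeprecated != null) ? fromDeprecated
                    : 10000; // last-resort default, matching the old hard-coded "10000"
            System.out.println(numSplits); // prints 10000: the deprecated value is honored
        }
    }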
