
Commit 9a52716

Updated naming to be ZDM compliant

- Replaced 'source' with 'origin'
- Replaced 'destination' with 'target'

Note: Names were changed only in the config file; the code changes support the new config keys while preserving backward compatibility. The code also uses variable names that are not ZDM-compliant; those will be updated in a future PR.
1 parent 9fd1e22 commit 9a52716
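As a hedged illustration of what the rename means for a job configuration (the keyspace and table values below are invented; only the property keys come from this commit), a minimal sketch in Java that sets the new keys directly on a SparkConf:

import org.apache.spark.SparkConf;

public class ZdmNamingExample {
    public static void main(String[] args) {
        // Placeholder values; only the keys are taken from this commit.
        SparkConf conf = new SparkConf()
                .set("spark.origin.keyspaceTable", "test_ks.table1")   // was: spark.source.keyspaceTable
                .set("spark.target.keyspaceTable", "test_ks.table1")   // was: spark.destination.keyspaceTable
                .set("spark.origin.writeTimeStampFilter", "false")     // was: spark.source.writeTimeStampFilter
                .set("spark.target.autocorrect.missing", "false");     // was: spark.destination.autocorrect.missing

        // Print the resulting configuration for inspection.
        System.out.println(conf.toDebugString());
    }
}

Jobs that still pass the old keys keep working; see the Util class added later in this commit for the fallback that makes that possible.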

12 files changed, +161 -140 lines changed

pom.xml

Lines changed: 1 addition & 1 deletion

@@ -3,7 +3,7 @@

    <groupId>datastax.astra.migrate</groupId>
    <artifactId>cassandra-data-migrator</artifactId>
-    <version>2.0</version>
+    <version>2.1</version>
    <packaging>jar</packaging>

    <properties>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 25 additions & 25 deletions

@@ -20,57 +20,57 @@ public class AbstractJobSession extends BaseJobSession {

    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());

-    protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
+    protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
        this.sourceSession = sourceSession;
        this.astraSession = astraSession;

-        batchSize = new Integer(sparkConf.get("spark.batchSize", "1"));
-        printStatsAfter = new Integer(sparkConf.get("spark.printStatsAfter", "100000"));
+        batchSize = new Integer(Util.getSparkPropOr(sc, "spark.batchSize", "1"));
+        printStatsAfter = new Integer(Util.getSparkPropOr(sc, "spark.printStatsAfter", "100000"));
        if (printStatsAfter < 1) {
            printStatsAfter = 100000;
        }

-        readLimiter = RateLimiter.create(new Integer(sparkConf.get("spark.readRateLimit", "20000")));
-        writeLimiter = RateLimiter.create(new Integer(sparkConf.get("spark.writeRateLimit", "40000")));
-        maxRetries = Integer.parseInt(sparkConf.get("spark.maxRetries", "10"));
+        readLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.readRateLimit", "20000")));
+        writeLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.writeRateLimit", "40000")));
+        maxRetries = Integer.parseInt(sc.get("spark.maxRetries", "10"));

-        sourceKeyspaceTable = sparkConf.get("spark.source.keyspaceTable");
-        astraKeyspaceTable = sparkConf.get("spark.destination.keyspaceTable");
+        sourceKeyspaceTable = Util.getSparkProp(sc, "spark.origin.keyspaceTable");
+        astraKeyspaceTable = Util.getSparkProp(sc, "spark.target.keyspaceTable");

-        String ttlColsStr = sparkConf.get("spark.query.ttl.cols", "");
+        String ttlColsStr = Util.getSparkPropOrEmpty(sc, "spark.query.ttl.cols");
        if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
            for (String ttlCol : ttlColsStr.split(",")) {
                ttlCols.add(Integer.parseInt(ttlCol));
            }
        }

-        String writeTimestampColsStr = sparkConf.get("spark.query.writetime.cols", "");
+        String writeTimestampColsStr = Util.getSparkPropOrEmpty(sc, "spark.query.writetime.cols");
        if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
            for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
                writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
            }
        }

        writeTimeStampFilter = Boolean
-                .parseBoolean(sparkConf.get("spark.source.writeTimeStampFilter", "false"));
+                .parseBoolean(Util.getSparkPropOr(sc, "spark.origin.writeTimeStampFilter", "false"));
        // batchsize set to 1 if there is a writeFilter
        if (writeTimeStampFilter) {
            batchSize = 1;
        }

        String minWriteTimeStampFilterStr =
-                sparkConf.get("spark.source.minWriteTimeStampFilter", "0");
+                Util.getSparkPropOr(sc, "spark.origin.minWriteTimeStampFilter", "0");
        if (null != minWriteTimeStampFilterStr && minWriteTimeStampFilterStr.trim().length() > 1) {
            minWriteTimeStampFilter = Long.parseLong(minWriteTimeStampFilterStr);
        }
        String maxWriteTimeStampFilterStr =
-                sparkConf.get("spark.source.maxWriteTimeStampFilter", "0");
+                Util.getSparkPropOr(sc, "spark.origin.maxWriteTimeStampFilter", "0");
        if (null != maxWriteTimeStampFilterStr && maxWriteTimeStampFilterStr.trim().length() > 1) {
            maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
        }

        String customWriteTimeStr =
-                sparkConf.get("spark.destination.custom.writeTime", "0");
+                Util.getSparkPropOr(sc, "spark.target.custom.writeTime", "0");
        if (null != customWriteTimeStr && customWriteTimeStr.trim().length() > 1 && StringUtils.isNumeric(customWriteTimeStr.trim())) {
            customWritetime = Long.parseLong(customWriteTimeStr);
        }

@@ -84,9 +84,9 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
        logger.info("PARAM -- WriteTimestampFilterCols: " + writeTimeStampCols);
        logger.info("PARAM -- WriteTimestampFilter: " + writeTimeStampFilter);

-        String selectCols = sparkConf.get("spark.query.source");
-        String partionKey = sparkConf.get("spark.query.source.partitionKey");
-        String sourceSelectCondition = sparkConf.get("spark.query.condition", "");
+        String selectCols = Util.getSparkProp(sc, "spark.query.origin");
+        String partionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
+        String sourceSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");

        final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
        String[] allCols = selectCols.split(",");

@@ -96,16 +96,16 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
        writeTimeStampCols.forEach(col -> {
            selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")");
        });
-        String fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols.toString() + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
+        String fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
                + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING";
        sourceSelectStatement = sourceSession.prepare(fullSelectQuery);
        logger.info("PARAM -- Query used: " + fullSelectQuery);

-        selectColTypes = getTypes(sparkConf.get("spark.query.types"));
-        String idCols = sparkConf.get("spark.query.destination.id", "");
+        selectColTypes = getTypes(Util.getSparkProp(sc, "spark.query.types"));
+        String idCols = Util.getSparkPropOrEmpty(sc, "spark.query.target.id");
        idColTypes = selectColTypes.subList(0, idCols.split(",").length);

-        String insertCols = sparkConf.get("spark.query.destination", "");
+        String insertCols = Util.getSparkPropOrEmpty(sc, "spark.query.target");
        if (null == insertCols || insertCols.trim().isEmpty()) {
            insertCols = selectCols;
        }

@@ -121,15 +121,15 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
                "select " + insertCols + " from " + astraKeyspaceTable
                        + " where " + insertBinds);

-        hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.source.hasRandomPartitioner", "false"));
-        isCounterTable = Boolean.parseBoolean(sparkConf.get("spark.counterTable", "false"));
+        hasRandomPartitioner = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.origin.hasRandomPartitioner", "false"));
+        isCounterTable = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.counterTable", "false"));
        if (isCounterTable) {
-            String updateSelectMappingStr = sparkConf.get("spark.counterTable.cql.index", "0");
+            String updateSelectMappingStr = Util.getSparkPropOr(sc, "spark.counterTable.cql.index", "0");
            for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
                updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
            }

-            String counterTableUpdate = sparkConf.get("spark.counterTable.cql");
+            String counterTableUpdate = Util.getSparkProp(sc, "spark.counterTable.cql");
            astraInsertStatement = astraSession.prepare(counterTableUpdate);
        } else {
            insertBinds = "";

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 4 additions & 4 deletions

@@ -20,15 +20,15 @@ public class CopyJobSession extends AbstractJobSession {
    protected AtomicLong readCounter = new AtomicLong(0);
    protected AtomicLong writeCounter = new AtomicLong(0);

-    protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
-        super(sourceSession, astraSession, sparkConf);
+    protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
+        super(sourceSession, astraSession, sc);
    }

-    public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
+    public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
        if (copyJobSession == null) {
            synchronized (CopyJobSession.class) {
                if (copyJobSession == null) {
-                    copyJobSession = new CopyJobSession(sourceSession, astraSession, sparkConf);
+                    copyJobSession = new CopyJobSession(sourceSession, astraSession, sc);
                }
            }
        }

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 4 additions & 4 deletions

@@ -31,13 +31,13 @@ public class DiffJobSession extends CopyJobSession {
    private AtomicLong validCounter = new AtomicLong(0);
    private AtomicLong skippedCounter = new AtomicLong(0);

-    private DiffJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
-        super(sourceSession, astraSession, sparkConf);
+    private DiffJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
+        super(sourceSession, astraSession, sc);

-        autoCorrectMissing = Boolean.parseBoolean(sparkConf.get("spark.destination.autocorrect.missing", "false"));
+        autoCorrectMissing = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.target.autocorrect.missing", "false"));
        logger.info("PARAM -- Autocorrect Missing: " + autoCorrectMissing);

-        autoCorrectMismatch = Boolean.parseBoolean(sparkConf.get("spark.destination.autocorrect.mismatch", "false"));
+        autoCorrectMismatch = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.target.autocorrect.mismatch", "false"));
        logger.info("PARAM -- Autocorrect Mismatch: " + autoCorrectMismatch);
    }

src/main/java/datastax/astra/migrate/Util.java (new file)

Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+package datastax.astra.migrate;
+
+import org.apache.spark.SparkConf;
+
+import java.util.NoSuchElementException;
+
+public class Util {
+
+    public static String getSparkProp(SparkConf sc, String prop) {
+        try {
+            return sc.get(prop);
+        } catch (NoSuchElementException nse) {
+            String newProp = prop.replace("origin", "source").replace("target", "destination");
+            return sc.get(newProp);
+        }
+    }
+
+    public static String getSparkPropOr(SparkConf sc, String prop, String defaultVal) {
+        try {
+            return sc.get(prop);
+        } catch (NoSuchElementException nse) {
+            String newProp = prop.replace("origin", "source").replace("target", "destination");
+            return sc.get(newProp, defaultVal);
+        }
+    }
+
+    public static String getSparkPropOrEmpty(SparkConf sc, String prop) {
+        return getSparkPropOr(sc, prop, "");
+    }
+
+}
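The new Util class is what provides the backward compatibility mentioned in the commit message: each getter first looks up the ZDM-compliant key and, if it is missing, retries with 'origin' mapped back to 'source' and 'target' mapped back to 'destination'. A minimal usage sketch follows (the property value is made up; the calls mirror how AbstractJobSession and BaseJob read their configuration):

package datastax.astra.migrate;

import org.apache.spark.SparkConf;

public class UtilFallbackExample {
    public static void main(String[] args) {
        // A job still configured with a pre-ZDM key.
        SparkConf legacyConf = new SparkConf()
                .set("spark.source.keyspaceTable", "test_ks.table1");

        // Looking up the new key succeeds via the fallback to spark.source.keyspaceTable.
        System.out.println(Util.getSparkProp(legacyConf, "spark.origin.keyspaceTable"));      // test_ks.table1

        // The defaulted and "or empty" variants fall back the same way before applying the default.
        System.out.println(Util.getSparkPropOr(legacyConf, "spark.origin.isAstra", "false")); // false
        System.out.println(Util.getSparkPropOrEmpty(legacyConf, "spark.origin.scb"));         // (empty string)
    }
}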

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 3 additions & 3 deletions

@@ -26,7 +26,7 @@ class AbstractJob extends BaseJob {
    if ("true".equals(isAstra)) {
      abstractLogger.info(connType + ": Connected to Astra using SCB: " + scbPath);

-      return CassandraConnector(sc.getConf
+      return CassandraConnector(sc
        .set("spark.cassandra.auth.username", username)
        .set("spark.cassandra.auth.password", password)
        .set("spark.cassandra.input.consistency.level", readConsistencyLevel)

@@ -40,7 +40,7 @@
        enabledAlgorithmsVar = "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA"
      }

-      return CassandraConnector(sc.getConf
+      return CassandraConnector(sc
        .set("spark.cassandra.auth.username", username)
        .set("spark.cassandra.auth.password", password)
        .set("spark.cassandra.input.consistency.level", readConsistencyLevel)

@@ -57,7 +57,7 @@
    } else {
      abstractLogger.info(connType + ": Connected to Cassandra (or DSE) host: " + host);

-      return CassandraConnector(sc.getConf.set("spark.cassandra.auth.username", username)
+      return CassandraConnector(sc.set("spark.cassandra.auth.username", username)
        .set("spark.cassandra.auth.password", password)
        .set("spark.cassandra.input.consistency.level", readConsistencyLevel)
        .set("spark.cassandra.connection.host", host))

src/main/scala/datastax/astra/migrate/BaseJob.scala

Lines changed: 33 additions & 32 deletions

@@ -15,38 +15,39 @@ class BaseJob extends App {
  abstractLogger.info("############################## Cassandra Data Migrator - Starting ##############################")
  abstractLogger.info("################################################################################################")

-  val sc = spark.sparkContext
-
-  val sourceIsAstra = sc.getConf.get("spark.source.isAstra", "false")
-  val sourceScbPath = sc.getConf.get("spark.source.scb", "")
-  val sourceHost = sc.getConf.get("spark.source.host", "")
-  val sourceUsername = sc.getConf.get("spark.source.username", "")
-  val sourcePassword = sc.getConf.get("spark.source.password", "")
-  val sourceReadConsistencyLevel = sc.getConf.get("spark.source.read.consistency.level", "LOCAL_QUORUM")
-  val sourceTrustStorePath = sc.getConf.get("spark.source.trustStore.path", "")
-  val sourceTrustStorePassword = sc.getConf.get("spark.source.trustStore.password", "")
-  val sourceTrustStoreType = sc.getConf.get("spark.source.trustStore.type", "JKS")
-  val sourceKeyStorePath = sc.getConf.get("spark.source.keyStore.path", "")
-  val sourceKeyStorePassword = sc.getConf.get("spark.source.keyStore.password", "")
-  val sourceEnabledAlgorithms = sc.getConf.get("spark.source.enabledAlgorithms", "")
-
-  val destinationIsAstra = sc.getConf.get("spark.destination.isAstra", "true")
-  val destinationScbPath = sc.getConf.get("spark.destination.scb", "")
-  val destinationHost = sc.getConf.get("spark.destination.host", "")
-  val destinationUsername = sc.getConf.get("spark.destination.username")
-  val destinationPassword = sc.getConf.get("spark.destination.password")
-  val destinationReadConsistencyLevel = sc.getConf.get("spark.destination.read.consistency.level", "LOCAL_QUORUM")
-  val destinationTrustStorePath = sc.getConf.get("spark.destination.trustStore.path", "")
-  val destinationTrustStorePassword = sc.getConf.get("spark.destination.trustStore.password", "")
-  val destinationTrustStoreType = sc.getConf.get("spark.destination.trustStore.type", "JKS")
-  val destinationKeyStorePath = sc.getConf.get("spark.destination.keyStore.path", "")
-  val destinationKeyStorePassword = sc.getConf.get("spark.destination.keyStore.password", "")
-  val destinationEnabledAlgorithms = sc.getConf.get("spark.destination.enabledAlgorithms", "")
-
-  val minPartition = new BigInteger(sc.getConf.get("spark.source.minPartition", "-9223372036854775808"))
-  val maxPartition = new BigInteger(sc.getConf.get("spark.source.maxPartition", "9223372036854775807"))
-  val coveragePercent = sc.getConf.get("spark.coveragePercent", "100")
-  val splitSize = Integer.parseInt(sc.getConf.get("spark.splitSize", "10000"))
+  val sContext = spark.sparkContext
+  val sc = sContext.getConf
+
+  val sourceIsAstra = Util.getSparkPropOr(sc, "spark.origin.isAstra", "false")
+  val sourceScbPath = Util.getSparkPropOrEmpty(sc, "spark.origin.scb")
+  val sourceHost = Util.getSparkPropOrEmpty(sc, "spark.origin.host")
+  val sourceUsername = Util.getSparkPropOrEmpty(sc, "spark.origin.username")
+  val sourcePassword = Util.getSparkPropOrEmpty(sc, "spark.origin.password")
+  val sourceReadConsistencyLevel = Util.getSparkPropOr(sc, "spark.origin.read.consistency.level", "LOCAL_QUORUM")
+  val sourceTrustStorePath = Util.getSparkPropOrEmpty(sc, "spark.origin.trustStore.path")
+  val sourceTrustStorePassword = Util.getSparkPropOrEmpty(sc, "spark.origin.trustStore.password")
+  val sourceTrustStoreType = Util.getSparkPropOr(sc, "spark.origin.trustStore.type", "JKS")
+  val sourceKeyStorePath = Util.getSparkPropOrEmpty(sc, "spark.origin.keyStore.path")
+  val sourceKeyStorePassword = Util.getSparkPropOrEmpty(sc, "spark.origin.keyStore.password")
+  val sourceEnabledAlgorithms = Util.getSparkPropOrEmpty(sc, "spark.origin.enabledAlgorithms")
+
+  val destinationIsAstra = Util.getSparkPropOr(sc, "spark.target.isAstra", "true")
+  val destinationScbPath = Util.getSparkPropOrEmpty(sc, "spark.target.scb")
+  val destinationHost = Util.getSparkPropOrEmpty(sc, "spark.target.host")
+  val destinationUsername = Util.getSparkProp(sc, "spark.target.username")
+  val destinationPassword = Util.getSparkProp(sc, "spark.target.password")
+  val destinationReadConsistencyLevel = Util.getSparkPropOr(sc, "spark.target.read.consistency.level", "LOCAL_QUORUM")
+  val destinationTrustStorePath = Util.getSparkPropOrEmpty(sc, "spark.target.trustStore.path")
+  val destinationTrustStorePassword = Util.getSparkPropOrEmpty(sc, "spark.target.trustStore.password")
+  val destinationTrustStoreType = Util.getSparkPropOr(sc, "spark.target.trustStore.type", "JKS")
+  val destinationKeyStorePath = Util.getSparkPropOrEmpty(sc, "spark.target.keyStore.path")
+  val destinationKeyStorePassword = Util.getSparkPropOrEmpty(sc, "spark.target.keyStore.password")
+  val destinationEnabledAlgorithms = Util.getSparkPropOrEmpty(sc, "spark.target.enabledAlgorithms")
+
+  val minPartition = new BigInteger(Util.getSparkPropOr(sc, "spark.origin.minPartition", "-9223372036854775808"))
+  val maxPartition = new BigInteger(Util.getSparkPropOr(sc, "spark.origin.maxPartition", "9223372036854775807"))
+  val coveragePercent = Util.getSparkPropOr(sc, "spark.coveragePercent", "100")
+  val splitSize = Integer.parseInt(Util.getSparkPropOr(sc, "spark.splitSize", "10000"))

  protected def exitSpark() = {
    spark.stop()

src/main/scala/datastax/astra/migrate/DiffData.scala

Lines changed: 3 additions & 3 deletions

@@ -17,17 +17,17 @@ object DiffData extends AbstractJob {
  private def diffTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector) = {
    val partitions = SplitPartitions.getRandomSubPartitions(splitSize, minPartition, maxPartition, Integer.parseInt(coveragePercent))
    logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
-    val parts = sc.parallelize(partitions.toSeq, partitions.size);
+    val parts = sContext.parallelize(partitions.toSeq, partitions.size);
    logger.info("Spark parallelize created : " + parts.count() + " parts!");

    parts.foreach(part => {
      sourceConnection.withSessionDo(sourceSession =>
        destinationConnection.withSessionDo(destinationSession =>
-          DiffJobSession.getInstance(sourceSession, destinationSession, sc.getConf)
+          DiffJobSession.getInstance(sourceSession, destinationSession, sc)
            .getDataAndDiff(part.getMin, part.getMax)))
    })

-    DiffJobSession.getInstance(null, null, sc.getConf).printCounts("Job Final");
+    DiffJobSession.getInstance(null, null, sc).printCounts("Job Final");
  }

}

src/main/scala/datastax/astra/migrate/Migrate.scala

Lines changed: 2 additions & 2 deletions

@@ -19,13 +19,13 @@ object Migrate extends AbstractJob {
  private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector) = {
    val partitions = SplitPartitions.getRandomSubPartitions(splitSize, minPartition, maxPartition, Integer.parseInt(coveragePercent))
    logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
-    val parts = sc.parallelize(partitions.toSeq, partitions.size);
+    val parts = sContext.parallelize(partitions.toSeq, partitions.size);
    logger.info("Spark parallelize created : " + parts.count() + " parts!");

    parts.foreach(part => {
      sourceConnection.withSessionDo(sourceSession =>
        destinationConnection.withSessionDo(destinationSession =>
-          CopyJobSession.getInstance(sourceSession, destinationSession, sc.getConf)
+          CopyJobSession.getInstance(sourceSession, destinationSession, sc)
            .getDataAndInsert(part.getMin, part.getMax)))
    })
