
Commit d985c1c

Simplified config file & use default min/max
1 parent 8eb105b commit d985c1c

File tree

9 files changed: +225 −233 lines changed

pom.xml

Lines changed: 1 addition & 1 deletion

@@ -3,7 +3,7 @@

     <groupId>datastax.astra.migrate</groupId>
     <artifactId>cassandra-data-migrator</artifactId>
-    <version>1.0</version>
+    <version>1.1</version>
     <packaging>jar</packaging>

     <properties>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 21 additions & 21 deletions

@@ -58,22 +58,22 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
        this.sourceSession = sourceSession;
        this.astraSession = astraSession;

-        batchSize = new Integer(sparkConf.get("spark.migrate.batchSize", "1"));
-        printStatsAfter = new Integer(sparkConf.get("spark.migrate.printStatsAfter", "100000"));
+        batchSize = new Integer(sparkConf.get("spark.batchSize", "1"));
+        printStatsAfter = new Integer(sparkConf.get("spark.printStatsAfter", "100000"));
        if (printStatsAfter < 1) {
            printStatsAfter = 100000;
        }

-        readLimiter = RateLimiter.create(new Integer(sparkConf.get("spark.migrate.readRateLimit", "20000")));
-        writeLimiter = RateLimiter.create(new Integer(sparkConf.get("spark.migrate.writeRateLimit", "40000")));
-        maxRetries = Integer.parseInt(sparkConf.get("spark.migrate.maxRetries", "10"));
+        readLimiter = RateLimiter.create(new Integer(sparkConf.get("spark.readRateLimit", "20000")));
+        writeLimiter = RateLimiter.create(new Integer(sparkConf.get("spark.writeRateLimit", "40000")));
+        maxRetries = Integer.parseInt(sparkConf.get("spark.maxRetries", "10"));

-        sourceKeyspaceTable = sparkConf.get("spark.migrate.source.keyspaceTable");
-        astraKeyspaceTable = sparkConf.get("spark.migrate.destination.keyspaceTable");
+        sourceKeyspaceTable = sparkConf.get("spark.source.keyspaceTable");
+        astraKeyspaceTable = sparkConf.get("spark.destination.keyspaceTable");

-        isPreserveTTLWritetime = Boolean.parseBoolean(sparkConf.get("spark.migrate.preserveTTLWriteTime", "false"));
+        isPreserveTTLWritetime = Boolean.parseBoolean(sparkConf.get("spark.preserveTTLWriteTime", "false"));
        if (isPreserveTTLWritetime) {
-            String ttlColsStr = sparkConf.get("spark.migrate.source.ttl.cols");
+            String ttlColsStr = sparkConf.get("spark.source.ttl.cols");
            if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
                for (String ttlCol : ttlColsStr.split(",")) {
                    ttlCols.add(Integer.parseInt(ttlCol));

@@ -82,11 +82,11 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
        }

        writeTimeStampFilter = Boolean
-                .parseBoolean(sparkConf.get("spark.migrate.source.writeTimeStampFilter", "false"));
+                .parseBoolean(sparkConf.get("spark.source.writeTimeStampFilter", "false"));
        // batchsize set to 1 if there is a writeFilter
        if (writeTimeStampFilter) {
            batchSize = 1;
-            String writeTimestampColsStr = sparkConf.get("spark.migrate.source.writeTimeStampFilter.cols");
+            String writeTimestampColsStr = sparkConf.get("spark.source.writeTimeStampFilter.cols");
            if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
                for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
                    writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));

@@ -95,12 +95,12 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
        }

        String minWriteTimeStampFilterStr =
-                sparkConf.get("spark.migrate.source.minWriteTimeStampFilter", "0");
+                sparkConf.get("spark.source.minWriteTimeStampFilter", "0");
        if (null != minWriteTimeStampFilterStr && minWriteTimeStampFilterStr.trim().length() > 1) {
            minWriteTimeStampFilter = Long.parseLong(minWriteTimeStampFilterStr);
        }
        String maxWriteTimeStampFilterStr =
-                sparkConf.get("spark.migrate.source.maxWriteTimeStampFilter", "0");
+                sparkConf.get("spark.source.maxWriteTimeStampFilter", "0");
        if (null != maxWriteTimeStampFilterStr && maxWriteTimeStampFilterStr.trim().length() > 1) {
            maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
        }

@@ -115,18 +115,18 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
        logger.info(" DEFAULT -- isPreserveTTLWritetime: " + isPreserveTTLWritetime);
        logger.info(" DEFAULT -- TTLCols: " + ttlCols);

-        hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.migrate.source.hasRandomPartitioner", "false"));
+        hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.source.hasRandomPartitioner", "false"));

-        isCounterTable = Boolean.parseBoolean(sparkConf.get("spark.migrate.source.counterTable", "false"));
+        isCounterTable = Boolean.parseBoolean(sparkConf.get("spark.source.counterTable", "false"));

        counterDeltaMaxIndex = Integer
-                .parseInt(sparkConf.get("spark.migrate.source.counterTable.update.max.counter.index", "0"));
+                .parseInt(sparkConf.get("spark.source.counterTable.update.max.counter.index", "0"));

-        String partionKey = sparkConf.get("spark.migrate.query.cols.partitionKey");
-        String idCols = sparkConf.get("spark.migrate.query.cols.id");
-        idColTypes = getTypes(sparkConf.get("spark.migrate.query.cols.id.types"));
+        String partionKey = sparkConf.get("spark.query.cols.partitionKey");
+        String idCols = sparkConf.get("spark.query.cols.id");
+        idColTypes = getTypes(sparkConf.get("spark.query.cols.id.types"));

-        String selectCols = sparkConf.get("spark.migrate.query.cols.select");
+        String selectCols = sparkConf.get("spark.query.cols.select");

        String idBinds = "";
        int count = 1;

@@ -139,7 +139,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
            count++;
        }

-        sourceSelectCondition = sparkConf.get("spark.migrate.query.cols.select.condition", "");
+        sourceSelectCondition = sparkConf.get("spark.query.cols.select.condition", "");
        sourceSelectStatement = sourceSession.prepare(
                "select " + selectCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
                        + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING");

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 4 additions & 4 deletions

@@ -40,8 +40,8 @@ public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession as
    protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
        super(sourceSession, astraSession, sparkConf);

-        String insertCols = sparkConf.get("spark.migrate.query.cols.insert");
-        insertColTypes = getTypes(sparkConf.get("spark.migrate.query.cols.insert.types"));
+        String insertCols = sparkConf.get("spark.query.cols.insert");
+        insertColTypes = getTypes(sparkConf.get("spark.query.cols.insert.types"));
        String insertBinds = "";
        int count = 1;
        for (String str : insertCols.split(",")) {

@@ -54,12 +54,12 @@ protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, Spar
        }

        if (isCounterTable) {
-            String updateSelectMappingStr = sparkConf.get("spark.migrate.source.counterTable.update.select.index", "0");
+            String updateSelectMappingStr = sparkConf.get("spark.source.counterTable.update.select.index", "0");
            for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
                updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
            }

-            String counterTableUpdate = sparkConf.get("spark.migrate.source.counterTable.update.cql");
+            String counterTableUpdate = sparkConf.get("spark.source.counterTable.update.cql");
            astraInsertStatement = astraSession.prepare(counterTableUpdate);
        } else {
            if (isPreserveTTLWritetime) {
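For counter tables, the copy job prepares whatever CQL the spark.source.counterTable.update.cql key carries instead of building an INSERT, since counter columns in Cassandra can only be mutated through UPDATE. A hypothetical value for that key (keyspace, table, and column names are invented for illustration):

    // Hypothetical counter-update CQL of the shape this key expects; the
    // bind markers must line up with the column indexes configured in
    // spark.source.counterTable.update.select.index.
    val counterTableUpdate =
      "UPDATE ks.page_views SET views = views + ? WHERE page_id = ? AND day = ?"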

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 3 additions & 11 deletions

@@ -15,14 +15,6 @@
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

-/*
-(
-    data_id text,
-    cylinder text,
-    value blob,
-    PRIMARY KEY (data_id, cylinder)
-)
-*/
public class DiffJobSession extends CopyJobSession {

    public static Logger logger = Logger.getLogger(DiffJobSession.class);

@@ -55,9 +47,9 @@ public static DiffJobSession getInstance(CqlSession sourceSession, CqlSession as
    private DiffJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
        super(sourceSession, astraSession, sparkConf);

-        selectColTypes = getTypes(sparkConf.get("spark.migrate.diff.select.types"));
-        autoCorrectMissing = Boolean.parseBoolean(sparkConf.get("spark.migrate.destination.autocorrect.missing", "false"));
-        autoCorrectMismatch = Boolean.parseBoolean(sparkConf.get("spark.migrate.destination.autocorrect.mismatch", "false"));
+        selectColTypes = getTypes(sparkConf.get("spark.diff.select.types"));
+        autoCorrectMissing = Boolean.parseBoolean(sparkConf.get("spark.destination.autocorrect.missing", "false"));
+        autoCorrectMismatch = Boolean.parseBoolean(sparkConf.get("spark.destination.autocorrect.mismatch", "false"));
    }

    public void getDataAndDiff(BigInteger min, BigInteger max) {
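Both autocorrect flags keep their "false" defaults, so the diff job stays read-only unless explicitly enabled. One caveat when setting them: Boolean.parseBoolean accepts any casing of "true" but silently maps every other value to false, so a typo like "yes" disables autocorrect rather than failing fast. A quick standalone demonstration of that behavior:

    // Boolean.parseBoolean is case-insensitive for "true" and maps any
    // other string (or null) to false -- typos silently disable the flag.
    assert(java.lang.Boolean.parseBoolean("TRUE"))
    assert(!java.lang.Boolean.parseBoolean("yes"))
    assert(!java.lang.Boolean.parseBoolean(null))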

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 33 additions & 33 deletions

@@ -16,36 +16,36 @@ class AbstractJob extends App {

  val sc = spark.sparkContext

-  val sourceIsAstra = sc.getConf.get("spark.migrate.source.isAstra", "false")
-  val sourceScbPath = sc.getConf.get("spark.migrate.source.scb", "")
-  val sourceHost = sc.getConf.get("spark.migrate.source.host", "")
-  val sourceUsername = sc.getConf.get("spark.migrate.source.username")
-  val sourcePassword = sc.getConf.get("spark.migrate.source.password")
-  val sourceReadConsistencyLevel = sc.getConf.get("spark.migrate.source.read.consistency.level", "LOCAL_QUORUM")
-  val sourceTrustStorePath = sc.getConf.get("spark.migrate.source.trustStore.path", "")
-  val sourceTrustStorePassword = sc.getConf.get("spark.migrate.source.trustStore.password", "")
-  val sourceTrustStoreType = sc.getConf.get("spark.migrate.source.trustStore.type", "JKS")
-  val sourceKeyStorePath = sc.getConf.get("spark.migrate.source.keyStore.path", "")
-  val sourceKeyStorePassword = sc.getConf.get("spark.migrate.source.keyStore.password", "")
-  val sourceEnabledAlgorithms = sc.getConf.get("spark.migrate.source.enabledAlgorithms", "")
-
-  val destinationIsAstra = sc.getConf.get("spark.migrate.destination.isAstra", "true")
-  val destinationScbPath = sc.getConf.get("spark.migrate.destination.scb", "")
-  val destinationHost = sc.getConf.get("spark.migrate.destination.host", "")
-  val destinationUsername = sc.getConf.get("spark.migrate.destination.username")
-  val destinationPassword = sc.getConf.get("spark.migrate.destination.password")
-  val destinationReadConsistencyLevel = sc.getConf.get("spark.migrate.destination.read.consistency.level", "LOCAL_QUORUM")
-  val destinationTrustStorePath = sc.getConf.get("spark.migrate.destination.trustStore.path", "")
-  val destinationTrustStorePassword = sc.getConf.get("spark.migrate.destination.trustStore.password", "")
-  val destinationTrustStoreType = sc.getConf.get("spark.migrate.destination.trustStore.type", "JKS")
-  val destinationKeyStorePath = sc.getConf.get("spark.migrate.destination.keyStore.path", "")
-  val destinationKeyStorePassword = sc.getConf.get("spark.migrate.destination.keyStore.password", "")
-  val destinationEnabledAlgorithms = sc.getConf.get("spark.migrate.destination.enabledAlgorithms", "")
-
-  val minPartition = new BigInteger(sc.getConf.get("spark.migrate.source.minPartition"))
-  val maxPartition = new BigInteger(sc.getConf.get("spark.migrate.source.maxPartition"))
-  val coveragePercent = sc.getConf.get("spark.migrate.coveragePercent", "100")
-  val splitSize = sc.getConf.get("spark.migrate.splitSize", "10000")
+  val sourceIsAstra = sc.getConf.get("spark.source.isAstra", "false")
+  val sourceScbPath = sc.getConf.get("spark.source.scb", "")
+  val sourceHost = sc.getConf.get("spark.source.host", "")
+  val sourceUsername = sc.getConf.get("spark.source.username", "")
+  val sourcePassword = sc.getConf.get("spark.source.password", "")
+  val sourceReadConsistencyLevel = sc.getConf.get("spark.source.read.consistency.level", "LOCAL_QUORUM")
+  val sourceTrustStorePath = sc.getConf.get("spark.source.trustStore.path", "")
+  val sourceTrustStorePassword = sc.getConf.get("spark.source.trustStore.password", "")
+  val sourceTrustStoreType = sc.getConf.get("spark.source.trustStore.type", "JKS")
+  val sourceKeyStorePath = sc.getConf.get("spark.source.keyStore.path", "")
+  val sourceKeyStorePassword = sc.getConf.get("spark.source.keyStore.password", "")
+  val sourceEnabledAlgorithms = sc.getConf.get("spark.source.enabledAlgorithms", "")
+
+  val destinationIsAstra = sc.getConf.get("spark.destination.isAstra", "true")
+  val destinationScbPath = sc.getConf.get("spark.destination.scb", "")
+  val destinationHost = sc.getConf.get("spark.destination.host", "")
+  val destinationUsername = sc.getConf.get("spark.destination.username")
+  val destinationPassword = sc.getConf.get("spark.destination.password")
+  val destinationReadConsistencyLevel = sc.getConf.get("spark.destination.read.consistency.level", "LOCAL_QUORUM")
+  val destinationTrustStorePath = sc.getConf.get("spark.destination.trustStore.path", "")
+  val destinationTrustStorePassword = sc.getConf.get("spark.destination.trustStore.password", "")
+  val destinationTrustStoreType = sc.getConf.get("spark.destination.trustStore.type", "JKS")
+  val destinationKeyStorePath = sc.getConf.get("spark.destination.keyStore.path", "")
+  val destinationKeyStorePassword = sc.getConf.get("spark.destination.keyStore.password", "")
+  val destinationEnabledAlgorithms = sc.getConf.get("spark.destination.enabledAlgorithms", "")
+
+  val minPartition = new BigInteger(sc.getConf.get("spark.source.minPartition", "-9223372036854775808"))
+  val maxPartition = new BigInteger(sc.getConf.get("spark.source.maxPartition", "9223372036854775807"))
+  val coveragePercent = sc.getConf.get("spark.coveragePercent", "100")
+  val splitSize = sc.getConf.get("spark.splitSize", "10000")
  val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition, Integer.parseInt(coveragePercent))

  var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword, sourceReadConsistencyLevel,

@@ -68,15 +68,15 @@ class AbstractJob extends App {
    }

    if ("true".equals(isAstra)) {
-      abstractLogger.info(connType + ": Connected to Astra!");
+      abstractLogger.info(connType + ": Connected to Astra using SCB: " + scbPath);

      return CassandraConnector(sc.getConf
        .set("spark.cassandra.auth.username", username)
        .set("spark.cassandra.auth.password", password)
        .set("spark.cassandra.input.consistency.level", readConsistencyLevel)
        .set("spark.cassandra.connection.config.cloud.path", scbPath))
    } else if (null != trustStorePath && !trustStorePath.trim.isEmpty) {
-      abstractLogger.info(connType + ": Connected to Cassandra (or DSE) with SSL!");
+      abstractLogger.info(connType + ": Connected to Cassandra (or DSE) with SSL host: " + host);

      // Use defaults when not provided
      var enabledAlgorithmsVar = enabledAlgorithms

@@ -99,7 +99,7 @@ class AbstractJob extends App {
        .set("spark.cassandra.connection.ssl.clientAuth.enabled", "true")
      )
    } else {
-      abstractLogger.info(connType + ": Connected to Cassandra (or DSE)!");
+      abstractLogger.info(connType + ": Connected to Cassandra (or DSE) host: " + host);

      return CassandraConnector(sc.getConf.set("spark.cassandra.auth.username", username)
        .set("spark.cassandra.auth.password", password)

0 commit comments