Skip to content

Commit 13f9712

Browse files
committed
Support Azure/cosmos compatibility (needed SSL enable support)
1 parent 8d73c1f commit 13f9712

File tree

4 files changed

+16
-5
lines changed

4 files changed

+16
-5
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
<groupId>datastax.astra.migrate</groupId>
55
<artifactId>cassandra-data-migrator</artifactId>
6-
<version>3.0.5</version>
6+
<version>3.0.6</version>
77
<packaging>jar</packaging>
88

99
<properties>

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,17 @@ class AbstractJob extends BaseJob {
99
abstractLogger.info("PARAM -- Max Partition: " + maxPartition)
1010
abstractLogger.info("PARAM -- Split Size: " + splitSize)
1111
abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent)
12+
abstractLogger.info("PARAM -- Origin SSL Enabled: {}", sourceSSLEnabled);
13+
abstractLogger.info("PARAM -- Target SSL Enabled: {}", destinationSSLEnabled);
1214

13-
var sourceConnection = getConnection(true, sourceScbPath, sourceHost, sourceUsername, sourcePassword,
15+
var sourceConnection = getConnection(true, sourceScbPath, sourceHost, sourceUsername, sourcePassword, sourceSSLEnabled,
1416
sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);
1517

16-
var destinationConnection = getConnection(false, destinationScbPath, destinationHost, destinationUsername, destinationPassword,
18+
var destinationConnection = getConnection(false, destinationScbPath, destinationHost, destinationUsername, destinationPassword, destinationSSLEnabled,
1719
destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword, destinationEnabledAlgorithms);
1820

1921
private def getConnection(isSource: Boolean, scbPath: String, host: String, username: String, password: String,
20-
trustStorePath: String, trustStorePassword: String, trustStoreType: String,
22+
sslEnabled: String, trustStorePath: String, trustStorePassword: String, trustStoreType: String,
2123
keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String): CassandraConnector = {
2224
var connType: String = "Source"
2325
if (!isSource) {
@@ -34,7 +36,7 @@ class AbstractJob extends BaseJob {
3436
.set("spark.cassandra.input.consistency.level", consistencyLevel)
3537
.set("spark.cassandra.connection.config.cloud.path", scbPath))
3638
} else if (trustStorePath.nonEmpty) {
37-
abstractLogger.info(connType + ": Connecting to Cassandra (or DSE) with SSL host: " + host);
39+
abstractLogger.info(connType + ": Connecting (with clientAuth) to Cassandra (or DSE) host: " + host);
3840

3941
// Use defaults when not provided
4042
var enabledAlgorithmsVar = enabledAlgorithms
@@ -60,6 +62,7 @@ class AbstractJob extends BaseJob {
6062
abstractLogger.info(connType + ": Connecting to Cassandra (or DSE) host: " + host);
6163

6264
return CassandraConnector(config.set("spark.cassandra.auth.username", username)
65+
.set("spark.cassandra.connection.ssl.enabled", sslEnabled)
6366
.set("spark.cassandra.auth.password", password)
6467
.set("spark.cassandra.input.consistency.level", consistencyLevel)
6568
.set("spark.cassandra.connection.host", host))

src/main/scala/datastax/astra/migrate/BaseJob.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class BaseJob extends App {
2424
val sourceHost = Util.getSparkPropOrEmpty(sc, "spark.origin.host")
2525
val sourceUsername = Util.getSparkPropOrEmpty(sc, "spark.origin.username")
2626
val sourcePassword = Util.getSparkPropOrEmpty(sc, "spark.origin.password")
27+
val sourceSSLEnabled = Util.getSparkPropOr(sc, "spark.origin.ssl.enabled", "false")
2728
val sourceTrustStorePath = Util.getSparkPropOrEmpty(sc, "spark.origin.trustStore.path")
2829
val sourceTrustStorePassword = Util.getSparkPropOrEmpty(sc, "spark.origin.trustStore.password")
2930
val sourceTrustStoreType = Util.getSparkPropOr(sc, "spark.origin.trustStore.type", "JKS")
@@ -35,6 +36,7 @@ class BaseJob extends App {
3536
val destinationHost = Util.getSparkPropOrEmpty(sc, "spark.target.host")
3637
val destinationUsername = Util.getSparkProp(sc, "spark.target.username")
3738
val destinationPassword = Util.getSparkProp(sc, "spark.target.password")
39+
val destinationSSLEnabled = Util.getSparkPropOr(sc, "spark.target.ssl.enabled", "false")
3840
val destinationTrustStorePath = Util.getSparkPropOrEmpty(sc, "spark.target.trustStore.path")
3941
val destinationTrustStorePassword = Util.getSparkPropOrEmpty(sc, "spark.target.trustStore.password")
4042
val destinationTrustStoreType = Util.getSparkPropOr(sc, "spark.target.trustStore.type", "JKS")

src/resources/sparkConf.properties

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ spark.query.writetime.cols 2,3
114114
#spark.origin.FilterColumnType 6%16
115115
#spark.origin.FilterColumnValue test
116116

117+
# ONLY USE if SSL is enabled on origin Cassandra/DSE
118+
#spark.origin.ssl.enabled true
119+
117120
# ONLY USE if SSL clientAuth is enabled on origin Cassandra/DSE
118121
#spark.origin.trustStore.path
119122
#spark.origin.trustStore.password
@@ -122,6 +125,9 @@ spark.query.writetime.cols 2,3
122125
#spark.origin.keyStore.password
123126
#spark.origin.enabledAlgorithms TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA
124127

128+
# ONLY USE if SSL is enabled on target Cassandra/DSE
129+
#spark.target.ssl.enabled true
130+
125131
# ONLY USE if SSL clientAuth is enabled on target Cassandra/DSE
126132
#spark.target.trustStore.path
127133
#spark.target.trustStore.password

0 commit comments

Comments
 (0)