Commit ef4d35a

Merge pull request #23 from Ankitp1342/feature/ssl-support
Implemented SSL & Client-Auth support (SSL is optional)
2 parents f1451a6 + 6ae387b commit ef4d35a
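As context for "SSL is optional": after this change, AbstractJob only switches a non-Astra connection to SSL when a truststore path is configured. Below is a minimal sketch of the new source-side keys being set programmatically on a SparkConf; every value (host, paths, passwords) is a placeholder, and only the property names and the JKS default come from this commit. The same keys can equally be supplied via spark-submit --conf or the sparkConf.properties file shown further down.

import org.apache.spark.SparkConf

object SslConfSketch {
  def main(args: Array[String]): Unit = {
    // The property keys below are the ones AbstractJob reads after this commit;
    // every value is a placeholder. Leaving the trustStore/keyStore keys unset
    // keeps the previous plain-text connection behaviour.
    val conf = new SparkConf()
      .set("spark.migrate.source.host", "10.0.0.1")
      .set("spark.migrate.source.username", "cassandra")
      .set("spark.migrate.source.password", "cassandra")
      .set("spark.migrate.source.trustStore.path", "/path/to/truststore.jks") // non-empty path triggers the SSL branch
      .set("spark.migrate.source.trustStore.password", "changeit")
      .set("spark.migrate.source.trustStore.type", "JKS") // default used by AbstractJob
      .set("spark.migrate.source.keyStore.path", "/path/to/keystore.jks") // needed for client auth
      .set("spark.migrate.source.keyStore.password", "changeit")
    println(conf.toDebugString)
  }
}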

File tree

3 files changed: +78 -38 lines


pom.xml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@

     <groupId>com.datastax.spark.example</groupId>
     <artifactId>migrate</artifactId>
-    <version>0.16</version>
+    <version>0.17</version>
     <packaging>jar</packaging>

     <properties>

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 63 additions & 37 deletions
@@ -20,56 +20,82 @@ class AbstractJob extends App {
   val sourceHost = sc.getConf.get("spark.migrate.source.host", "")
   val sourceUsername = sc.getConf.get("spark.migrate.source.username")
   val sourcePassword = sc.getConf.get("spark.migrate.source.password")
-  val sourceReadConsistencyLevel = sc.getConf.get("spark.migrate.source.read.consistency.level","LOCAL_QUORUM")
+  val sourceReadConsistencyLevel = sc.getConf.get("spark.migrate.source.read.consistency.level", "LOCAL_QUORUM")
+  val sourceTrustStorePath = sc.getConf.get("spark.migrate.source.trustStore.path", "")
+  val sourceTrustStorePassword = sc.getConf.get("spark.migrate.source.trustStore.password", "")
+  val sourceTrustStoreType = sc.getConf.get("spark.migrate.source.trustStore.type", "JKS")
+  val sourceKeyStorePath = sc.getConf.get("spark.migrate.source.keyStore.path", "")
+  val sourceKeyStorePassword = sc.getConf.get("spark.migrate.source.keyStore.password", "")

   val destinationIsAstra = sc.getConf.get("spark.migrate.destination.isAstra", "true")
-  val destinationScbPath = sc.getConf.get("spark.migrate.destination.scb", "")
-  val destinationHost = sc.getConf.get("spark.migrate.destination.host", "")
+  val destinationScbPath = sc.getConf.get("spark.migrate.destination.scb", "")
+  val destinationHost = sc.getConf.get("spark.migrate.destination.host", "")
   val destinationUsername = sc.getConf.get("spark.migrate.destination.username")
   val destinationPassword = sc.getConf.get("spark.migrate.destination.password")
   val destinationReadConsistencyLevel = sc.getConf.get("spark.migrate.destination.read.consistency.level", "LOCAL_QUORUM")
+  val destinationTrustStorePath = sc.getConf.get("spark.migrate.destination.trustStore.path", "")
+  val destinationTrustStorePassword = sc.getConf.get("spark.migrate.destination.trustStore.password", "")
+  val destinationTrustStoreType = sc.getConf.get("spark.migrate.destination.trustStore.type", "JKS")
+  val destinationKeyStorePath = sc.getConf.get("spark.migrate.destination.keyStore.path", "")
+  val destinationKeyStorePassword = sc.getConf.get("spark.migrate.destination.keyStore.password", "")

   val minPartition = new BigInteger(sc.getConf.get("spark.migrate.source.minPartition"))
   val maxPartition = new BigInteger(sc.getConf.get("spark.migrate.source.maxPartition"))

-  val splitSize = sc.getConf.get("spark.migrate.splitSize","10000")
-
-  var sourceConnection = CassandraConnector(sc.getConf
-    .set("spark.cassandra.connection.host", sourceHost)
-    .set("spark.cassandra.auth.username", sourceUsername)
-    .set("spark.cassandra.auth.password", sourcePassword)
-    .set("spark.cassandra.input.consistency.level", sourceReadConsistencyLevel))
-  if ("true".equals(sourceIsAstra)) {
-    sourceConnection = CassandraConnector(sc.getConf
-      .set("spark.cassandra.connection.config.cloud.path", sourceScbPath)
-      .set("spark.cassandra.auth.username", sourceUsername)
-      .set("spark.cassandra.auth.password", sourcePassword)
-      .set("spark.cassandra.input.consistency.level", sourceReadConsistencyLevel))
-    abstractLogger.info("Connected to Astra source!");
-  } else {
-    abstractLogger.info("Connected to Cassandra (or DSE) source!");
-  }
+  val splitSize = sc.getConf.get("spark.migrate.splitSize", "10000")

-  var destinationConnection = CassandraConnector(sc.getConf
-    .set("spark.cassandra.connection.host", destinationHost)
-    .set("spark.cassandra.auth.username", destinationUsername)
-    .set("spark.cassandra.auth.password", destinationPassword)
-    .set("spark.cassandra.input.consistency.level", destinationReadConsistencyLevel))
-  if ("true".equals(destinationIsAstra)) {
-    destinationConnection = CassandraConnector(
-      sc.getConf
-        .set("spark.cassandra.connection.config.cloud.path", destinationScbPath)
-        .set("spark.cassandra.auth.username", destinationUsername)
-        .set("spark.cassandra.auth.password", destinationPassword)
-        .set("spark.cassandra.input.consistency.level", destinationReadConsistencyLevel))
-    abstractLogger.info("Connected to Astra destination!");
-  } else {
-    abstractLogger.info("Connected to Cassandra (or DSE) destination!");
-  }
+  var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword, sourceReadConsistencyLevel,
+    sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword);
+
+  var destinationConnection = getConnection(false, destinationIsAstra, destinationScbPath, destinationHost, destinationUsername, destinationPassword, destinationReadConsistencyLevel,
+    destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword);

-  protected def exitSpark = {
+  protected def exitSpark() = {
     spark.stop()
     sys.exit(0)
   }

+  private def getConnection(isSource: Boolean, isAstra: String, scbPath: String, host: String, username: String, password: String, readConsistencyLevel: String,
+                            trustStorePath: String, trustStorePassword: String, trustStoreType: String,
+                            keyStorePath: String, keyStorePassword: String): CassandraConnector = {
+    var connType: String = "Source"
+    if (!isSource) {
+      connType = "Destination"
+    }
+
+    if ("true".equals(isAstra)) {
+      abstractLogger.info(connType + ": Connected to Astra!");
+
+      return CassandraConnector(sc.getConf
+        .set("spark.cassandra.auth.username", username)
+        .set("spark.cassandra.auth.password", password)
+        .set("spark.cassandra.input.consistency.level", readConsistencyLevel)
+        .set("spark.cassandra.connection.config.cloud.path", scbPath))
+    } else if (null != trustStorePath && !trustStorePath.trim.isEmpty) {
+      abstractLogger.info(connType + ": Connected to Cassandra (or DSE) with SSL!");
+
+      return CassandraConnector(sc.getConf
+        .set("spark.cassandra.auth.username", username)
+        .set("spark.cassandra.auth.password", password)
+        .set("spark.cassandra.input.consistency.level", readConsistencyLevel)
+        .set("spark.cassandra.connection.host", host)
+        .set("spark.cassandra.connection.ssl.enabled", "true")
+        .set("spark.cassandra.connection.ssl.trustStore.password", trustStorePassword)
+        .set("spark.cassandra.connection.ssl.trustStore.path", trustStorePath)
+        .set("spark.cassandra.connection.ssl.keyStore.password", keyStorePassword)
+        .set("spark.cassandra.connection.ssl.keyStore.path", keyStorePath)
+        .set("spark.cassandra.connection.ssl.trustStore.type", trustStoreType)
+        .set("spark.cassandra.connection.ssl.clientAuth.enabled", "true")
+      )
+    } else {
+      abstractLogger.info(connType + ": Connected to Cassandra (or DSE)!");
+
+      return CassandraConnector(sc.getConf.set("spark.cassandra.auth.username", username)
+        .set("spark.cassandra.auth.password", password)
+        .set("spark.cassandra.input.consistency.level", readConsistencyLevel)
+        .set("spark.cassandra.connection.host", host))
+    }
+
+  }
+
 }
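The refactor keeps the return type as a spark-cassandra-connector CassandraConnector, so jobs built on AbstractJob consume the source and destination connections exactly as before, regardless of which branch (Astra SCB, SSL with client auth, or plain host) getConnection took. A minimal usage sketch, assuming a hypothetical job object; the query and log line are illustrative and not part of this commit:

// Hypothetical job extending AbstractJob; not part of this commit.
object SampleJob extends AbstractJob {
  // withSessionDo opens (or reuses) a session over whichever connection type
  // getConnection picked: Astra SCB, SSL/client-auth, or plain host.
  sourceConnection.withSessionDo { session =>
    val row = session.execute("SELECT release_version FROM system.local").one()
    abstractLogger.info("Source cluster version: " + row.getString("release_version"))
  }
  exitSpark()
}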

src/resources/sparkConf.properties

Lines changed: 14 additions & 0 deletions
@@ -43,6 +43,20 @@ spark.migrate.source.writeTimeStampFilter.cols 4,5
 spark.migrate.source.minWriteTimeStampFilter 0
 spark.migrate.source.maxWriteTimeStampFilter 9223372036854775807

+########################## ONLY USE if SSL clientAuth is enabled on source Cassandra/DSE ###############################
+#spark.migrate.source.trustStore.path
+#spark.migrate.source.trustStore.password
+#spark.migrate.source.trustStore.type JKS
+#spark.migrate.source.keyStore.path
+#spark.migrate.source.keyStore.password
+
+####################### ONLY USE if SSL clientAuth is enabled on destination Cassandra/DSE #############################
+#spark.migrate.destination.trustStore.path
+#spark.migrate.destination.trustStore.password
+#spark.migrate.destination.trustStore.type JKS
+#spark.migrate.destination.keyStore.path
+#spark.migrate.destination.keyStore.password
+
 ########################################################################################################################
 # Following are the supported data types and their corresponding [Cassandra data-types]
 # 0: String [ascii, text, varchar]
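For reference, a filled-in version of the new source-side block added above might look like the following once SSL client auth is actually needed. The paths and passwords are placeholders; only the property keys and the JKS default come from this commit.

spark.migrate.source.trustStore.path /path/to/source-truststore.jks
spark.migrate.source.trustStore.password changeit
spark.migrate.source.trustStore.type JKS
spark.migrate.source.keyStore.path /path/to/source-keystore.jks
spark.migrate.source.keyStore.password changeit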
