@@ -20,56 +20,82 @@ class AbstractJob extends App {
   val sourceHost = sc.getConf.get("spark.migrate.source.host", "")
   val sourceUsername = sc.getConf.get("spark.migrate.source.username")
   val sourcePassword = sc.getConf.get("spark.migrate.source.password")
-  val sourceReadConsistencyLevel = sc.getConf.get("spark.migrate.source.read.consistency.level","LOCAL_QUORUM")
+  val sourceReadConsistencyLevel = sc.getConf.get("spark.migrate.source.read.consistency.level", "LOCAL_QUORUM")
+  val sourceTrustStorePath = sc.getConf.get("spark.migrate.source.trustStore.path", "")
+  val sourceTrustStorePassword = sc.getConf.get("spark.migrate.source.trustStore.password", "")
+  val sourceTrustStoreType = sc.getConf.get("spark.migrate.source.trustStore.type", "JKS")
+  val sourceKeyStorePath = sc.getConf.get("spark.migrate.source.keyStore.path", "")
+  val sourceKeyStorePassword = sc.getConf.get("spark.migrate.source.keyStore.password", "")
 
   val destinationIsAstra = sc.getConf.get("spark.migrate.destination.isAstra", "true")
-  val destinationScbPath = sc.getConf.get("spark.migrate.destination.scb","")
-  val destinationHost = sc.getConf.get("spark.migrate.destination.host","")
+  val destinationScbPath = sc.getConf.get("spark.migrate.destination.scb", "")
+  val destinationHost = sc.getConf.get("spark.migrate.destination.host", "")
   val destinationUsername = sc.getConf.get("spark.migrate.destination.username")
   val destinationPassword = sc.getConf.get("spark.migrate.destination.password")
   val destinationReadConsistencyLevel = sc.getConf.get("spark.migrate.destination.read.consistency.level", "LOCAL_QUORUM")
+  val destinationTrustStorePath = sc.getConf.get("spark.migrate.destination.trustStore.path", "")
+  val destinationTrustStorePassword = sc.getConf.get("spark.migrate.destination.trustStore.password", "")
+  val destinationTrustStoreType = sc.getConf.get("spark.migrate.destination.trustStore.type", "JKS")
+  val destinationKeyStorePath = sc.getConf.get("spark.migrate.destination.keyStore.path", "")
+  val destinationKeyStorePassword = sc.getConf.get("spark.migrate.destination.keyStore.password", "")
 
   val minPartition = new BigInteger(sc.getConf.get("spark.migrate.source.minPartition"))
   val maxPartition = new BigInteger(sc.getConf.get("spark.migrate.source.maxPartition"))
 
-  val splitSize = sc.getConf.get("spark.migrate.splitSize","10000")
-
-  var sourceConnection = CassandraConnector(sc.getConf
-    .set("spark.cassandra.connection.host", sourceHost)
-    .set("spark.cassandra.auth.username", sourceUsername)
-    .set("spark.cassandra.auth.password", sourcePassword)
-    .set("spark.cassandra.input.consistency.level", sourceReadConsistencyLevel))
-  if ("true".equals(sourceIsAstra)) {
-    sourceConnection = CassandraConnector(sc.getConf
-      .set("spark.cassandra.connection.config.cloud.path", sourceScbPath)
-      .set("spark.cassandra.auth.username", sourceUsername)
-      .set("spark.cassandra.auth.password", sourcePassword)
-      .set("spark.cassandra.input.consistency.level", sourceReadConsistencyLevel))
-    abstractLogger.info("Connected to Astra source!");
-  } else {
-    abstractLogger.info("Connected to Cassandra (or DSE) source!");
-  }
+  val splitSize = sc.getConf.get("spark.migrate.splitSize", "10000")
 
-  var destinationConnection = CassandraConnector(sc.getConf
-    .set("spark.cassandra.connection.host", destinationHost)
-    .set("spark.cassandra.auth.username", destinationUsername)
-    .set("spark.cassandra.auth.password", destinationPassword)
-    .set("spark.cassandra.input.consistency.level", destinationReadConsistencyLevel))
-  if ("true".equals(destinationIsAstra)) {
-    destinationConnection = CassandraConnector(
-      sc.getConf
-        .set("spark.cassandra.connection.config.cloud.path", destinationScbPath)
-        .set("spark.cassandra.auth.username", destinationUsername)
-        .set("spark.cassandra.auth.password", destinationPassword)
-        .set("spark.cassandra.input.consistency.level", destinationReadConsistencyLevel))
-    abstractLogger.info("Connected to Astra destination!");
-  } else {
-    abstractLogger.info("Connected to Cassandra (or DSE) destination!");
-  }
+  var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword, sourceReadConsistencyLevel,
+    sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword);
+
+  var destinationConnection = getConnection(false, destinationIsAstra, destinationScbPath, destinationHost, destinationUsername, destinationPassword, destinationReadConsistencyLevel,
+    destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword);
 
-  protected def exitSpark = {
+  protected def exitSpark() = {
     spark.stop()
     sys.exit(0)
   }
 
+  private def getConnection(isSource: Boolean, isAstra: String, scbPath: String, host: String, username: String, password: String, readConsistencyLevel: String,
+                            trustStorePath: String, trustStorePassword: String, trustStoreType: String,
+                            keyStorePath: String, keyStorePassword: String): CassandraConnector = {
+    var connType: String = "Source"
+    if (!isSource) {
+      connType = "Destination"
+    }
+
+    if ("true".equals(isAstra)) {
+      abstractLogger.info(connType + ": Connected to Astra!");
+
+      return CassandraConnector(sc.getConf
+        .set("spark.cassandra.auth.username", username)
+        .set("spark.cassandra.auth.password", password)
+        .set("spark.cassandra.input.consistency.level", readConsistencyLevel)
+        .set("spark.cassandra.connection.config.cloud.path", scbPath))
+    } else if (null != trustStorePath && !trustStorePath.trim.isEmpty) {
+      abstractLogger.info(connType + ": Connected to Cassandra (or DSE) with SSL!");
+
+      return CassandraConnector(sc.getConf
+        .set("spark.cassandra.auth.username", username)
+        .set("spark.cassandra.auth.password", password)
+        .set("spark.cassandra.input.consistency.level", readConsistencyLevel)
+        .set("spark.cassandra.connection.host", host)
+        .set("spark.cassandra.connection.ssl.enabled", "true")
+        .set("spark.cassandra.connection.ssl.trustStore.password", trustStorePassword) // "cassandra")
+        .set("spark.cassandra.connection.ssl.trustStore.path", trustStorePath) // "/Users/pravin.bhat/rootca/dse-truststore.jks")
+        .set("spark.cassandra.connection.ssl.keyStore.password", keyStorePassword) // "cassandra")
+        .set("spark.cassandra.connection.ssl.keyStore.path", keyStorePath) // "/Users/pravin.bhat/dse/keystores/local.jks")
+        .set("spark.cassandra.connection.ssl.trustStore.type", trustStoreType) // "JKS")
+        .set("spark.cassandra.connection.ssl.clientAuth.enabled", "true")
+      )
+    } else {
+      abstractLogger.info(connType + ": Connected to Cassandra (or DSE)!");
+
+      return CassandraConnector(sc.getConf.set("spark.cassandra.auth.username", username)
+        .set("spark.cassandra.auth.password", password)
+        .set("spark.cassandra.input.consistency.level", readConsistencyLevel)
+        .set("spark.cassandra.connection.host", host))
+    }
+
+  }
+
 }
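
Usage note: the new SSL-related settings are plain SparkConf properties, so they have to be supplied when the job is launched. Below is a minimal sketch of a driver wiring them up in Scala. It is an illustration only, not part of this commit: the host, paths, and passwords are placeholders, and spark.migrate.source.isAstra is assumed to follow the same naming as the destination-side key shown above (that val is defined above this hunk). In practice these values are usually passed as --conf key=value arguments to spark-submit instead of being hard-coded.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Placeholder values for illustration; normally provided via spark-submit --conf flags.
val conf = new SparkConf()
  .set("spark.migrate.source.isAstra", "false")               // assumed source-side analog of spark.migrate.destination.isAstra
  .set("spark.migrate.source.host", "10.0.0.1")
  .set("spark.migrate.source.username", "cassandra")
  .set("spark.migrate.source.password", "cassandra")
  // A non-empty trustStore.path (with isAstra != "true") sends getConnection down the SSL branch.
  .set("spark.migrate.source.trustStore.path", "/path/to/truststore.jks")
  .set("spark.migrate.source.trustStore.password", "changeit")
  .set("spark.migrate.source.trustStore.type", "JKS")
  // The SSL branch also enables clientAuth, so keyStore.* should point at a client keystore.
  .set("spark.migrate.source.keyStore.path", "/path/to/keystore.jks")
  .set("spark.migrate.source.keyStore.password", "changeit")

// Leaving trustStore.path empty instead would keep the plain (non-SSL) Cassandra/DSE branch.
val spark = SparkSession.builder.config(conf).getOrCreate()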