@@ -12,13 +12,13 @@ class AbstractJob extends BaseJob {
12
12
abstractLogger.info(" PARAM -- Origin SSL Enabled: {}" , sourceSSLEnabled);
13
13
abstractLogger.info(" PARAM -- Target SSL Enabled: {}" , destinationSSLEnabled);
14
14
15
- var sourceConnection = getConnection(true , sourceScbPath, sourceHost, sourceUsername, sourcePassword, sourceSSLEnabled,
15
+ var sourceConnection = getConnection(true , sourceScbPath, sourceHost, sourcePort, sourceUsername, sourcePassword, sourceSSLEnabled,
16
16
sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);
17
17
18
- var destinationConnection = getConnection(false , destinationScbPath, destinationHost, destinationUsername, destinationPassword, destinationSSLEnabled,
18
+ var destinationConnection = getConnection(false , destinationScbPath, destinationHost, destinationPort, destinationUsername, destinationPassword, destinationSSLEnabled,
19
19
destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword, destinationEnabledAlgorithms);
20
20
21
- private def getConnection (isSource : Boolean , scbPath : String , host : String , username : String , password : String ,
21
+ private def getConnection (isSource : Boolean , scbPath : String , host : String , port : String , username : String , password : String ,
22
22
sslEnabled : String , trustStorePath : String , trustStorePassword : String , trustStoreType : String ,
23
23
keyStorePath : String , keyStorePassword : String , enabledAlgorithms : String ): CassandraConnector = {
24
24
var connType : String = " Source"
@@ -36,7 +36,7 @@ class AbstractJob extends BaseJob {
36
36
.set(" spark.cassandra.input.consistency.level" , consistencyLevel)
37
37
.set(" spark.cassandra.connection.config.cloud.path" , scbPath))
38
38
} else if (trustStorePath.nonEmpty) {
39
- abstractLogger.info(connType + " : Connecting (with clientAuth) to Cassandra (or DSE) host: " + host);
39
+ abstractLogger.info(connType + " : Connecting (with clientAuth) to Cassandra (or DSE) host:port " + host + " : " + port );
40
40
41
41
// Use defaults when not provided
42
42
var enabledAlgorithmsVar = enabledAlgorithms
@@ -49,6 +49,7 @@ class AbstractJob extends BaseJob {
49
49
.set(" spark.cassandra.auth.password" , password)
50
50
.set(" spark.cassandra.input.consistency.level" , consistencyLevel)
51
51
.set(" spark.cassandra.connection.host" , host)
52
+ .set(" spark.cassandra.connection.port" , port)
52
53
.set(" spark.cassandra.connection.ssl.enabled" , " true" )
53
54
.set(" spark.cassandra.connection.ssl.enabledAlgorithms" , enabledAlgorithmsVar)
54
55
.set(" spark.cassandra.connection.ssl.trustStore.password" , trustStorePassword)
@@ -59,13 +60,14 @@ class AbstractJob extends BaseJob {
59
60
.set(" spark.cassandra.connection.ssl.clientAuth.enabled" , " true" )
60
61
)
61
62
} else {
62
- abstractLogger.info(connType + " : Connecting to Cassandra (or DSE) host: " + host);
63
+ abstractLogger.info(connType + " : Connecting to Cassandra (or DSE) host:port " + host + " : " + port );
63
64
64
65
return CassandraConnector (config.set(" spark.cassandra.auth.username" , username)
65
66
.set(" spark.cassandra.connection.ssl.enabled" , sslEnabled)
66
67
.set(" spark.cassandra.auth.password" , password)
67
68
.set(" spark.cassandra.input.consistency.level" , consistencyLevel)
68
- .set(" spark.cassandra.connection.host" , host))
69
+ .set(" spark.cassandra.connection.host" , host)
70
+ .set(" spark.cassandra.connection.port" , port))
69
71
}
70
72
71
73
}
0 commit comments