Skip to content

Commit ecab70f

Browse files
committed
CDM-54 isolating connection to a separate class
1 parent ed5fe2c commit ecab70f

File tree

9 files changed

+125
-107
lines changed

9 files changed

+125
-107
lines changed

src/main/scala/com/datastax/cdm/job/AbstractJob.scala

Lines changed: 0 additions & 76 deletions
This file was deleted.

src/main/scala/com/datastax/cdm/job/BaseJob.scala

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,37 +22,21 @@ class BaseJob extends App {
2222

2323
val consistencyLevel = propertyHelper.getString(KnownProperties.READ_CL)
2424

25-
val originScbPath = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_SCB)
26-
val originHost = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_HOST)
27-
val originPort = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_PORT)
28-
val originUsername = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_USERNAME)
29-
val originPassword = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_PASSWORD)
30-
val originSSLEnabled = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_ENABLED)
31-
val originTrustStorePath = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_TRUSTSTORE_PATH)
32-
val originTrustStorePassword = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_TRUSTSTORE_PASSWORD)
33-
val originTrustStoreType = propertyHelper.getString(KnownProperties.ORIGIN_TLS_TRUSTSTORE_TYPE)
34-
val originKeyStorePath = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_KEYSTORE_PATH)
35-
val originKeyStorePassword = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_KEYSTORE_PASSWORD)
36-
val originEnabledAlgorithms = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_ALGORITHMS)
37-
38-
val targetScbPath = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_SCB)
39-
val targetHost = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_HOST)
40-
val targetPort = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_PORT)
41-
val targetUsername = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_USERNAME)
42-
val targetPassword = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_PASSWORD)
43-
val targetSSLEnabled = propertyHelper.getAsString(KnownProperties.TARGET_TLS_ENABLED)
44-
val targetTrustStorePath = propertyHelper.getAsString(KnownProperties.TARGET_TLS_TRUSTSTORE_PATH)
45-
val targetTrustStorePassword = propertyHelper.getAsString(KnownProperties.TARGET_TLS_TRUSTSTORE_PASSWORD)
46-
val targetTrustStoreType = propertyHelper.getString(KnownProperties.TARGET_TLS_TRUSTSTORE_TYPE)
47-
val targetKeyStorePath = propertyHelper.getAsString(KnownProperties.TARGET_TLS_KEYSTORE_PATH)
48-
val targetKeyStorePassword = propertyHelper.getAsString(KnownProperties.TARGET_TLS_KEYSTORE_PASSWORD)
49-
val targetEnabledAlgorithms = propertyHelper.getAsString(KnownProperties.TARGET_TLS_ALGORITHMS)
50-
5125
val minPartition = new BigInteger(propertyHelper.getAsString(KnownProperties.PARTITION_MIN))
5226
val maxPartition = new BigInteger(propertyHelper.getAsString(KnownProperties.PARTITION_MAX))
5327
val coveragePercent = propertyHelper.getAsString(KnownProperties.TOKEN_COVERAGE_PERCENT)
5428
val numSplits = propertyHelper.getInteger(KnownProperties.PERF_NUM_PARTS)
5529

30+
abstractLogger.info("PARAM -- Min Partition: " + minPartition)
31+
abstractLogger.info("PARAM -- Max Partition: " + maxPartition)
32+
abstractLogger.info("PARAM -- Number of Splits : " + numSplits)
33+
abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent)
34+
35+
// TODO: CDM-31 - add localDC configuration support
36+
private val connectionFetcher = new ConnectionFetcher(sContext, propertyHelper)
37+
var originConnection = connectionFetcher.getConnection("ORIGIN", consistencyLevel)
38+
var targetConnection = connectionFetcher.getConnection("TARGET", consistencyLevel)
39+
5640
protected def exitSpark() = {
5741
spark.stop()
5842
abstractLogger.info("################################################################################################")
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.datastax.cdm.job
2+
3+
/**
 * Immutable bundle of the properties needed to connect to one cluster side
 * (origin or target), as read from [[com.datastax.cdm.properties.KnownProperties]].
 *
 * All fields are raw property strings; interpretation (e.g. parsing `port`,
 * treating `sslEnabled` as a boolean flag) is left to the consumer.
 * NOTE(review): fields are presumably empty-string when unset rather than
 * null — confirm against PropertyHelper.getAsString.
 *
 * Because all twelve fields share the same type, prefer constructing this
 * class with named arguments to avoid silent transposition errors.
 *
 * @param scbPath            path to an Astra secure connect bundle; non-empty means SCB connection
 * @param host               contact-point host (used when no SCB is configured)
 * @param port               contact-point port, as a string
 * @param username           authentication user name
 * @param password           authentication password
 * @param sslEnabled         raw TLS-enabled flag, as a string
 * @param trustStorePath     truststore file path; non-empty triggers two-way TLS setup
 * @param trustStorePassword truststore password
 * @param trustStoreType     truststore type (e.g. JKS)
 * @param keyStorePath       keystore file path for client authentication
 * @param keyStorePassword   keystore password
 * @param enabledAlgorithms  comma-separated TLS cipher suites; may be blank (caller applies defaults)
 */
final case class ConnectionDetails(
  scbPath: String,
  host: String,
  port: String,
  username: String,
  password: String,
  sslEnabled: String,
  trustStorePath: String,
  trustStorePassword: String,
  trustStoreType: String,
  keyStorePath: String,
  keyStorePassword: String,
  enabledAlgorithms: String
)
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package com.datastax.cdm.job
2+
3+
import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
4+
import com.datastax.spark.connector.cql.CassandraConnector
5+
import org.apache.spark.{SparkConf, SparkContext}
6+
import org.slf4j.{Logger, LoggerFactory}
7+
8+
/**
 * Builds [[CassandraConnector]] instances for the origin or target cluster
 * from CDM configuration properties, isolating all connection wiring away
 * from the job classes (CDM-54).
 *
 * @param sparkContext   active Spark context whose conf is used as the base connector config
 * @param propertyHelper source of all CDM connection/TLS properties
 */
class ConnectionFetcher(sparkContext: SparkContext, propertyHelper: PropertyHelper) {
  val logger: Logger = LoggerFactory.getLogger(this.getClass.getName)

  // Cipher suites applied when TLS is on but no algorithms were configured.
  private val defaultEnabledAlgorithms = "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA"

  /**
   * Reads the connection properties for one cluster side.
   *
   * @param side "ORIGIN" (case-insensitive) selects the origin properties;
   *             any other value selects the target properties.
   * @return the raw connection details for that side
   */
  def getConnectionDetails(side: String): ConnectionDetails = {
    // Named arguments guard against transposing twelve same-typed Strings.
    if ("ORIGIN".equals(side.toUpperCase)) {
      ConnectionDetails(
        scbPath            = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_SCB),
        host               = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_HOST),
        port               = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_PORT),
        username           = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_USERNAME),
        password           = propertyHelper.getAsString(KnownProperties.ORIGIN_CONNECT_PASSWORD),
        sslEnabled         = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_ENABLED),
        trustStorePath     = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_TRUSTSTORE_PATH),
        trustStorePassword = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_TRUSTSTORE_PASSWORD),
        // NOTE(review): getString here while every other field uses getAsString — confirm intentional
        trustStoreType     = propertyHelper.getString(KnownProperties.ORIGIN_TLS_TRUSTSTORE_TYPE),
        keyStorePath       = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_KEYSTORE_PATH),
        keyStorePassword   = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_KEYSTORE_PASSWORD),
        enabledAlgorithms  = propertyHelper.getAsString(KnownProperties.ORIGIN_TLS_ALGORITHMS)
      )
    } else {
      ConnectionDetails(
        scbPath            = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_SCB),
        host               = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_HOST),
        port               = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_PORT),
        username           = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_USERNAME),
        password           = propertyHelper.getAsString(KnownProperties.TARGET_CONNECT_PASSWORD),
        sslEnabled         = propertyHelper.getAsString(KnownProperties.TARGET_TLS_ENABLED),
        trustStorePath     = propertyHelper.getAsString(KnownProperties.TARGET_TLS_TRUSTSTORE_PATH),
        trustStorePassword = propertyHelper.getAsString(KnownProperties.TARGET_TLS_TRUSTSTORE_PASSWORD),
        // NOTE(review): getString here while every other field uses getAsString — confirm intentional
        trustStoreType     = propertyHelper.getString(KnownProperties.TARGET_TLS_TRUSTSTORE_TYPE),
        keyStorePath       = propertyHelper.getAsString(KnownProperties.TARGET_TLS_KEYSTORE_PATH),
        keyStorePassword   = propertyHelper.getAsString(KnownProperties.TARGET_TLS_KEYSTORE_PASSWORD),
        enabledAlgorithms  = propertyHelper.getAsString(KnownProperties.TARGET_TLS_ALGORITHMS)
      )
    }
  }

  // Null-safe non-empty check: getAsString is presumed to return "" when a
  // property is unset, but guard against null so SCB/TLS detection cannot NPE.
  private def hasValue(s: String): Boolean = s != null && s.nonEmpty

  /**
   * Builds a [[CassandraConnector]] for the given side.
   *
   * Connection strategy, in priority order:
   *   1. secure connect bundle, when `scbPath` is set;
   *   2. host/port with two-way TLS, when a truststore path is set;
   *   3. plain host/port (ssl.enabled passed through as configured).
   *
   * @param side             "ORIGIN" or "TARGET" (case-insensitive for ORIGIN)
   * @param consistencyLevel Cassandra input consistency level to apply
   */
  def getConnection(side: String, consistencyLevel: String): CassandraConnector = {
    val details = getConnectionDetails(side)
    val config: SparkConf = sparkContext.getConf

    logger.info(s"PARAM -- SSL Enabled: ${details.sslEnabled}")

    // Single expression instead of early returns — idiomatic Scala avoids `return`.
    if (hasValue(details.scbPath)) {
      logger.info(s"Connecting to $side using SCB ${details.scbPath}")
      CassandraConnector(config
        .set("spark.cassandra.auth.username", details.username)
        .set("spark.cassandra.auth.password", details.password)
        .set("spark.cassandra.input.consistency.level", consistencyLevel)
        .set("spark.cassandra.connection.config.cloud.path", details.scbPath))
    } else if (hasValue(details.trustStorePath)) {
      logger.info(s"Connecting to $side (with truststore) at ${details.host}:${details.port}")

      // Fall back to default cipher suites when none are configured.
      val enabledAlgorithms =
        if (details.enabledAlgorithms == null || details.enabledAlgorithms.trim.isEmpty) defaultEnabledAlgorithms
        else details.enabledAlgorithms

      CassandraConnector(config
        .set("spark.cassandra.auth.username", details.username)
        .set("spark.cassandra.auth.password", details.password)
        .set("spark.cassandra.input.consistency.level", consistencyLevel)
        .set("spark.cassandra.connection.host", details.host)
        .set("spark.cassandra.connection.port", details.port)
        .set("spark.cassandra.connection.ssl.enabled", "true")
        .set("spark.cassandra.connection.ssl.enabledAlgorithms", enabledAlgorithms)
        .set("spark.cassandra.connection.ssl.trustStore.password", details.trustStorePassword)
        .set("spark.cassandra.connection.ssl.trustStore.path", details.trustStorePath)
        .set("spark.cassandra.connection.ssl.keyStore.password", details.keyStorePassword)
        .set("spark.cassandra.connection.ssl.keyStore.path", details.keyStorePath)
        .set("spark.cassandra.connection.ssl.trustStore.type", details.trustStoreType)
        .set("spark.cassandra.connection.ssl.clientAuth.enabled", "true"))
    } else {
      logger.info(s"Connecting to $side at ${details.host}:${details.port}")

      CassandraConnector(config
        .set("spark.cassandra.auth.username", details.username)
        .set("spark.cassandra.connection.ssl.enabled", details.sslEnabled)
        .set("spark.cassandra.auth.password", details.password)
        .set("spark.cassandra.input.consistency.level", consistencyLevel)
        .set("spark.cassandra.connection.host", details.host)
        .set("spark.cassandra.connection.port", details.port))
    }
  }
}

src/main/scala/com/datastax/cdm/job/DiffData.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import org.slf4j.LoggerFactory
66

77
import scala.collection.JavaConversions._
88

9-
object DiffData extends AbstractJob {
9+
object DiffData extends BaseJob {
1010

1111
val logger = LoggerFactory.getLogger(this.getClass.getName)
1212
logger.info("Started Data Validation App")

src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import org.slf4j.LoggerFactory
66

77
import scala.collection.JavaConversions._
88

9-
object GuardrailCheck extends AbstractJob {
9+
object GuardrailCheck extends BaseJob {
1010

1111
val logger = LoggerFactory.getLogger(this.getClass.getName)
1212
logger.info("Started Migration App")

src/main/scala/com/datastax/cdm/job/Migrate.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import scala.collection.JavaConversions._
88

99
// http://www.russellspitzer.com/2016/02/16/Multiple-Clusters-SparkSql-Cassandra/
1010

11-
object Migrate extends AbstractJob {
11+
object Migrate extends BaseJob {
1212

1313
val logger = LoggerFactory.getLogger(this.getClass.getName)
1414
logger.info("Started Migration App")

src/main/scala/com/datastax/cdm/job/MigratePartitionsFromFile.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import org.slf4j.LoggerFactory
55

66
import scala.collection.JavaConversions._
77

8-
object MigratePartitionsFromFile extends AbstractJob {
8+
object MigratePartitionsFromFile extends BaseJob {
99

1010
val logger = LoggerFactory.getLogger(this.getClass.getName)
1111
logger.info("Started MigratePartitionsFromFile App")

src/main/scala/com/datastax/cdm/job/MigrateRowsFromFile.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package com.datastax.cdm.job
33
import com.datastax.spark.connector.cql.CassandraConnector
44
import org.slf4j.LoggerFactory
55

6-
object MigrateRowsFromFile extends AbstractJob {
6+
object MigrateRowsFromFile extends BaseJob {
77

88
val logger = LoggerFactory.getLogger(this.getClass.getName)
99
logger.info("Started MigrateRowsFromFile App")

0 commit comments

Comments
 (0)