Skip to content

Commit 8ff34f8

Browse files
authored
Merge pull request #85 from datastax/feature/cosmos-compatibility
Feature/cosmos compatibility
2 parents 8d73c1f + 5daf710 commit 8ff34f8

File tree

4 files changed

+34
-14
lines changed

4 files changed

+34
-14
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
<groupId>datastax.astra.migrate</groupId>
55
<artifactId>cassandra-data-migrator</artifactId>
6-
<version>3.0.5</version>
6+
<version>3.1.0</version>
77
<packaging>jar</packaging>
88

99
<properties>

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,17 @@ class AbstractJob extends BaseJob {
99
abstractLogger.info("PARAM -- Max Partition: " + maxPartition)
1010
abstractLogger.info("PARAM -- Split Size: " + splitSize)
1111
abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent)
12+
abstractLogger.info("PARAM -- Origin SSL Enabled: {}", sourceSSLEnabled);
13+
abstractLogger.info("PARAM -- Target SSL Enabled: {}", destinationSSLEnabled);
1214

13-
var sourceConnection = getConnection(true, sourceScbPath, sourceHost, sourceUsername, sourcePassword,
15+
var sourceConnection = getConnection(true, sourceScbPath, sourceHost, sourcePort, sourceUsername, sourcePassword, sourceSSLEnabled,
1416
sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);
1517

16-
var destinationConnection = getConnection(false, destinationScbPath, destinationHost, destinationUsername, destinationPassword,
18+
var destinationConnection = getConnection(false, destinationScbPath, destinationHost, destinationPort, destinationUsername, destinationPassword, destinationSSLEnabled,
1719
destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword, destinationEnabledAlgorithms);
1820

19-
private def getConnection(isSource: Boolean, scbPath: String, host: String, username: String, password: String,
20-
trustStorePath: String, trustStorePassword: String, trustStoreType: String,
21+
private def getConnection(isSource: Boolean, scbPath: String, host: String, port: String, username: String, password: String,
22+
sslEnabled: String, trustStorePath: String, trustStorePassword: String, trustStoreType: String,
2123
keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String): CassandraConnector = {
2224
var connType: String = "Source"
2325
if (!isSource) {
@@ -34,7 +36,7 @@ class AbstractJob extends BaseJob {
3436
.set("spark.cassandra.input.consistency.level", consistencyLevel)
3537
.set("spark.cassandra.connection.config.cloud.path", scbPath))
3638
} else if (trustStorePath.nonEmpty) {
37-
abstractLogger.info(connType + ": Connecting to Cassandra (or DSE) with SSL host: " + host);
39+
abstractLogger.info(connType + ": Connecting (with clientAuth) to Cassandra (or DSE) host:port " + host + ":" + port);
3840

3941
// Use defaults when not provided
4042
var enabledAlgorithmsVar = enabledAlgorithms
@@ -47,6 +49,7 @@ class AbstractJob extends BaseJob {
4749
.set("spark.cassandra.auth.password", password)
4850
.set("spark.cassandra.input.consistency.level", consistencyLevel)
4951
.set("spark.cassandra.connection.host", host)
52+
.set("spark.cassandra.connection.port", port)
5053
.set("spark.cassandra.connection.ssl.enabled", "true")
5154
.set("spark.cassandra.connection.ssl.enabledAlgorithms", enabledAlgorithmsVar)
5255
.set("spark.cassandra.connection.ssl.trustStore.password", trustStorePassword)
@@ -57,12 +60,14 @@ class AbstractJob extends BaseJob {
5760
.set("spark.cassandra.connection.ssl.clientAuth.enabled", "true")
5861
)
5962
} else {
60-
abstractLogger.info(connType + ": Connecting to Cassandra (or DSE) host: " + host);
63+
abstractLogger.info(connType + ": Connecting to Cassandra (or DSE) host:port " + host + ":" + port);
6164

6265
return CassandraConnector(config.set("spark.cassandra.auth.username", username)
66+
.set("spark.cassandra.connection.ssl.enabled", sslEnabled)
6367
.set("spark.cassandra.auth.password", password)
6468
.set("spark.cassandra.input.consistency.level", consistencyLevel)
65-
.set("spark.cassandra.connection.host", host))
69+
.set("spark.cassandra.connection.host", host)
70+
.set("spark.cassandra.connection.port", port))
6671
}
6772

6873
}

src/main/scala/datastax/astra/migrate/BaseJob.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ class BaseJob extends App {
2222

2323
val sourceScbPath = Util.getSparkPropOrEmpty(sc, "spark.origin.scb")
2424
val sourceHost = Util.getSparkPropOrEmpty(sc, "spark.origin.host")
25+
val sourcePort = Util.getSparkPropOr(sc, "spark.origin.port", "9042")
2526
val sourceUsername = Util.getSparkPropOrEmpty(sc, "spark.origin.username")
2627
val sourcePassword = Util.getSparkPropOrEmpty(sc, "spark.origin.password")
28+
val sourceSSLEnabled = Util.getSparkPropOr(sc, "spark.origin.ssl.enabled", "false")
2729
val sourceTrustStorePath = Util.getSparkPropOrEmpty(sc, "spark.origin.trustStore.path")
2830
val sourceTrustStorePassword = Util.getSparkPropOrEmpty(sc, "spark.origin.trustStore.password")
2931
val sourceTrustStoreType = Util.getSparkPropOr(sc, "spark.origin.trustStore.type", "JKS")
@@ -33,8 +35,10 @@ class BaseJob extends App {
3335

3436
val destinationScbPath = Util.getSparkPropOrEmpty(sc, "spark.target.scb")
3537
val destinationHost = Util.getSparkPropOrEmpty(sc, "spark.target.host")
38+
val destinationPort = Util.getSparkPropOr(sc, "spark.target.port", "9042")
3639
val destinationUsername = Util.getSparkProp(sc, "spark.target.username")
3740
val destinationPassword = Util.getSparkProp(sc, "spark.target.password")
41+
val destinationSSLEnabled = Util.getSparkPropOr(sc, "spark.target.ssl.enabled", "false")
3842
val destinationTrustStorePath = Util.getSparkPropOrEmpty(sc, "spark.target.trustStore.path")
3943
val destinationTrustStorePassword = Util.getSparkPropOrEmpty(sc, "spark.target.trustStore.password")
4044
val destinationTrustStoreType = Util.getSparkPropOr(sc, "spark.target.trustStore.type", "JKS")

src/resources/sparkConf.properties

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1-
# Origin cluster credentials
1+
# Origin cluster credentials (use "host + port" OR "secure-connect-bundle" but not both)
22
spark.origin.host localhost
3+
spark.origin.port 9042
4+
#spark.origin.scb file:///aaa/bbb/secure-connect-enterprise.zip
35
spark.origin.username some-username
46
spark.origin.password some-secret-password
57
spark.origin.keyspaceTable test.a1
68

7-
# Target cluster credentials
9+
# Target cluster credentials (use "host + port" OR "secure-connect-bundle" but not both)
10+
#spark.target.host localhost
11+
#spark.target.port 9042
812
spark.target.scb file:///aaa/bbb/secure-connect-enterprise.zip
913
spark.target.username client-id
1014
spark.target.password client-secret
@@ -29,12 +33,13 @@ spark.splitSize 10000
2933
spark.batchSize 10
3034

3135
# Below 'query' properties are set based on table schema
32-
spark.query.origin partition-key,clustering-key,order-date,amount
33-
spark.query.origin.partitionKey partition-key
34-
spark.query.target.id partition-key,clustering-key
36+
spark.query.origin comma-separated-partition-key,comma-separated-clustering-key,comma-separated-other-columns
37+
spark.query.origin.partitionKey comma-separated-partition-key
38+
spark.query.target.id comma-separated-partition-key,comma-separated-clustering-key
39+
# Comma separated numeric data-type mapping (e.g. 'text' will map to '0') for all columns listed in "spark.query.origin"
3540
spark.query.types 9,1,4,3
3641
#############################################################################################################
37-
# Following are the supported data types and their corresponding [Cassandra data-types]
42+
# Following are the supported data types and their corresponding [Cassandra data-types] mapping
3843
# 0: ascii, text, varchar
3944
# 1: int
4045
# 2: bigint, counter
@@ -114,6 +119,9 @@ spark.query.writetime.cols 2,3
114119
#spark.origin.FilterColumnType 6%16
115120
#spark.origin.FilterColumnValue test
116121

122+
# ONLY USE if SSL is enabled on origin Cassandra/DSE
123+
#spark.origin.ssl.enabled true
124+
117125
# ONLY USE if SSL clientAuth is enabled on origin Cassandra/DSE
118126
#spark.origin.trustStore.path
119127
#spark.origin.trustStore.password
@@ -122,6 +130,9 @@ spark.query.writetime.cols 2,3
122130
#spark.origin.keyStore.password
123131
#spark.origin.enabledAlgorithms TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA
124132

133+
# ONLY USE if SSL is enabled on target Cassandra/DSE
134+
#spark.target.ssl.enabled true
135+
125136
# ONLY USE if SSL clientAuth is enabled on target Cassandra/DSE
126137
#spark.target.trustStore.path
127138
#spark.target.trustStore.password

0 commit comments

Comments (0)