
Commit 36e932b

Adding support to migrate from any source (C*/DSE/Astra) to any destination (C*/DSE/Astra).
Refactored the Scala code to increase reusability and simplified the config file.

1 parent b02113e commit 36e932b

File tree

7 files changed (+109, -193 lines)
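The simplified config replaces the old astra.*, beta, and ctoc switches with a symmetric scheme: every connection property comes in a spark.migrate.source.* and a spark.migrate.destination.* flavor, and each side's isAstra flag decides whether that side connects via contact points or an Astra secure connect bundle (scb). A sketch of the properties for a Cassandra-to-Astra run (hosts, credentials, paths, and keyspace/table names below are placeholders):

--conf spark.migrate.source.isAstra=false
--conf spark.migrate.source.host=10.0.0.1
--conf spark.migrate.source.username=cassandra-user
--conf spark.migrate.source.password=cassandra-pass
--conf spark.migrate.source.keyspaceTable=test_ks.test_table
--conf spark.migrate.source.minPartition=-9223372036854775808
--conf spark.migrate.source.maxPartition=9223372036854775807
--conf spark.migrate.destination.isAstra=true
--conf spark.migrate.destination.scb=file:///path/to/secure-connect.zip
--conf spark.migrate.destination.username=astra-client-id
--conf spark.migrate.destination.password=astra-client-secret
--conf spark.migrate.destination.keyspaceTable=test_ks.test_table
--conf spark.migrate.splitSize=10000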

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@

   <groupId>com.datastax.spark.example</groupId>
   <artifactId>migrate</artifactId>
-  <version>0.12</version>
+  <version>0.13</version>
   <packaging>jar</packaging>

   <properties>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 2 additions & 2 deletions
@@ -69,7 +69,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         maxRetries = Integer.parseInt(sparkConf.get("spark.migrate.maxRetries", "10"));

         sourceKeyspaceTable = sparkConf.get("spark.migrate.source.keyspaceTable");
-        astraKeyspaceTable = sparkConf.get("spark.migrate.astra.keyspaceTable");
+        astraKeyspaceTable = sparkConf.get("spark.migrate.destination.keyspaceTable");

         isPreserveTTLWritetime = Boolean.parseBoolean(sparkConf.get("spark.migrate.preserveTTLWriteTime", "false"));
         if (isPreserveTTLWritetime) {
@@ -101,7 +101,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,

         logger.info(" DEFAULT -- Write Batch Size: " + batchSize);
         logger.info(" DEFAULT -- Source Keyspace Table: " + sourceKeyspaceTable);
-        logger.info(" DEFAULT -- Astra Keyspace Table: " + astraKeyspaceTable);
+        logger.info(" DEFAULT -- Destination Keyspace Table: " + astraKeyspaceTable);
         logger.info(" DEFAULT -- ReadRateLimit: " + readLimiter.getRate());
         logger.info(" DEFAULT -- WriteRateLimit: " + writeLimiter.getRate());
         logger.info(" DEFAULT -- WriteTimestampFilter: " + writeTimeStampFilter);
src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
+package datastax.astra.migrate
+
+import com.datastax.spark.connector.cql.CassandraConnector
+import org.apache.log4j.Logger
+import org.apache.spark.sql.SparkSession
+
+import java.math.BigInteger
+
+class AbstractJob extends App {
+
+  val abstractLogger = Logger.getLogger(this.getClass.getName)
+  val spark = SparkSession.builder
+    .appName("Datastax Data Validation")
+    .getOrCreate()
+
+  val sc = spark.sparkContext
+
+  val sourceIsAstra = sc.getConf.get("spark.migrate.source.isAstra", "false")
+  val sourceScbPath = sc.getConf.get("spark.migrate.source.scb", "")
+  val sourceHost = sc.getConf.get("spark.migrate.source.host", "")
+  val sourceUsername = sc.getConf.get("spark.migrate.source.username")
+  val sourcePassword = sc.getConf.get("spark.migrate.source.password")
+  val sourceReadConsistencyLevel = sc.getConf.get("spark.migrate.source.read.consistency.level", "LOCAL_QUORUM")
+
+  val destinationIsAstra = sc.getConf.get("spark.migrate.destination.isAstra", "true")
+  val destinationScbPath = sc.getConf.get("spark.migrate.destination.scb", "")
+  val destinationHost = sc.getConf.get("spark.migrate.destination.host", "")
+  val destinationUsername = sc.getConf.get("spark.migrate.destination.username")
+  val destinationPassword = sc.getConf.get("spark.migrate.destination.password")
+  val destinationReadConsistencyLevel = sc.getConf.get("spark.migrate.destination.read.consistency.level", "LOCAL_QUORUM")
+
+  val minPartition = new BigInteger(sc.getConf.get("spark.migrate.source.minPartition"))
+  val maxPartition = new BigInteger(sc.getConf.get("spark.migrate.source.maxPartition"))
+
+  val splitSize = sc.getConf.get("spark.migrate.splitSize", "10000")
+
+  var sourceConnection = CassandraConnector(sc.getConf
+    .set("spark.cassandra.connection.host", sourceHost)
+    .set("spark.cassandra.auth.username", sourceUsername)
+    .set("spark.cassandra.auth.password", sourcePassword)
+    .set("spark.cassandra.input.consistency.level", sourceReadConsistencyLevel))
+  if ("true".equals(sourceIsAstra)) {
+    sourceConnection = CassandraConnector(sc.getConf
+      .set("spark.cassandra.connection.config.cloud.path", sourceScbPath)
+      .set("spark.cassandra.auth.username", sourceUsername)
+      .set("spark.cassandra.auth.password", sourcePassword)
+      .set("spark.cassandra.input.consistency.level", sourceReadConsistencyLevel))
+    abstractLogger.info("Connected to Astra source!");
+  } else {
+    abstractLogger.info("Connected to Cassandra (or DSE) source!");
+  }
+
+  var destinationConnection = CassandraConnector(sc.getConf
+    .set("spark.cassandra.connection.host", destinationHost)
+    .set("spark.cassandra.auth.username", destinationUsername)
+    .set("spark.cassandra.auth.password", destinationPassword)
+    .set("spark.cassandra.input.consistency.level", destinationReadConsistencyLevel))
+  if ("true".equals(destinationIsAstra)) {
+    destinationConnection = CassandraConnector(
+      sc.getConf
+        .set("spark.cassandra.connection.config.cloud.path", destinationScbPath)
+        .set("spark.cassandra.auth.username", destinationUsername)
+        .set("spark.cassandra.auth.password", destinationPassword)
+        .set("spark.cassandra.input.consistency.level", destinationReadConsistencyLevel))
+    abstractLogger.info("Connected to Astra destination!");
+  } else {
+    abstractLogger.info("Connected to Cassandra (or DSE) destination!");
+  }
+
+  protected def exitSpark = {
+    spark.stop()
+    sys.exit(0)
+  }
+
+}
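With the connections, partition bounds, and shutdown logic consolidated in this base class, a concrete job only has to declare its own work. A minimal sketch, using a hypothetical ExampleJob that is not part of this commit:

package datastax.astra.migrate

import org.apache.log4j.Logger

// Hypothetical job showing reuse of AbstractJob: sourceConnection,
// destinationConnection, minPartition, maxPartition, splitSize, and
// exitSpark are all inherited from the base class.
object ExampleJob extends AbstractJob {

  val logger = Logger.getLogger(this.getClass.getName)
  logger.info("Started Example App")

  // Both connectors are already wired for plain contact points or an
  // Astra secure connect bundle, per the isAstra flags in AbstractJob.
  sourceConnection.withSessionDo(sourceSession =>
    destinationConnection.withSessionDo(destinationSession =>
      logger.info("Sessions open for token range " + minPartition + " .. " + maxPartition)))

  exitSpark
}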
src/main/scala/datastax/astra/migrate/DiffData.scala

Lines changed: 8 additions & 86 deletions
@@ -1,113 +1,35 @@
 package datastax.astra.migrate

-import com.datastax.oss.driver.api.core.{CqlIdentifier, CqlSession}
-import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata
-import com.datastax.spark.connector._
 import com.datastax.spark.connector.cql.CassandraConnector
-import datastax.astra.migrate.Migrate.{astraPassword, astraReadConsistencyLevel, astraScbPath, astraUsername, sc, sourceHost, sourcePassword, sourceReadConsistencyLevel, sourceUsername}
 import org.apache.log4j.Logger
-import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.apache.spark.sql.hive._
-import org.apache.spark.sql.cassandra._

-import scala.collection.JavaConversions._
 import java.lang.Long
 import java.math.BigInteger
-import collection.JavaConversions._
-import java.math.BigInteger
+import scala.collection.JavaConversions._

-object DiffData extends App {
+object DiffData extends AbstractJob {

   val logger = Logger.getLogger(this.getClass.getName)
-
-  val spark = SparkSession.builder
-    .appName("Datastax Data Validation")
-    .getOrCreate()
-
-  import spark.implicits._
-
-  val sc = spark.sparkContext
-
-  val sourceUsername = sc.getConf.get("spark.migrate.source.username")
-  val sourcePassword = sc.getConf.get("spark.migrate.source.password")
-  val sourceHost = sc.getConf.get("spark.migrate.source.host")
-  val sourceReadConsistencyLevel = sc.getConf.get("spark.cassandra.source.read.consistency.level","LOCAL_QUORUM")
-
-  val destinationHost = sc.getConf.get("spark.migrate.destination.host", "")
-  val destintationUsername = sc.getConf.get("spark.migrate.destination.username", "")
-  val destinationPassword = sc.getConf.get("spark.migrate.destination.password", "")
-  val destinationReadConsistencyLevel = sc.getConf.get("spark.migrate.destination.read.consistency.level", "LOCAL_QUORUM")
-
-  val astraScbPath = sc.getConf.get("spark.migrate.astra.scb")
-  val astraUsername = sc.getConf.get("spark.migrate.astra.username")
-  val astraPassword = sc.getConf.get("spark.migrate.astra.password")
-  val astraReadConsistencyLevel = sc.getConf.get("spark.cassandra.astra.read.consistency.level","LOCAL_QUORUM")
-
-  val minPartition = new BigInteger(sc.getConf.get("spark.migrate.source.minPartition"))
-  val maxPartition = new BigInteger(sc.getConf.get("spark.migrate.source.maxPartition"))
-
-  val splitSize = sc.getConf.get("spark.migrate.splitSize","10000")
-
-
   logger.info("Started Data Validation App")

-  val isBeta = sc.getConf.get("spark.migrate.beta","false")
-  val isCassandraToCassandra = sc.getConf.get("spark.migrate.ctoc", "false")
-
-  var sourceConnection = CassandraConnector(
-    sc.getConf
-      .set("spark.cassandra.connection.host", sourceHost)
-      .set("spark.cassandra.auth.username", sourceUsername)
-      .set("spark.cassandra.auth.password", sourcePassword)
-      .set("spark.cassandra.input.consistency.level", sourceReadConsistencyLevel))
-
-  if("true".equals(isBeta)){
-    sourceConnection = CassandraConnector(
-      sc.getConf
-        .set("spark.cassandra.connection.config.cloud.path", astraScbPath)
-        .set("spark.cassandra.auth.username", astraUsername)
-        .set("spark.cassandra.auth.password", astraPassword)
-        .set("spark.cassandra.input.consistency.level", sourceReadConsistencyLevel)
-    )
-
-  }
-  var destinationConnection = CassandraConnector(sc.getConf
-    .set("spark.cassandra.connection.config.cloud.path", astraScbPath)
-    .set("spark.cassandra.auth.username", astraUsername)
-    .set("spark.cassandra.auth.password", astraPassword)
-    .set("spark.cassandra.input.consistency.level", astraReadConsistencyLevel))
-
-  if ("true".equals(isCassandraToCassandra)) {
-    destinationConnection = CassandraConnector(
-      sc.getConf
-        .set("spark.cassandra.connection.host", destinationHost)
-        .set("spark.cassandra.auth.username", destintationUsername)
-        .set("spark.cassandra.auth.password", destinationPassword)
-        .set("spark.cassandra.input.consistency.level", destinationReadConsistencyLevel))
-  }
-
-  diffTable(sourceConnection,destinationConnection, minPartition, maxPartition)
+  diffTable(sourceConnection, destinationConnection, minPartition, maxPartition)

   exitSpark

-  private def diffTable(sourceConnection: CassandraConnector, astraConnection: CassandraConnector, minPartition:BigInteger, maxPartition:BigInteger) = {
+  private def diffTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector, minPartition:BigInteger, maxPartition:BigInteger) = {
     val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition)
     val parts = sc.parallelize(partitions.toSeq,partitions.size);

     logger.info("Spark parallelize created : " + parts.count() + " parts!");
+
     parts.foreach(part => {
-      sourceConnection.withSessionDo(sourceSession =>
-        astraConnection.withSessionDo(astraSession =>
-          DiffJobSession.getInstance(sourceSession,astraSession, sc.getConf)
+      sourceConnection.withSessionDo(sourceSession =>
+        destinationConnection.withSessionDo(destinationSession =>
+          DiffJobSession.getInstance(sourceSession, destinationSession, sc.getConf)
            .getDataAndDiff(part.getMin, part.getMax)))
     })

     DiffJobSession.getInstance(null, null, sc.getConf).printCounts("Job Final");
   }

-  private def exitSpark = {
-    spark.stop()
-    sys.exit(0)
-  }
-
 }
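Since DiffData now inherits all connection plumbing from AbstractJob, launching the validation job reduces to the class name plus the shared spark.migrate.* properties. A sketch, assuming the jar built from the pom above lands at target/migrate-0.13.jar (properties abbreviated to the placeholders shown after the file tree):

spark-submit --class datastax.astra.migrate.DiffData \
  --conf spark.migrate.source.host=10.0.0.1 \
  --conf spark.migrate.source.username=cassandra-user \
  --conf spark.migrate.source.password=cassandra-pass \
  --conf spark.migrate.source.minPartition=-9223372036854775808 \
  --conf spark.migrate.source.maxPartition=9223372036854775807 \
  --conf spark.migrate.destination.scb=file:///path/to/secure-connect.zip \
  --conf spark.migrate.destination.username=astra-client-id \
  --conf spark.migrate.destination.password=astra-client-secret \
  target/migrate-0.13.jar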

src/main/scala/datastax/astra/migrate/DiffMetaData.scala

Lines changed: 2 additions & 10 deletions
@@ -1,19 +1,11 @@
 package datastax.astra.migrate

-import com.datastax.oss.driver.api.core.{CqlIdentifier, CqlSession}
-import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata
-import com.datastax.spark.connector._
 import com.datastax.spark.connector.cql.CassandraConnector
-import datastax.astra.migrate.Migrate.{astraPassword, astraReadConsistencyLevel, astraScbPath, astraUsername, sc, sourceHost, sourcePassword, sourceReadConsistencyLevel, sourceUsername}
-import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.apache.spark.sql.hive._
-import org.apache.spark.sql.cassandra._
+import org.apache.spark.sql.SparkSession

-import scala.collection.JavaConversions._
 import java.lang.Long
 import java.math.BigInteger
-import collection.JavaConversions._
-import java.math.BigInteger
+import scala.collection.JavaConversions._

 object DiffMetaData extends App {


src/main/scala/datastax/astra/migrate/Migrate.scala

Lines changed: 11 additions & 79 deletions
@@ -1,104 +1,36 @@
 package datastax.astra.migrate

-import com.datastax.oss.driver.api.core.{CqlIdentifier, CqlSession}
-import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata
-import com.datastax.spark.connector._
 import com.datastax.spark.connector.cql.CassandraConnector
 import org.apache.log4j.Logger
-import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.apache.spark.sql.hive._
-import org.apache.spark.sql.cassandra._

-import scala.collection.JavaConversions._
 import java.lang.Long
 import java.math.BigInteger
-import collection.JavaConversions._
-
-
+import scala.collection.JavaConversions._

 // http://www.russellspitzer.com/2016/02/16/Multiple-Clusters-SparkSql-Cassandra/

-object Migrate extends App {
-  val logger = Logger.getLogger(this.getClass.getName)
-
-  val spark = SparkSession.builder
-    .appName("Datastax Data Migration")
-    .getOrCreate()
-
-  import spark.implicits._
-
-  val sc = spark.sparkContext
-
-
-  val sourceUsername = sc.getConf.get("spark.migrate.source.username")
-  val sourcePassword = sc.getConf.get("spark.migrate.source.password")
-  val sourceHost = sc.getConf.get("spark.migrate.source.host")
-
-
-  val astraUsername = sc.getConf.get("spark.migrate.astra.username")
-  val astraPassword = sc.getConf.get("spark.migrate.astra.password")
-
-  val astraScbPath = sc.getConf.get("spark.migrate.astra.scb")
-
-  val minPartition = new BigInteger(sc.getConf.get("spark.migrate.source.minPartition"))
-  val maxPartition = new BigInteger(sc.getConf.get("spark.migrate.source.maxPartition"))
-
-  val splitSize = sc.getConf.get("spark.migrate.splitSize","10000")
-
-  val sourceReadConsistencyLevel = sc.getConf.get("spark.cassandra.source.read.consistency.level","LOCAL_QUORUM")
-  val astraReadConsistencyLevel = sc.getConf.get("spark.cassandra.astra.read.consistency.level","LOCAL_QUORUM")
-
+object Migrate extends AbstractJob {

+  val logger = Logger.getLogger(this.getClass.getName)
   logger.info("Started Migration App")

-  val isBeta = sc.getConf.get("spark.migrate.beta","false")
-
-  var sourceConnection = CassandraConnector(
-    sc.getConf
-      .set("spark.cassandra.connection.host", sourceHost)
-      .set("spark.cassandra.auth.username", sourceUsername)
-      .set("spark.cassandra.auth.password", sourcePassword)
-      .set("spark.cassandra.input.consistency.level", sourceReadConsistencyLevel))
-
-
-  if("true".equals(isBeta)){
-    sourceConnection = CassandraConnector(
-      sc.getConf
-        .set("spark.cassandra.connection.config.cloud.path", astraScbPath)
-        .set("spark.cassandra.auth.username", astraUsername)
-        .set("spark.cassandra.auth.password", astraPassword)
-        .set("spark.cassandra.input.consistency.level", sourceReadConsistencyLevel)
-    )
-
-  }
-
-
-  val astraConnection = CassandraConnector(sc.getConf
-    .set("spark.cassandra.connection.config.cloud.path", astraScbPath)
-    .set("spark.cassandra.auth.username", astraUsername)
-    .set("spark.cassandra.auth.password", astraPassword)
-    .set("spark.cassandra.input.consistency.level", astraReadConsistencyLevel))
-
-
-
-
-  migrateTable(sourceConnection,astraConnection, minPartition, maxPartition)
+  migrateTable(sourceConnection, destinationConnection, minPartition, maxPartition)

   exitSpark

-  private def migrateTable(sourceConnection: CassandraConnector, astraConnection: CassandraConnector, minPartition:BigInteger, maxPartition:BigInteger) = {
-
+  private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector, minPartition:BigInteger, maxPartition:BigInteger) = {
     val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition)
     val parts = sc.parallelize(partitions.toSeq,partitions.size);
+
     logger.info("Spark parallelize created : " + parts.count() + " parts!");
+
     parts.foreach(part => {
-      sourceConnection.withSessionDo(sourceSession => astraConnection.withSessionDo(astraSession=> CopyJobSession.getInstance(sourceSession,astraSession, sc.getConf).getDataAndInsert(part.getMin, part.getMax)))
+      sourceConnection.withSessionDo(sourceSession =>
+        destinationConnection.withSessionDo(destinationSession =>
+          CopyJobSession.getInstance(sourceSession,destinationSession, sc.getConf)
+            .getDataAndInsert(part.getMin, part.getMax)))
     })
-  }

-  private def exitSpark = {
-    spark.stop()
-    sys.exit(0)
   }

 }
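The migration entry point is launched the same way, swapping only the class name, since Migrate and DiffData now share their configuration through AbstractJob (same placeholder assumptions as the DiffData sketch above):

spark-submit --class datastax.astra.migrate.Migrate \
  --conf ... (same spark.migrate.source.* and spark.migrate.destination.* properties) \
  target/migrate-0.13.jar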
