|
1 | 1 | package datastax.astra.migrate
|
2 | 2 |
|
3 |
| -import com.datastax.oss.driver.api.core.{CqlIdentifier, CqlSession} |
4 |
| -import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata |
5 |
| -import com.datastax.spark.connector._ |
6 | 3 | import com.datastax.spark.connector.cql.CassandraConnector
|
7 |
| -import datastax.astra.migrate.Migrate.{astraPassword, astraReadConsistencyLevel, astraScbPath, astraUsername, sc, sourceHost, sourcePassword, sourceReadConsistencyLevel, sourceUsername} |
8 | 4 | import org.apache.log4j.Logger
|
9 |
| -import org.apache.spark.sql.{SaveMode, SparkSession} |
10 |
| -import org.apache.spark.sql.hive._ |
11 |
| -import org.apache.spark.sql.cassandra._ |
12 | 5 |
|
13 |
| -import scala.collection.JavaConversions._ |
14 | 6 | import java.lang.Long
|
15 | 7 | import java.math.BigInteger
|
16 |
| -import collection.JavaConversions._ |
17 |
| -import java.math.BigInteger |
| 8 | +import scala.collection.JavaConversions._ |
18 | 9 |
|
19 |
/**
 * Spark job entry point that validates (diffs) data between a source Cassandra
 * cluster and a destination cluster/Astra. Connections, partition bounds and
 * tuning values (sourceConnection, destinationConnection, minPartition,
 * maxPartition, splitSize, sc, exitSpark) are inherited from AbstractJob.
 */
object DiffData extends AbstractJob {

  val logger = Logger.getLogger(this.getClass.getName)

  logger.info("Started Data Validation App")

  diffTable(sourceConnection, destinationConnection, minPartition, maxPartition)

  exitSpark

  /**
   * Splits the token range [minPartition, maxPartition] into randomized
   * sub-partitions and diffs each one in parallel on the executors.
   *
   * @param sourceConnection      connector for the cluster holding the reference data
   * @param destinationConnection connector for the cluster being validated
   * @param minPartition          inclusive lower token bound
   * @param maxPartition          inclusive upper token bound
   */
  private def diffTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector, minPartition: BigInteger, maxPartition: BigInteger) = {
    // Parse splitSize directly as a BigInteger: avoids the Long.parseLong
    // round-trip, which would overflow for values beyond Long range.
    val partitions = SplitPartitions.getRandomSubPartitions(new BigInteger(splitSize), minPartition, maxPartition)
    // NOTE(review): `partitions.toSeq` relies on the implicit from the file's
    // deprecated `scala.collection.JavaConversions._` import; migrating to
    // explicit JavaConverters `.asScala` is a file-wide change — TODO confirm.
    val parts = sc.parallelize(partitions.toSeq, partitions.size)

    logger.info("Spark parallelize created : " + parts.count() + " parts!")

    parts.foreach(part => {
      // One session pair per executor via the DiffJobSession singleton; each
      // task diffs only its own [min, max] token slice.
      sourceConnection.withSessionDo(sourceSession =>
        destinationConnection.withSessionDo(destinationSession =>
          DiffJobSession.getInstance(sourceSession, destinationSession, sc.getConf)
            .getDataAndDiff(part.getMin, part.getMax)))
    })

    // Driver-side singleton already exists at this point, so null sessions are
    // never used to open a connection — only to fetch the instance for counts.
    DiffJobSession.getInstance(null, null, sc.getConf).printCounts("Job Final")
  }
}
|
0 commit comments