Skip to content

Commit 6169747

Browse files
authored
Merge pull request #52 from abinaya21/fix-cluster-mode-issue
[ISSUE-51] Fix error in accessing config when running job in cluster mode
2 parents 80ee56f + b60ecb7 commit 6169747

File tree

3 files changed

+10
-8
lines changed

3 files changed

+10
-8
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
<groupId>datastax.astra.migrate</groupId>
55
<artifactId>cassandra-data-migrator</artifactId>
6-
<version>3.0.1</version>
6+
<version>3.0.2</version>
77
<packaging>jar</packaging>
88

99
<properties>

src/main/scala/datastax/astra/migrate/DiffData.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,19 @@ package datastax.astra.migrate
33
import com.datastax.spark.connector.cql.CassandraConnector
44
import org.slf4j.LoggerFactory
55

6+
import org.apache.spark.SparkConf
67
import scala.collection.JavaConversions._
78

89
object DiffData extends AbstractJob {
910

1011
val logger = LoggerFactory.getLogger(this.getClass.getName)
1112
logger.info("Started Data Validation App")
1213

13-
diffTable(sourceConnection, destinationConnection)
14+
diffTable(sourceConnection, destinationConnection, sc)
1415

1516
exitSpark
1617

17-
private def diffTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector) = {
18+
private def diffTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector, config: SparkConf) = {
1819
val partitions = SplitPartitions.getRandomSubPartitions(splitSize, minPartition, maxPartition, Integer.parseInt(coveragePercent))
1920
logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
2021
val parts = sContext.parallelize(partitions.toSeq, partitions.size);
@@ -23,11 +24,11 @@ object DiffData extends AbstractJob {
2324
parts.foreach(part => {
2425
sourceConnection.withSessionDo(sourceSession =>
2526
destinationConnection.withSessionDo(destinationSession =>
26-
DiffJobSession.getInstance(sourceSession, destinationSession, sc)
27+
DiffJobSession.getInstance(sourceSession, destinationSession, config)
2728
.getDataAndDiff(part.getMin, part.getMax)))
2829
})
2930

30-
DiffJobSession.getInstance(null, null, sc).printCounts(true);
31+
DiffJobSession.getInstance(null, null, config).printCounts(true);
3132
}
3233

3334
}

src/main/scala/datastax/astra/migrate/Migrate.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package datastax.astra.migrate
22

33
import com.datastax.spark.connector.cql.CassandraConnector
44
import org.slf4j.LoggerFactory
5+
import org.apache.spark.SparkConf
56

67
import scala.collection.JavaConversions._
78

@@ -12,11 +13,11 @@ object Migrate extends AbstractJob {
1213
val logger = LoggerFactory.getLogger(this.getClass.getName)
1314
logger.info("Started Migration App")
1415

15-
migrateTable(sourceConnection, destinationConnection)
16+
migrateTable(sourceConnection, destinationConnection, sc)
1617

1718
exitSpark
1819

19-
private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector) = {
20+
private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector, config: SparkConf) = {
2021
val partitions = SplitPartitions.getRandomSubPartitions(splitSize, minPartition, maxPartition, Integer.parseInt(coveragePercent))
2122
logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
2223
val parts = sContext.parallelize(partitions.toSeq, partitions.size);
@@ -25,7 +26,7 @@ object Migrate extends AbstractJob {
2526
parts.foreach(part => {
2627
sourceConnection.withSessionDo(sourceSession =>
2728
destinationConnection.withSessionDo(destinationSession =>
28-
CopyJobSession.getInstance(sourceSession, destinationSession, sc)
29+
CopyJobSession.getInstance(sourceSession, destinationSession, config)
2930
.getDataAndInsert(part.getMin, part.getMax)))
3031
})
3132

0 commit comments

Comments
 (0)