Commit cda595b

Merge pull request #23 from datastax/feature/udt-diff-fix
DiffData UDT fix
2 parents 557a9eb + a24a700

5 files changed: +41 -26 lines changed

README.md

Lines changed: 10 additions & 5 deletions
@@ -4,6 +4,12 @@ Spark jobs in this repo can be used for data migration and data validation.
 
 > :warning: Please note this job has been tested with spark version [2.4.8](https://archive.apache.org/dist/spark/spark-2.4.8/)
 
+## Build
+1. Clone this repo
+2. Move to the repo folder `cd cassandra-data-migrator`
+3. Run the build `mvn clean package`
+4. The fat jar (`cassandra-data-migrator-2.x.jar`) file should now be present in the `target` folder
+
 ## Prerequisite
 
 Install Java8 as spark binaries are compiled with it.
@@ -19,13 +25,12 @@ tar -xvzf <spark downloaded file name>
 1. `sparkConf.properties` file needs to be configured as applicable for the environment
 > A sample Spark conf file configuration can be [found here](./src/resources/sparkConf.properties)
 2. Place the conf file where it can be accessed while running the job via spark-submit.
-3. Generate a fat jar (`cassandra-data-migrator-1.x.jar`) using command `mvn clean package`
-4. Run the 'Data Migration' job using `spark-submit` command as shown below:
+3. Run the 'Data Migration' job using `spark-submit` command as shown below:
 
 ```
 ./spark-submit --properties-file sparkConf.properties /
 --master "local[*]" /
---class datastax.astra.migrate.Migrate cassandra-data-migrator-1.x.jar &> logfile_name.txt
+--class datastax.astra.migrate.Migrate cassandra-data-migrator-2.x.jar &> logfile_name.txt
 ```
 
 Note: Above command also generates a log file `logfile_name.txt` to avoid log output on the console.
@@ -38,7 +43,7 @@ Note: Above command also generates a log file `logfile_name.txt` to avoid log ou
 ```
 ./spark-submit --properties-file sparkConf.properties /
 --master "local[*]" /
---class datastax.astra.migrate.DiffData cassandra-data-migrator-1.x.jar &> logfile_name.txt
+--class datastax.astra.migrate.DiffData cassandra-data-migrator-2.x.jar &> logfile_name.txt
 ```
 
 - Validation job will report differences as “ERRORS” in the log file as shown below
@@ -67,7 +72,7 @@ spark.target.autocorrect.mismatch true|false
 ```
 ./spark-submit --properties-file sparkConf.properties /
 --master "local[*]" /
---class datastax.astra.migrate.MigratePartitionsFromFile cassandra-data-migrator-1.x.jar &> logfile_name.txt
+--class datastax.astra.migrate.MigratePartitionsFromFile cassandra-data-migrator-2.x.jar &> logfile_name.txt
 ```
 
 When running in above mode the tool assumes a `partitions.csv` file to be present in the current folder in the below format, where each line (`min,max`) represents a partition-range
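
For illustration, a `partitions.csv` in that `min,max` format might look like the following (hypothetical values: a Murmur3 token ring split into four equal ranges; a real file would use ranges computed for the source cluster):

```
-9223372036854775808,-4611686018427387905
-4611686018427387904,-1
0,4611686018427387903
4611686018427387904,9223372036854775807
```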

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 
     <groupId>datastax.astra.migrate</groupId>
     <artifactId>cassandra-data-migrator</artifactId>
-    <version>2.3</version>
+    <version>2.4</version>
     <packaging>jar</packaging>
 
     <properties>

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 10 additions & 1 deletion
@@ -5,6 +5,7 @@
 import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
 import com.datastax.oss.driver.api.core.cql.ResultSet;
 import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.data.UdtValue;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -172,7 +173,15 @@ private String isDifferent(Row sourceRow, Row astraRow) {
 
                 boolean isDiff = dataType.diff(source, astra);
                 if (isDiff) {
-                    diffData.append("(Index: " + index + " Origin: " + source + " Target: " + astra + " ) ");
+                    if (dataType.typeClass.equals(UdtValue.class)) {
+                        String sourceUdtContent = ((UdtValue) source).getFormattedContents();
+                        String astraUdtContent = ((UdtValue) astra).getFormattedContents();
+                        if (!sourceUdtContent.equals(astraUdtContent)) {
+                            diffData.append("(Index: " + index + " Origin: " + sourceUdtContent + " Target: " + astraUdtContent + ") ");
+                        }
+                    } else {
+                        diffData.append("(Index: " + index + " Origin: " + source + " Target: " + astra + ") ");
+                    }
                 }
             });
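
The new branch only reports a UDT mismatch when the formatted contents actually differ, which suggests `dataType.diff` was over-reporting differences for `UdtValue` columns. A minimal standalone sketch of that comparison logic, with hypothetical names (`describeDiff` and `UdtDiffSketch` are illustrative; the real job builds up `diffData` inside `isDifferent`):

```
import com.datastax.oss.driver.api.core.data.UdtValue;

public class UdtDiffSketch {

    // Hypothetical helper mirroring the merged logic: for UDT values, trust
    // the driver's formatted contents rather than object equality, which can
    // flag a difference even when every field matches.
    static String describeDiff(int index, Object source, Object target) {
        if (source instanceof UdtValue && target instanceof UdtValue) {
            String sourceContents = ((UdtValue) source).getFormattedContents();
            String targetContents = ((UdtValue) target).getFormattedContents();
            if (sourceContents.equals(targetContents)) {
                return ""; // formatted contents match: not a real difference
            }
            return "(Index: " + index + " Origin: " + sourceContents
                    + " Target: " + targetContents + ") ";
        }
        return "(Index: " + index + " Origin: " + source
                + " Target: " + target + ") ";
    }
}
```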

src/main/java/datastax/astra/migrate/OriginCountJobSession.java

Lines changed: 18 additions & 17 deletions
@@ -16,9 +16,9 @@
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class OriginCountJobSession extends BaseJobSession{
-    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
+public class OriginCountJobSession extends BaseJobSession {
     private static OriginCountJobSession originCountJobSession;
+    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
     protected AtomicLong readCounter = new AtomicLong(0);
     protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
     protected Boolean checkTableforColSize;
@@ -28,17 +28,6 @@ public class OriginCountJobSession extends BaseJobSession{
     protected Integer filterColIndex;
     protected Integer fieldGuardraillimitMB;
     protected List<MigrateDataType> checkTableforColSizeTypes = new ArrayList<MigrateDataType>();
-    public static OriginCountJobSession getInstance(CqlSession sourceSession, SparkConf sparkConf) {
-        if (originCountJobSession == null) {
-            synchronized (OriginCountJobSession.class) {
-                if (originCountJobSession == null) {
-                    originCountJobSession = new OriginCountJobSession(sourceSession, sparkConf);
-                }
-            }
-        }
-
-        return originCountJobSession;
-    }
 
     protected OriginCountJobSession(CqlSession sourceSession, SparkConf sparkConf) {
         this.sourceSession = sourceSession;
@@ -59,8 +48,8 @@ protected OriginCountJobSession(CqlSession sourceSession, SparkConf sparkConf) {
         checkTableforColSizeTypes = getTypes(sparkConf.get("spark.origin.checkTableforColSize.cols.types"));
         filterColName = sparkConf.get("spark.origin.FilterColumn");
         filterColType = sparkConf.get("spark.origin.FilterColumnType");
-            filterColIndex = Integer.parseInt(sparkConf.get("spark.origin.FilterColumnIndex", "0"));
-            fieldGuardraillimitMB = Integer.parseInt(sparkConf.get("spark.fieldGuardraillimitMB", "0"));
+        filterColIndex = Integer.parseInt(sparkConf.get("spark.origin.FilterColumnIndex", "0"));
+        fieldGuardraillimitMB = Integer.parseInt(sparkConf.get("spark.fieldGuardraillimitMB", "0"));
 
         String partionKey = sparkConf.get("spark.query.cols.partitionKey");
         idColTypes = getTypes(sparkConf.get("spark.query.cols.id.types"));
@@ -77,6 +66,18 @@ protected OriginCountJobSession(CqlSession sourceSession, SparkConf sparkConf) {
 
     }
 
+    public static OriginCountJobSession getInstance(CqlSession sourceSession, SparkConf sparkConf) {
+        if (originCountJobSession == null) {
+            synchronized (OriginCountJobSession.class) {
+                if (originCountJobSession == null) {
+                    originCountJobSession = new OriginCountJobSession(sourceSession, sparkConf);
+                }
+            }
+        }
+
+        return originCountJobSession;
+    }
+
     public void getData(BigInteger min, BigInteger max) {
         logger.info("TreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max);
         int maxAttempts = maxRetries;
@@ -93,7 +94,7 @@ public void getData(BigInteger min, BigInteger max) {
             for (Row sourceRow : resultSet) {
                 readLimiter.acquire(1);
 
-                if(checkTableforColSize) {
+                if (checkTableforColSize) {
                     int rowColcnt = GetRowColumnLength(sourceRow, filterColType, filterColIndex);
                     String result = "";
                     if (rowColcnt > fieldGuardraillimitMB * 1048576) {
@@ -115,7 +116,7 @@ public void getData(BigInteger min, BigInteger max) {
                 readLimiter.acquire(1);
                 writeLimiter.acquire(1);
 
-                if(checkTableforColSize) {
+                if (checkTableforColSize) {
                     int rowColcnt = GetRowColumnLength(sourceRow, filterColType, filterColIndex);
                     String result = "";
                     if (rowColcnt > fieldGuardraillimitMB * 1048576) {
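
Most of this file's delta is the `getInstance` factory moving below the constructor, plus whitespace fixes. `getInstance` is a double-checked-locking singleton; a minimal self-contained sketch of that pattern follows. Note the `volatile` modifier, which the Java Memory Model requires for safe double-checked locking (the `originCountJobSession` field in the diff above is not declared `volatile`):

```
// Minimal sketch of the double-checked-locking singleton pattern used by
// getInstance. volatile is required for safe publication under the JMM;
// the field in the diff above omits it.
public class LazySingleton {
    private static volatile LazySingleton instance;

    private LazySingleton() {
    }

    public static LazySingleton getInstance() {
        if (instance == null) {                 // fast path: no lock once initialized
            synchronized (LazySingleton.class) {
                if (instance == null) {         // re-check while holding the lock
                    instance = new LazySingleton();
                }
            }
        }
        return instance;
    }
}
```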

src/main/scala/datastax/astra/migrate/OriginData.scala

Lines changed: 2 additions & 2 deletions
@@ -5,7 +5,7 @@ import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConversions._
 
-object OriginData extends BaseJob { 
+object OriginData extends BaseJob {
 
   val logger = LoggerFactory.getLogger(this.getClass.getName)
   logger.info("Started Migration App")
@@ -65,7 +65,7 @@ object OriginData extends BaseJob {
   private def analyzeSourceTable(sourceConnection: CassandraConnector) = {
     val partitions = SplitPartitions.getRandomSubPartitions(splitSize, minPartition, maxPartition, Integer.parseInt(coveragePercent))
     logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
-    val parts = sContext.parallelize(partitions.toSeq,partitions.size);
+    val parts = sContext.parallelize(partitions.toSeq, partitions.size);
     logger.info("Spark parallelize created : " + parts.count() + " parts!");
 
     parts.foreach(part => {
