Commit ebf13c3
Implemented percentage-based random Migrate & Validation (DiffData). This will help test migration speed and validation accuracy using a small dataset.
1 parent e748c62 commit ebf13c3

File tree

9 files changed: +28 -127 lines changed

pom.xml

Lines changed: 2 additions & 2 deletions

@@ -1,9 +1,9 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
 
-  <groupId>com.datastax.spark.example</groupId>
+  <groupId>datastax.astra.migrate</groupId>
   <artifactId>migrate</artifactId>
-  <version>0.17</version>
+  <version>0.18</version>
   <packaging>jar</packaging>
 
   <properties>

src/main/java/datastax/astra/migrate/NoSparkMigrate.java

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,7 @@ public static void main(String[] args) throws IOException {
         String splitSize = System.getProperty("spark.migrate.splitSize","10000");
         BigInteger minPartition = new BigInteger(System.getProperty("spark.migrate.source.minPartition"));
         BigInteger maxPartition = new BigInteger(System.getProperty("spark.migrate.source.maxPartition"));
-        Collection<SplitPartitions.Partition> partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition);
+        Collection<SplitPartitions.Partition> partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition, 100);
 
         /*
         partitions.parallelStream().forEach( part ->

src/main/java/datastax/astra/migrate/SplitPartitions.java

Lines changed: 13 additions & 46 deletions

@@ -15,57 +15,28 @@ public class SplitPartitions {
     public final static Long MIN_PARTITION = Long.MIN_VALUE;
     public final static Long MAX_PARTITION = Long.MAX_VALUE;
 
-    public static final BigInteger MIN_RANDOM = new BigInteger("-1");
-    public static final BigInteger MAX_RANDOM = (new BigInteger("2")).pow(127);
-
     public static void main(String[] args){
-        Collection<Partition> partitions = getSubPartitions(new BigInteger("20"), MIN_RANDOM, MAX_RANDOM);
+        Collection<Partition> partitions = getSubPartitions(new BigInteger("20"), BigInteger.valueOf(MIN_PARTITION),
+                BigInteger.valueOf(MAX_PARTITION), 20);
         for(Partition partition: partitions){
             System.out.println(partition);
         }
     }
-    public static Collection<Partition> getRandomSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max){
 
+    public static Collection<Partition> getRandomSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max, int coveragePercent){
         logger.info("TreadID: " + Thread.currentThread().getId() + " Splitting min: " + min + " max:" + max);
-        List<Partition> partitions = getSubPartitions(splitSize,min,max);
+        List<Partition> partitions = getSubPartitions(splitSize,min,max, coveragePercent);
         Collections.shuffle(partitions);
         Collections.shuffle(partitions);
         Collections.shuffle(partitions);
         Collections.shuffle(partitions);
         return partitions;
     }
-    // private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max){
-    //     long curMax = min.longValueExact();
-    //     long partitionSize = max.subtract(min).divide(splitSize).longValueExact();
-    //     List<Partition> partitions = new ArrayList<Partition>();
-    //     if(partitionSize==0){
-    //         partitionSize=100000;
-    //     }
-    //     boolean exausted = false;
-    //     while(curMax<=max.longValueExact()){
-    //         long curMin = curMax;
-    //         long newCurMax = curMin + partitionSize;
-    //         if (newCurMax < curMax) {
-    //             newCurMax = max.longValueExact();
-    //             exausted = true;
-    //         }
-    //         if(newCurMax > max.longValueExact()){
-    //             newCurMax=max.longValueExact();
-    //             exausted=true;
-    //         }
-    //         curMax = newCurMax;
-    //         partitions.add(new Partition(curMin,curMax));
-    //         if(exausted){
-    //             break;
-    //         }
-    //     }
-    //
-    //     return partitions;
-    // }
-
 
-    private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max){
+    private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max, int coveragePercent){
+        if (coveragePercent < 1 || coveragePercent > 100) {
+            coveragePercent = 100;
+        }
         BigInteger curMax = new BigInteger(min.toString());
         BigInteger partitionSize = max.subtract(min).divide(splitSize);
         List<Partition> partitions = new ArrayList<Partition>();

@@ -85,7 +56,10 @@ private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger
             exausted=true;
         }
         curMax = newCurMax;
-        partitions.add(new Partition(curMin,curMax));
+
+        BigInteger range = curMax.subtract(curMin);
+        BigInteger curRange = range.multiply(BigInteger.valueOf(coveragePercent)).divide(BigInteger.valueOf(100));
+        partitions.add(new Partition(curMin,curMin.add(curRange)));
         if(exausted){
             break;
         }

@@ -94,22 +68,17 @@ private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger
         return partitions;
     }
 
-
-
     public static class Partition implements Serializable{
         private static final long serialVersionUID = 1L;
 
         private BigInteger min;
         private BigInteger max;
 
-
         public Partition(BigInteger min, BigInteger max){
             this.min = min;
             this.max = max;
         }
 
-
-
         public BigInteger getMin() {
             return min;
         }

@@ -119,9 +88,7 @@ public BigInteger getMax() {
         }
 
         public String toString(){
-            // return "--conf spark.migrate.source.minPartition="+ min + " --conf spark.migrate.source.maxPartition=" + max;
-
-            return "select * from field_api.field_users where token(account_id,field_id)>="+ min + " and token(account_id,field_id)<=" + max + " and account_id=ee8556f4-9a1a-4c89-ae05-e8105d42ed6f allow filtering; ";
+            return "Processing partition for token range "+ min + " to " + max;
         }
     }
 }
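The effect of the new coveragePercent parameter is easiest to see in isolation: each split keeps only the first coveragePercent% of its token sub-range, so the job scans roughly that fraction of the data while getRandomSubPartitions still shuffles the splits across the full ring. Below is a minimal standalone sketch of that arithmetic; the class name CoverageDemo and its variables are illustrative, not part of the repository.

import java.math.BigInteger;

// Standalone sketch of the coveragePercent math introduced in this commit.
// Only the range arithmetic mirrors SplitPartitions.getSubPartitions.
public class CoverageDemo {
    public static void main(String[] args) {
        BigInteger min = BigInteger.valueOf(Long.MIN_VALUE);
        BigInteger max = BigInteger.valueOf(Long.MAX_VALUE);
        BigInteger splitSize = BigInteger.valueOf(4); // number of splits
        int coveragePercent = 10;                     // scan ~10% of each split

        BigInteger partitionSize = max.subtract(min).divide(splitSize);
        BigInteger curMin = min;
        for (int i = 0; i < splitSize.intValueExact(); i++) {
            BigInteger curMax = curMin.add(partitionSize);
            // Keep only the first coveragePercent% of this split's token range.
            BigInteger curRange = curMax.subtract(curMin)
                    .multiply(BigInteger.valueOf(coveragePercent))
                    .divide(BigInteger.valueOf(100));
            System.out.println("Processing partition for token range "
                    + curMin + " to " + curMin.add(curRange));
            curMin = curMax;
        }
    }
}

With coveragePercent = 100 (the value hard-coded at the NoSparkMigrate and DiffMetaData call sites above) the trimmed range equals the full split, so existing behavior is unchanged.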

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 3 additions & 1 deletion

@@ -5,6 +5,7 @@ import org.apache.log4j.Logger
 import org.apache.spark.sql.SparkSession
 
 import java.math.BigInteger
+import java.lang.Long
 
 class AbstractJob extends App {
 
@@ -43,8 +44,9 @@ class AbstractJob extends App {
 
   val minPartition = new BigInteger(sc.getConf.get("spark.migrate.source.minPartition"))
   val maxPartition = new BigInteger(sc.getConf.get("spark.migrate.source.maxPartition"))
-
+  val coveragePercent = sc.getConf.get("spark.migrate.coveragePercent", "100")
   val splitSize = sc.getConf.get("spark.migrate.splitSize", "10000")
+  val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition, Integer.parseInt(coveragePercent))
 
   var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword, sourceReadConsistencyLevel,
     sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);

src/main/scala/datastax/astra/migrate/DiffData.scala

Lines changed: 2 additions & 6 deletions

@@ -3,23 +3,19 @@ package datastax.astra.migrate
 import com.datastax.spark.connector.cql.CassandraConnector
 import org.apache.log4j.Logger
 
-import java.lang.Long
-import java.math.BigInteger
 import scala.collection.JavaConversions._
 
 object DiffData extends AbstractJob {
 
   val logger = Logger.getLogger(this.getClass.getName)
   logger.info("Started Data Validation App")
 
-  diffTable(sourceConnection, destinationConnection, minPartition, maxPartition)
+  diffTable(sourceConnection, destinationConnection)
 
   exitSpark
 
-  private def diffTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector, minPartition:BigInteger, maxPartition:BigInteger) = {
-    val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition)
+  private def diffTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector) = {
     val parts = sc.parallelize(partitions.toSeq,partitions.size);
-
     logger.info("Spark parallelize created : " + parts.count() + " parts!");
 
     parts.foreach(part => {

src/main/scala/datastax/astra/migrate/DiffMetaData.scala

Lines changed: 1 addition & 1 deletion

@@ -76,7 +76,7 @@ object DiffMetaData extends App {
 
   private def diffTable(sourceConnection: CassandraConnector, astraConnection: CassandraConnector, minPartition:BigInteger, maxPartition:BigInteger) = {
 
-    val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition)
+    val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition, 100)
     val parts = sc.parallelize(partitions.toSeq,partitions.size);
     parts.foreach(part => {
       sourceConnection.withSessionDo(sourceSession => astraConnection.withSessionDo(astraSession=>DiffMetaJobSession.getInstance(sourceSession,astraSession, sc.getConf).getDataDiffAndCorrect(part.getMin, part.getMax)))

src/main/scala/datastax/astra/migrate/Migrate.scala

Lines changed: 2 additions & 6 deletions

@@ -3,8 +3,6 @@ package datastax.astra.migrate
 import com.datastax.spark.connector.cql.CassandraConnector
 import org.apache.log4j.Logger
 
-import java.lang.Long
-import java.math.BigInteger
 import scala.collection.JavaConversions._
 
 // http://www.russellspitzer.com/2016/02/16/Multiple-Clusters-SparkSql-Cassandra/
@@ -14,14 +12,12 @@ object Migrate extends AbstractJob {
   val logger = Logger.getLogger(this.getClass.getName)
   logger.info("Started Migration App")
 
-  migrateTable(sourceConnection, destinationConnection, minPartition, maxPartition)
+  migrateTable(sourceConnection, destinationConnection)
 
   exitSpark
 
-  private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector, minPartition:BigInteger, maxPartition:BigInteger) = {
-    val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition)
+  private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector) = {
     val parts = sc.parallelize(partitions.toSeq,partitions.size);
-
     logger.info("Spark parallelize created : " + parts.count() + " parts!");
 
     parts.foreach(part => {

src/resources/diff_data.sh

Lines changed: 0 additions & 61 deletions
This file was deleted.

src/resources/sparkConf.properties

Lines changed: 4 additions & 3 deletions

@@ -15,10 +15,11 @@ spark.migrate.destination.autocorrect.missing false
 spark.migrate.destination.autocorrect.mismatch false
 
 spark.migrate.maxRetries 10
-spark.migrate.readRateLimit 40000
-spark.migrate.writeRateLimit 40000
-spark.migrate.splitSize 5
+spark.migrate.readRateLimit 20000
+spark.migrate.writeRateLimit 20000
+spark.migrate.splitSize 10000
 spark.migrate.batchSize 5
+spark.migrate.coveragePercent 100
 spark.migrate.printStatsAfter 100000
 
 spark.migrate.query.cols.select partition-key,clustering-key,order-date,amount,writetime(order-date),writetime(amount),ttl(order-date),ttl(amount)
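To exercise the new sampling path on a small slice of the data, one would lower the percentage in sparkConf.properties (or pass it as a --conf to spark-submit). The values below are a plausible test setup, not part of this commit:

spark.migrate.splitSize 10000
spark.migrate.coveragePercent 10

Because the trimming is deterministic (always the first slice of each split) and only the processing order is shuffled, a Migrate run and a DiffData run with the same min/max, splitSize, and coveragePercent settings cover the same sampled token ranges, which is what makes validating a partial migration meaningful.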
