
Commit b4c9bac

Merge pull request #25 from Ankitp1342/feature/percent_random
Feature/percent random
2 parents 0dc3824 + ebf13c3

File tree: 9 files changed (+43, -131 lines)

pom.xml

Lines changed: 2 additions & 2 deletions
@@ -1,9 +1,9 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>

-    <groupId>com.datastax.spark.example</groupId>
+    <groupId>datastax.astra.migrate</groupId>
     <artifactId>migrate</artifactId>
-    <version>0.17</version>
+    <version>0.18</version>
     <packaging>jar</packaging>

     <properties>

src/main/java/datastax/astra/migrate/NoSparkMigrate.java

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ public static void main(String[] args) throws IOException {
         String splitSize = System.getProperty("spark.migrate.splitSize","10000");
         BigInteger minPartition = new BigInteger(System.getProperty("spark.migrate.source.minPartition"));
         BigInteger maxPartition = new BigInteger(System.getProperty("spark.migrate.source.maxPartition"));
-        Collection<SplitPartitions.Partition> partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition);
+        Collection<SplitPartitions.Partition> partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition, 100);

 /*
         partitions.parallelStream().forEach( part ->
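
For readers tracking the API change, the sketch below mirrors the updated call site. It is a hypothetical driver (class name and hard-coded values are illustrative), assuming the repo's SplitPartitions class is on the classpath; the new fourth argument is the coverage percentage, and 100 preserves the previous scan-everything behavior.

import java.math.BigInteger;
import java.util.Collection;
import datastax.astra.migrate.SplitPartitions;

// Hypothetical driver for the new 4-argument getRandomSubPartitions.
public class PartitionDemo {
    public static void main(String[] args) {
        BigInteger min = BigInteger.valueOf(Long.MIN_VALUE);
        BigInteger max = BigInteger.valueOf(Long.MAX_VALUE);
        Collection<SplitPartitions.Partition> partitions =
                SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(10000), min, max, 100);
        partitions.forEach(System.out::println); // shuffled, full-coverage splits
    }
}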

src/main/java/datastax/astra/migrate/SplitPartitions.java

Lines changed: 13 additions & 46 deletions
@@ -15,57 +15,28 @@ public class SplitPartitions {
     public final static Long MIN_PARTITION = Long.MIN_VALUE;
     public final static Long MAX_PARTITION = Long.MAX_VALUE;

-    public static final BigInteger MIN_RANDOM = new BigInteger("-1");
-    public static final BigInteger MAX_RANDOM = (new BigInteger("2")).pow(127);
-
-
     public static void main(String[] args){
-        Collection<Partition> partitions = getSubPartitions(new BigInteger("20"), MIN_RANDOM, MAX_RANDOM);
+        Collection<Partition> partitions = getSubPartitions(new BigInteger("20"), BigInteger.valueOf(MIN_PARTITION),
+                BigInteger.valueOf(MAX_PARTITION), 20);
         for(Partition partition: partitions){
             System.out.println(partition);
         }
     }
-    public static Collection<Partition> getRandomSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max){

+    public static Collection<Partition> getRandomSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max, int coveragePercent){
         logger.info("TreadID: " + Thread.currentThread().getId() + " Splitting min: " + min + " max:" + max);
-        List<Partition> partitions = getSubPartitions(splitSize,min,max);
+        List<Partition> partitions = getSubPartitions(splitSize,min,max, coveragePercent);
         Collections.shuffle(partitions);
         Collections.shuffle(partitions);
         Collections.shuffle(partitions);
         Collections.shuffle(partitions);
         return partitions;
     }
-//    private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max){
-//        long curMax = min.longValueExact();
-//        long partitionSize = max.subtract(min).divide(splitSize).longValueExact();
-//        List<Partition> partitions = new ArrayList<Partition>();
-//        if(partitionSize==0){
-//            partitionSize=100000;
-//        }
-//        boolean exausted = false;
-//        while(curMax<=max.longValueExact()){
-//            long curMin = curMax;
-//            long newCurMax = curMin + partitionSize;
-//            if (newCurMax < curMax) {
-//                newCurMax = max.longValueExact();
-//                exausted = true;
-//            }
-//            if(newCurMax > max.longValueExact()){
-//                newCurMax=max.longValueExact();
-//                exausted=true;
-//            }
-//            curMax = newCurMax;
-//            partitions.add(new Partition(curMin,curMax));
-//            if(exausted){
-//                break;
-//            }
-//        }
-//
-//        return partitions;
-//    }
-

-    private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max){
+    private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max, int coveragePercent){
+        if (coveragePercent < 1 || coveragePercent > 100) {
+            coveragePercent = 100;
+        }
         BigInteger curMax = new BigInteger(min.toString());
         BigInteger partitionSize = max.subtract(min).divide(splitSize);
         List<Partition> partitions = new ArrayList<Partition>();
@@ -85,7 +56,10 @@ private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger
             exausted=true;
         }
         curMax = newCurMax;
-        partitions.add(new Partition(curMin,curMax));
+
+        BigInteger range = curMax.subtract(curMin);
+        BigInteger curRange = range.multiply(BigInteger.valueOf(coveragePercent)).divide(BigInteger.valueOf(100));
+        partitions.add(new Partition(curMin,curMin.add(curRange)));
         if(exausted){
             break;
         }
@@ -94,22 +68,17 @@ private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger
         return partitions;
     }

-
-
     public static class Partition implements Serializable{
         private static final long serialVersionUID = 1L;

         private BigInteger min;
         private BigInteger max;

-
         public Partition(BigInteger min, BigInteger max){
             this.min = min;
             this.max = max;
         }

-
-
         public BigInteger getMin() {
             return min;
         }
@@ -119,9 +88,7 @@ public BigInteger getMax() {
         }

         public String toString(){
-            // return "--conf spark.migrate.source.minPartition="+ min + " --conf spark.migrate.source.maxPartition=" + max;
-
-            return "select * from field_api.field_users where token(account_id,field_id)>="+ min + " and token(account_id,field_id)<=" + max + " and account_id=ee8556f4-9a1a-4c89-ae05-e8105d42ed6f allow filtering; ";
+            return "Processing partition for token range "+ min + " to " + max;
         }
     }
 }
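
The heart of the patch is the trimming arithmetic above: each split now keeps only the first coveragePercent of its token range. A minimal standalone sketch of that math (a hypothetical demo class, not part of the patch):

import java.math.BigInteger;

// Hypothetical demo of the coveragePercent trimming added in getSubPartitions:
// a split keeps only the first coveragePercent% of its token range.
public class CoverageDemo {
    public static void main(String[] args) {
        BigInteger curMin = BigInteger.valueOf(0);
        BigInteger curMax = BigInteger.valueOf(1000);
        int coveragePercent = 20; // scan only the first 20% of each split

        // Same integer math as the patch: range * percent / 100.
        BigInteger range = curMax.subtract(curMin);
        BigInteger curRange = range.multiply(BigInteger.valueOf(coveragePercent))
                .divide(BigInteger.valueOf(100));

        // Prints "0 to 200": the split [0, 1000] is trimmed to its first 20%.
        System.out.println(curMin + " to " + curMin.add(curRange));
    }
}

One consequence of the integer division: for a split whose range is smaller than 100/coveragePercent tokens, range * percent / 100 rounds down to zero and the split collapses to a single token. That is harmless at the default splitSize but worth knowing for very fine splits.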

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 16 additions & 5 deletions
@@ -5,6 +5,7 @@ import org.apache.log4j.Logger
 import org.apache.spark.sql.SparkSession

 import java.math.BigInteger
+import java.lang.Long

 class AbstractJob extends App {

@@ -26,6 +27,7 @@
   val sourceTrustStoreType = sc.getConf.get("spark.migrate.source.trustStore.type", "JKS")
   val sourceKeyStorePath = sc.getConf.get("spark.migrate.source.keyStore.path", "")
   val sourceKeyStorePassword = sc.getConf.get("spark.migrate.source.keyStore.password", "")
+  val sourceEnabledAlgorithms = sc.getConf.get("spark.migrate.source.enabledAlgorithms", "")

   val destinationIsAstra = sc.getConf.get("spark.migrate.destination.isAstra", "true")
   val destinationScbPath = sc.getConf.get("spark.migrate.destination.scb", "")
@@ -38,17 +40,19 @@
   val destinationTrustStoreType = sc.getConf.get("spark.migrate.destination.trustStore.type", "JKS")
   val destinationKeyStorePath = sc.getConf.get("spark.migrate.destination.keyStore.path", "")
   val destinationKeyStorePassword = sc.getConf.get("spark.migrate.destination.keyStore.password", "")
+  val destinationEnabledAlgorithms = sc.getConf.get("spark.migrate.destination.enabledAlgorithms", "")

   val minPartition = new BigInteger(sc.getConf.get("spark.migrate.source.minPartition"))
   val maxPartition = new BigInteger(sc.getConf.get("spark.migrate.source.maxPartition"))
-
+  val coveragePercent = sc.getConf.get("spark.migrate.coveragePercent", "100")
   val splitSize = sc.getConf.get("spark.migrate.splitSize", "10000")
+  val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition,Integer.parseInt(coveragePercent))

   var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword, sourceReadConsistencyLevel,
-    sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword);
+    sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);

   var destinationConnection = getConnection(false, destinationIsAstra, destinationScbPath, destinationHost, destinationUsername, destinationPassword, destinationReadConsistencyLevel,
-    destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword);
+    destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword, destinationEnabledAlgorithms);

   protected def exitSpark() = {
     spark.stop()
@@ -57,7 +61,7 @@

   private def getConnection(isSource: Boolean, isAstra: String, scbPath: String, host: String, username: String, password: String, readConsistencyLevel: String,
     trustStorePath: String, trustStorePassword: String, trustStoreType: String,
-    keyStorePath: String, keyStorePassword: String): CassandraConnector = {
+    keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String): CassandraConnector = {
     var connType: String = "Source"
     if (!isSource) {
       connType = "Destination"
@@ -74,17 +78,24 @@
     } else if (null != trustStorePath && !trustStorePath.trim.isEmpty) {
       abstractLogger.info(connType + ": Connected to Cassandra (or DSE) with SSL!");

+      // Use defaults when not provided
+      var enabledAlgorithmsVar = enabledAlgorithms
+      if (enabledAlgorithms == null || enabledAlgorithms.trim.isEmpty) {
+        enabledAlgorithmsVar = "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA"
+      }
+
       return CassandraConnector(sc.getConf
         .set("spark.cassandra.auth.username", username)
         .set("spark.cassandra.auth.password", password)
         .set("spark.cassandra.input.consistency.level", readConsistencyLevel)
         .set("spark.cassandra.connection.host", host)
         .set("spark.cassandra.connection.ssl.enabled", "true")
+        .set("spark.cassandra.connection.ssl.enabledAlgorithms", enabledAlgorithmsVar)
        .set("spark.cassandra.connection.ssl.trustStore.password", trustStorePassword)
         .set("spark.cassandra.connection.ssl.trustStore.path", trustStorePath)
         .set("spark.cassandra.connection.ssl.keyStore.password", keyStorePassword)
         .set("spark.cassandra.connection.ssl.keyStore.path", keyStorePath)
-        .set("spark.cassandra.connection.ssl.trustStore.type", trustStoreType)
+        .set("spark.cassandra.connection.ssl.trustStore.type", trustStoreType)
         .set("spark.cassandra.connection.ssl.clientAuth.enabled", "true")
       )
     } else {
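
A subtlety worth calling out in the SSL change: enabledAlgorithms is optional, and when it is unset or blank the connection falls back to two TLS_RSA cipher suites. A hypothetical Java rendering of that fallback rule (the patch does the same inline in Scala):

// Hypothetical sketch of the defaulting rule from getConnection above.
public class SslAlgorithmDefault {
    static String resolve(String enabledAlgorithms) {
        // Use defaults when not provided, as in the Scala version.
        if (enabledAlgorithms == null || enabledAlgorithms.trim().isEmpty()) {
            return "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA";
        }
        return enabledAlgorithms;
    }

    public static void main(String[] args) {
        System.out.println(resolve(""));  // falls back to the two defaults
        System.out.println(resolve("TLS_RSA_WITH_AES_256_CBC_SHA"));  // passed through
    }
}

The resolved string is then handed to the connector via spark.cassandra.connection.ssl.enabledAlgorithms.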

src/main/scala/datastax/astra/migrate/DiffData.scala

Lines changed: 2 additions & 6 deletions
@@ -3,23 +3,19 @@ package datastax.astra.migrate
 import com.datastax.spark.connector.cql.CassandraConnector
 import org.apache.log4j.Logger

-import java.lang.Long
-import java.math.BigInteger
 import scala.collection.JavaConversions._

 object DiffData extends AbstractJob {

   val logger = Logger.getLogger(this.getClass.getName)
   logger.info("Started Data Validation App")

-  diffTable(sourceConnection, destinationConnection, minPartition, maxPartition)
+  diffTable(sourceConnection, destinationConnection)

   exitSpark

-  private def diffTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector, minPartition:BigInteger, maxPartition:BigInteger) = {
-    val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition)
+  private def diffTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector) = {
     val parts = sc.parallelize(partitions.toSeq,partitions.size);
-
     logger.info("Spark parallelize created : " + parts.count() + " parts!");

     parts.foreach(part => {

src/main/scala/datastax/astra/migrate/DiffMetaData.scala

Lines changed: 1 addition & 1 deletion
@@ -76,7 +76,7 @@ object DiffMetaData extends App {

   private def diffTable(sourceConnection: CassandraConnector, astraConnection: CassandraConnector, minPartition:BigInteger, maxPartition:BigInteger) = {

-    val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition)
+    val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition, 100)
     val parts = sc.parallelize(partitions.toSeq,partitions.size);
     parts.foreach(part => {
       sourceConnection.withSessionDo(sourceSession => astraConnection.withSessionDo(astraSession=>DiffMetaJobSession.getInstance(sourceSession,astraSession, sc.getConf).getDataDiffAndCorrect(part.getMin, part.getMax)))

src/main/scala/datastax/astra/migrate/Migrate.scala

Lines changed: 2 additions & 6 deletions
@@ -3,8 +3,6 @@ package datastax.astra.migrate
 import com.datastax.spark.connector.cql.CassandraConnector
 import org.apache.log4j.Logger

-import java.lang.Long
-import java.math.BigInteger
 import scala.collection.JavaConversions._

 // http://www.russellspitzer.com/2016/02/16/Multiple-Clusters-SparkSql-Cassandra/
@@ -14,14 +12,12 @@ object Migrate extends AbstractJob {
   val logger = Logger.getLogger(this.getClass.getName)
   logger.info("Started Migration App")

-  migrateTable(sourceConnection, destinationConnection, minPartition, maxPartition)
+  migrateTable(sourceConnection, destinationConnection)

   exitSpark

-  private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector, minPartition:BigInteger, maxPartition:BigInteger) = {
-    val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition)
+  private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector) = {
     val parts = sc.parallelize(partitions.toSeq,partitions.size);
-
     logger.info("Spark parallelize created : " + parts.count() + " parts!");

     parts.foreach(part => {

src/resources/diff_data.sh

Lines changed: 0 additions & 61 deletions
This file was deleted.

src/resources/sparkConf.properties

Lines changed: 6 additions & 3 deletions
@@ -15,10 +15,11 @@ spark.migrate.destination.autocorrect.missing false
 spark.migrate.destination.autocorrect.mismatch false

 spark.migrate.maxRetries 10
-spark.migrate.readRateLimit 40000
-spark.migrate.writeRateLimit 40000
-spark.migrate.splitSize 5
+spark.migrate.readRateLimit 20000
+spark.migrate.writeRateLimit 20000
+spark.migrate.splitSize 10000
 spark.migrate.batchSize 5
+spark.migrate.coveragePercent 100
 spark.migrate.printStatsAfter 100000

 spark.migrate.query.cols.select partition-key,clustering-key,order-date,amount,writetime(order-date),writetime(amount),ttl(order-date),ttl(amount)
@@ -49,13 +50,15 @@ spark.migrate.source.maxWriteTimeStampFilter 9223372036854775
 #spark.migrate.source.trustStore.type JKS
 #spark.migrate.source.keyStore.path
 #spark.migrate.source.keyStore.password
+#spark.migrate.source.enabledAlgorithms TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA

 ####################### ONLY USE if SSL clientAuth is enabled on destination Cassandra/DSE #############################
 #spark.migrate.destination.trustStore.path
 #spark.migrate.destination.trustStore.password
 #spark.migrate.destination.trustStore.type JKS
 #spark.migrate.destination.keyStore.path
 #spark.migrate.destination.keyStore.password
+#spark.migrate.destination.enabledAlgorithms TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA

 ########################################################################################################################
 # Following are the supported data types and their corresponding [Cassandra data-types]
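
To put the revised defaults in context, here is a rough, hypothetical back-of-the-envelope check (class name and framing are illustrative): splitSize 10000 divides the full signed 64-bit token range into about 10000 splits, and coveragePercent 100 scans each of them in full.

import java.math.BigInteger;

// Hypothetical sanity check of the shipped defaults.
public class SplitMath {
    public static void main(String[] args) {
        BigInteger min = BigInteger.valueOf(Long.MIN_VALUE);
        BigInteger max = BigInteger.valueOf(Long.MAX_VALUE);
        BigInteger splitSize = BigInteger.valueOf(10000);

        // Each of the ~10000 splits spans roughly 1.8e15 tokens.
        BigInteger perSplit = max.subtract(min).divide(splitSize);
        System.out.println("Tokens per split: " + perSplit);

        // coveragePercent 100 scans every split fully; lowering it (e.g. to 10)
        // trims each split to its first 10% for a quick sampling pass.
    }
}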
