Skip to content

Commit 4c6f976

Browse files
committed
Add support for RandomPartitioner
1 parent 6de84fb commit 4c6f976

File tree

6 files changed

+81
-27
lines changed

6 files changed

+81
-27
lines changed

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ public abstract class AbstractJobSession {
5252
protected String sourceKeyspaceTable;
5353
protected String astraKeyspaceTable;
5454

55+
56+
protected Boolean hasRandomPartitioner;
57+
5558
protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
5659

5760
this.sourceSession = sourceSession;
@@ -87,6 +90,8 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
8790
logger.info(" DEFAULT -- WriteRateLimit: " + writeLimiter.getRate());
8891
logger.info(" DEFAULT -- WriteTimestampFilter: " + writeTimeStampFilter);
8992

93+
hasRandomPartitioner = Boolean.parseBoolean(sparkConf.get("spark.migrate.source.hasRandomPartitioner", "false"));
94+
9095
isCounterTable = Boolean.parseBoolean(sparkConf.get("spark.migrate.source.counterTable", "false"));
9196

9297
counterDeltaMaxIndex = Integer

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.datastax.oss.driver.api.core.CqlSession;
44
import com.datastax.oss.driver.api.core.cql.*;
55
import com.datastax.oss.driver.internal.core.metadata.token.Murmur3Token;
6+
import com.datastax.oss.driver.internal.core.metadata.token.RandomToken;
67
import org.apache.log4j.Logger;
78
import org.apache.spark.SparkConf;
89

@@ -74,14 +75,15 @@ protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, Spar
7475

7576
}
7677

77-
public void getDataAndInsert(Long min, Long max) {
78+
public void getDataAndInsert(BigInteger min, BigInteger max) {
7879
logger.info("ThreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max);
7980
int maxAttempts = maxRetries;
8081
for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
8182

8283
try {
8384

84-
ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(new Murmur3Token(min), new Murmur3Token(max)));
85+
ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner? min : min.longValueExact(), hasRandomPartitioner? max : max.longValueExact()));
86+
8587
Collection<CompletionStage<AsyncResultSet>> writeResults = new ArrayList<CompletionStage<AsyncResultSet>>();
8688

8789
// cannot do batching if the writeFilter is greater than 0 or

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.apache.log4j.Logger;
88
import org.apache.spark.SparkConf;
99

10+
import java.math.BigInteger;
1011
import java.util.ArrayList;
1112
import java.util.List;
1213
import java.util.concurrent.ForkJoinPool;
@@ -45,6 +46,7 @@ public static DiffJobSession getInstance(CqlSession sourceSession, CqlSession as
4546
}
4647
}
4748
}
49+
4850
return diffJobSession;
4951
}
5052

@@ -54,7 +56,7 @@ private DiffJobSession(CqlSession sourceSession, CqlSession astraSession, SparkC
5456
selectColTypes = getTypes(sparkConf.get("spark.migrate.diff.select.types"));
5557
}
5658

57-
public void getDataAndDiff(Long min, Long max) {
59+
public void getDataAndDiff(BigInteger min, BigInteger max) {
5860
ForkJoinPool customThreadPool = new ForkJoinPool();
5961
logger.info("ThreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max);
6062
int maxAttempts = maxRetries;
@@ -63,7 +65,7 @@ public void getDataAndDiff(Long min, Long max) {
6365
try {
6466
// cannot do batching if the writeFilter is greater than 0
6567
ResultSet resultSet = sourceSession.execute(
66-
sourceSelectStatement.bind(min, max).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));
68+
sourceSelectStatement.bind(hasRandomPartitioner? min : min.longValueExact(), hasRandomPartitioner? max : max.longValueExact()).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));
6769

6870
customThreadPool.submit(() -> {
6971
StreamSupport.stream(resultSet.spliterator(), true).forEach(sRow -> {

src/main/java/datastax/astra/migrate/DiffMetaJobSession.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.apache.spark.SparkConf;
88
import org.spark_project.jetty.util.ConcurrentHashSet;
99

10+
import java.math.BigInteger;
1011
import java.util.*;
1112
import java.util.concurrent.CompletionStage;
1213
import java.util.concurrent.atomic.AtomicLong;
@@ -93,7 +94,7 @@ private DiffMetaJobSession(CqlSession sourceSession, CqlSession astraSession, Sp
9394
}
9495

9596

96-
public void getDataDiffAndCorrect(Long min, Long max) {
97+
public void getDataDiffAndCorrect(BigInteger min, BigInteger max) {
9798
try {
9899
correctData(getDataAndDiff(min, max));
99100
logger.info("ThreadID: " + Thread.currentThread().getId() + " CorrectFinal Read Record Count: " + readCounter.get());
@@ -108,14 +109,14 @@ public void getDataDiffAndCorrect(Long min, Long max) {
108109
}
109110
}
110111

111-
private Set<SrcDestKey> getDataAndDiff(Long min, Long max) {
112+
private Set<SrcDestKey> getDataAndDiff(BigInteger min, BigInteger max) {
112113
Set<SrcDestKey> srcDestKeys = new HashSet<SrcDestKey>();
113114
logger.info("ThreadID: " + Thread.currentThread().getId() + " Processing min: " + min + " max:" + max);
114115
int maxAttempts = maxRetries;
115116
for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
116117

117118
try {
118-
ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(min, max).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));
119+
ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner? min : min.longValueExact(), hasRandomPartitioner? max : max.longValueExact()).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));
119120
for (Row sourceRow : resultSet) {
120121
readLimiter.acquire(1);
121122
// do not process rows less than minWriteTimeStampFilter or more than

src/main/java/datastax/astra/migrate/SplitPartitions.java

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package datastax.astra.migrate;
22

3+
import com.datastax.oss.driver.internal.core.metadata.token.RandomToken;
4+
import com.datastax.spark.connector.rdd.partitioner.dht.TokenFactory;
35
import org.apache.log4j.Logger;
6+
import scala.math.BigInt;
47

58
import java.io.Serializable;
69
import java.math.BigInteger;
@@ -15,9 +18,12 @@ public class SplitPartitions {
1518
public final static Long MIN_PARTITION = Long.MIN_VALUE;
1619
public final static Long MAX_PARTITION = Long.MAX_VALUE;
1720

21+
public static final BigInteger MIN_RANDOM = new BigInteger("-1");
22+
public static final BigInteger MAX_RANDOM = (new BigInteger("2")).pow(127);
23+
1824

1925
public static void main(String[] args){
20-
Collection<Partition> partitions = getSubPartitions(new BigInteger("10"), BigInteger.valueOf(MIN_PARTITION), BigInteger.valueOf(MAX_PARTITION));
26+
Collection<Partition> partitions = getSubPartitions(new BigInteger("20"), MIN_RANDOM, MAX_RANDOM);
2127
for(Partition partition: partitions){
2228
System.out.println(partition);
2329
}
@@ -32,23 +38,53 @@ public static Collection<Partition> getRandomSubPartitions(BigInteger splitSize,
3238
Collections.shuffle(partitions);
3339
return partitions;
3440
}
41+
// private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max){
42+
// long curMax = min.longValueExact();
43+
// long partitionSize = max.subtract(min).divide(splitSize).longValueExact();
44+
// List<Partition> partitions = new ArrayList<Partition>();
45+
// if(partitionSize==0){
46+
// partitionSize=100000;
47+
// }
48+
// boolean exausted = false;
49+
// while(curMax<=max.longValueExact()){
50+
// long curMin = curMax;
51+
// long newCurMax = curMin + partitionSize;
52+
// if (newCurMax < curMax) {
53+
// newCurMax = max.longValueExact();
54+
// exausted = true;
55+
// }
56+
// if(newCurMax > max.longValueExact()){
57+
// newCurMax=max.longValueExact();
58+
// exausted=true;
59+
// }
60+
// curMax = newCurMax;
61+
// partitions.add(new Partition(curMin,curMax));
62+
// if(exausted){
63+
// break;
64+
// }
65+
// }
66+
//
67+
// return partitions;
68+
// }
69+
70+
3571
private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max){
36-
long curMax = min.longValueExact();
37-
long partitionSize = max.subtract(min).divide(splitSize).longValueExact();
72+
BigInteger curMax = new BigInteger(min.toString());
73+
BigInteger partitionSize = max.subtract(min).divide(splitSize);
3874
List<Partition> partitions = new ArrayList<Partition>();
39-
if(partitionSize==0){
40-
partitionSize=100000;
75+
if(partitionSize.compareTo(new BigInteger("0"))==0){
76+
partitionSize=new BigInteger("100000");
4177
}
4278
boolean exausted = false;
43-
while(curMax<=max.longValueExact()){
44-
long curMin = curMax;
45-
long newCurMax = curMin + partitionSize;
46-
if (newCurMax < curMax) {
47-
newCurMax = max.longValueExact();
79+
while(curMax.compareTo(max) <=0){
80+
BigInteger curMin = new BigInteger(curMax.toString());
81+
BigInteger newCurMax = curMin.add(partitionSize);
82+
if (newCurMax.compareTo(curMax) == -1) {
83+
newCurMax = new BigInteger(max.toString());
4884
exausted = true;
4985
}
50-
if(newCurMax > max.longValueExact()){
51-
newCurMax=max.longValueExact();
86+
if (newCurMax.compareTo(max)==1){
87+
newCurMax = new BigInteger(max.toString());
5288
exausted=true;
5389
}
5490
curMax = newCurMax;
@@ -63,27 +99,32 @@ private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger
6399

64100

65101

66-
67102
public static class Partition implements Serializable{
68103
private static final long serialVersionUID = 1L;
69104

70-
private Long min;
71-
private Long max;
72-
public Partition(Long min, Long max){
105+
private BigInteger min;
106+
private BigInteger max;
107+
108+
109+
public Partition(BigInteger min, BigInteger max){
73110
this.min = min;
74111
this.max = max;
75112
}
76113

77-
public Long getMin() {
114+
115+
116+
public BigInteger getMin() {
78117
return min;
79118
}
80119

81-
public Long getMax() {
120+
public BigInteger getMax() {
82121
return max;
83122
}
84123

85124
public String toString(){
86-
return "--conf spark.migrate.source.minPartition="+ min + " --conf spark.migrate.source.maxPartition=" + max;
125+
// return "--conf spark.migrate.source.minPartition="+ min + " --conf spark.migrate.source.maxPartition=" + max;
126+
127+
return "select * from field_api.field_users where token(account_id,field_id)>="+ min + " and token(account_id,field_id)<=" + max + " and account_id=ee8556f4-9a1a-4c89-ae05-e8105d42ed6f allow filtering; ";
87128
}
88129
}
89130
}

src/resources/runCommands.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
wget https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.6.tgz
22

3-
~/Documents/Astra/spark-2.4.8-bin-hadoop2.7/bin//spark-submit --properties-file /Users/ankitpatel/Documents/spark-astra-migrator-ranges/src/resources/sparkConf.properties --verbose --master "local[8]" --conf spark.migrate.source.minPartition=-9223372036854775808 --conf spark.migrate.source.maxPartition=9223372036854775807 --class datastax.astra.migrate.Migrate /Users/ankitpatel/Documents/spark-astra-migrator-ranges/target/migrate-0.1.jar
3+
--driver-memory 8G
44

5+
~/Documents/Astra/spark-2.4.8-bin-hadoop2.7/bin/spark-submit --properties-file /Users/ankitpatel/Documents/spark-astra-migrator-ranges/src/resources/sparkConf.properties --verbose --master "local[8]" --conf spark.migrate.source.minPartition=-9223372036854775808 --conf spark.migrate.source.maxPartition=9223372036854775807 --class datastax.astra.migrate.Migrate /Users/ankitpatel/Documents/spark-astra-migrator-ranges/target/migrate-0.1.jar
56

7+
Random Partitioner Run Command
8+
~/Documents/Astra/spark-2.4.8-bin-hadoop2.7/bin/spark-submit --properties-file /Users/ankitpatel/Documents/spark-astra-migrator-ranges/src/resources/sparkConf.properties --verbose --master "local[8]" --conf spark.migrate.source.minPartition=-1 --conf spark.migrate.source.maxPartition=170141183460469231731687303715884105728 --class datastax.astra.migrate.Migrate /Users/ankitpatel/Documents/spark-astra-migrator-ranges/target/migrate-0.9.jar
69

710

811
//Diff Data

0 commit comments

Comments
 (0)