Commit b10d1bb

Processes partitions from a partition file.
One main use of this feature would be to process partitions from a previous run that had errors.
1 parent 872403c commit b10d1bb
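The partition file is read from ./partitions.csv in the Spark driver's working directory (see SplitPartitions.getSubPartitionsFromFile below); each line holds one comma-separated min,max token-range pair, typically copied from the failure log of an earlier run. A hypothetical file (values made up for illustration):

    -507900353496146534,-107285462027022883
    2637884402540451982,4638499294009575633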

8 files changed, +82 -14 lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 
     <groupId>datastax.astra.migrate</groupId>
     <artifactId>cassandra-data-migrator</artifactId>
-    <version>1.7</version>
+    <version>1.8</version>
     <packaging>jar</packaging>
 
     <properties>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 2 additions & 2 deletions
@@ -35,14 +35,14 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         sourceKeyspaceTable = sparkConf.get("spark.source.keyspaceTable");
         astraKeyspaceTable = sparkConf.get("spark.destination.keyspaceTable");
 
-        String ttlColsStr = sparkConf.get("spark.query.ttl.cols");
+        String ttlColsStr = sparkConf.get("spark.query.ttl.cols", "");
         if (null != ttlColsStr && ttlColsStr.trim().length() > 0) {
             for (String ttlCol : ttlColsStr.split(",")) {
                 ttlCols.add(Integer.parseInt(ttlCol));
             }
         }
 
-        String writeTimestampColsStr = sparkConf.get("spark.query.writetime.cols");
+        String writeTimestampColsStr = sparkConf.get("spark.query.writetime.cols", "");
         if (null != writeTimestampColsStr && writeTimestampColsStr.trim().length() > 0) {
             for (String writeTimeStampCol : writeTimestampColsStr.split(",")) {
                 writeTimeStampCols.add(Integer.parseInt(writeTimeStampCol));
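The only functional change above is the empty-string default on both lookups, so jobs that preserve no TTL or writetime columns can omit the keys entirely instead of failing on a missing config entry. When set, each key takes a comma-separated list of integer column indexes, e.g. (hypothetical values in a sparkConf.properties):

    spark.query.ttl.cols        2,3
    spark.query.writetime.cols  2,3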

src/main/java/datastax/astra/migrate/SplitPartitions.java

Lines changed: 29 additions & 6 deletions
@@ -3,6 +3,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;

@@ -16,15 +19,16 @@ public class SplitPartitions {
     public final static Long MAX_PARTITION = Long.MAX_VALUE;
     public static Logger logger = LoggerFactory.getLogger(SplitPartitions.class.getName());
 
-    public static void main(String[] args) {
-        Collection<Partition> partitions = getSubPartitions(new BigInteger("20"), BigInteger.valueOf(MIN_PARTITION),
-                BigInteger.valueOf(MAX_PARTITION), 20);
+    public static void main(String[] args) throws IOException {
+        Collection<Partition> partitions = getSubPartitions(2, BigInteger.valueOf(1),
+                BigInteger.valueOf(1000), 100);
+        // Collection<Partition> partitions = getSubPartitionsFromFile(3);
         for (Partition partition : partitions) {
             System.out.println(partition);
         }
     }
 
-    public static Collection<Partition> getRandomSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max, int coveragePercent) {
+    public static Collection<Partition> getRandomSubPartitions(int splitSize, BigInteger min, BigInteger max, int coveragePercent) {
         logger.info("TreadID: " + Thread.currentThread().getId() + " Splitting min: " + min + " max:" + max);
         List<Partition> partitions = getSubPartitions(splitSize, min, max, coveragePercent);
         Collections.shuffle(partitions);

@@ -34,12 +38,30 @@ public static Collection<Partition> getRandomSubPartitions(BigInteger splitSize,
         return partitions;
     }
 
-    private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger min, BigInteger max, int coveragePercent) {
+    public static List<Partition> getSubPartitionsFromFile(int splitSize) throws IOException {
+        logger.info("TreadID: " + Thread.currentThread().getId() +
+                " Splitting partitions in file: ./partitions.csv using a split-size of " + splitSize);
+        List<Partition> partitions = new ArrayList<Partition>();
+        BufferedReader reader = new BufferedReader(new FileReader("./partitions.csv"));
+        String line = null;
+        while ((line = reader.readLine()) != null) {
+            String[] minMax = line.split(",");
+            try {
+                partitions.addAll(getSubPartitions(splitSize, new BigInteger(minMax[0]), new BigInteger(minMax[1]), 100));
+            } catch (Exception e) {
+                logger.error("Skipping partition: " + line, e);
+            }
+        }
+
+        return partitions;
+    }
+
+    private static List<Partition> getSubPartitions(int splitSize, BigInteger min, BigInteger max, int coveragePercent) {
         if (coveragePercent < 1 || coveragePercent > 100) {
             coveragePercent = 100;
         }
         BigInteger curMax = new BigInteger(min.toString());
-        BigInteger partitionSize = max.subtract(min).divide(splitSize);
+        BigInteger partitionSize = max.subtract(min).divide(BigInteger.valueOf(splitSize));
         List<Partition> partitions = new ArrayList<Partition>();
         if (partitionSize.compareTo(new BigInteger("0")) == 0) {
             partitionSize = new BigInteger("100000");

@@ -64,6 +86,7 @@ private static List<Partition> getSubPartitions(BigInteger splitSize, BigInteger
             if (exausted) {
                 break;
             }
+            curMax = curMax.add(BigInteger.ONE);
         }
 
         return partitions;
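To sanity-check a partition file before launching a Spark job, the class's main() can be switched to the commented-out getSubPartitionsFromFile call above. A minimal standalone sketch of the same idea, assuming it lives in the datastax.astra.migrate package and that Partition is the public nested range type (both assumptions, not shown in this diff):

    package datastax.astra.migrate;

    import java.io.IOException;
    import java.util.Collection;

    public class PartitionFileCheck {
        public static void main(String[] args) throws IOException {
            // Reads ./partitions.csv, splits each "min,max" line into 3
            // sub-ranges at 100% coverage; malformed lines are logged and skipped.
            Collection<SplitPartitions.Partition> parts =
                    SplitPartitions.getSubPartitionsFromFile(3);
            for (SplitPartitions.Partition p : parts) {
                System.out.println(p); // one line per generated sub-range
            }
        }
    }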

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 1 addition & 2 deletions
@@ -6,9 +6,8 @@ class AbstractJob extends BaseJob {
 
   abstractLogger.info("PARAM -- Min Partition: " + minPartition)
   abstractLogger.info("PARAM -- Max Partition: " + maxPartition)
-  abstractLogger.info("PARAM -- Split Size: " + coveragePercent)
+  abstractLogger.info("PARAM -- Split Size: " + splitSize)
   abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent)
-  abstractLogger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
 
   var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword, sourceReadConsistencyLevel,
     sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);

src/main/scala/datastax/astra/migrate/BaseJob.scala

Lines changed: 8 additions & 3 deletions
@@ -10,8 +10,11 @@ class BaseJob extends App {
 
   val abstractLogger = LoggerFactory.getLogger(this.getClass.getName)
   val spark = SparkSession.builder
-    .appName("Datastax Data Validation")
+    .appName("Cassandra Data Migrator")
     .getOrCreate()
+  abstractLogger.info("################################################################################################")
+  abstractLogger.info("############################## Cassandra Data Migrator - Starting ##############################")
+  abstractLogger.info("################################################################################################")
 
   val sc = spark.sparkContext
 

@@ -44,11 +47,13 @@ class BaseJob extends App {
   val minPartition = new BigInteger(sc.getConf.get("spark.source.minPartition", "-9223372036854775808"))
   val maxPartition = new BigInteger(sc.getConf.get("spark.source.maxPartition", "9223372036854775807"))
   val coveragePercent = sc.getConf.get("spark.coveragePercent", "100")
-  val splitSize = sc.getConf.get("spark.splitSize", "10000")
-  val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition, Integer.parseInt(coveragePercent))
+  val splitSize = Integer.parseInt(sc.getConf.get("spark.splitSize", "10000"))
 
   protected def exitSpark() = {
     spark.stop()
+    abstractLogger.info("################################################################################################")
+    abstractLogger.info("############################## Cassandra Data Migrator - Stopped ###############################")
+    abstractLogger.info("################################################################################################")
     sys.exit(0)
   }
 
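For reference, the partition-related settings read here map to sparkConf.properties entries like the following (the values shown are the code's own defaults):

    spark.source.minPartition   -9223372036854775808
    spark.source.maxPartition   9223372036854775807
    spark.coveragePercent       100
    spark.splitSize             10000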

src/main/scala/datastax/astra/migrate/DiffData.scala

Lines changed: 2 additions & 0 deletions
@@ -15,6 +15,8 @@ object DiffData extends AbstractJob {
   exitSpark
 
   private def diffTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector) = {
+    val partitions = SplitPartitions.getRandomSubPartitions(splitSize, minPartition, maxPartition, Integer.parseInt(coveragePercent))
+    logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
     val parts = sc.parallelize(partitions.toSeq, partitions.size);
     logger.info("Spark parallelize created : " + parts.count() + " parts!");
 

src/main/scala/datastax/astra/migrate/Migrate.scala

Lines changed: 2 additions & 0 deletions
@@ -17,6 +17,8 @@ object Migrate extends AbstractJob {
   exitSpark
 
   private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector) = {
+    val partitions = SplitPartitions.getRandomSubPartitions(splitSize, minPartition, maxPartition, Integer.parseInt(coveragePercent))
+    logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
     val parts = sc.parallelize(partitions.toSeq, partitions.size);
     logger.info("Spark parallelize created : " + parts.count() + " parts!");
 

src/main/scala/datastax/astra/migrate/MigratePartitionsFromFile.scala

Lines changed: 37 additions & 0 deletions

@@ -0,0 +1,37 @@
+package datastax.astra.migrate
+
+import com.datastax.spark.connector.cql.CassandraConnector
+import org.slf4j.LoggerFactory
+
+import java.math.BigInteger
+import scala.collection.JavaConversions._
+import java.lang.Long
+
+object MigratePartitionsFromFile extends AbstractJob {
+
+  val logger = LoggerFactory.getLogger(this.getClass.getName)
+  logger.info("Started MigratePartitionsFromFile App")
+
+  migrateTable(sourceConnection, destinationConnection)
+
+  exitSpark
+
+  private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector) = {
+    val partitions = SplitPartitions.getSubPartitionsFromFile(splitSize)
+    logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
+    val parts = sc.parallelize(partitions.toSeq, partitions.size);
+    logger.info("Spark parallelize created : " + parts.count() + " parts!");
+
+    parts.foreach(part => {
+      sourceConnection.withSessionDo(sourceSession =>
+        destinationConnection.withSessionDo(destinationSession =>
+          CopyJobSession.getInstance(sourceSession, destinationSession, sc.getConf)
+            .getDataAndInsert(part.getMin, part.getMax)))
+    })
+
+  }
+
+}
+
+
+
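The new job is selected like Migrate or DiffData, via spark-submit's --class flag, with partitions.csv placed in the driver's working directory. A sketch of an invocation; the exact flags and jar name follow the project's usual submit pattern and are assumptions, not part of this commit:

    ./spark-submit --properties-file sparkConf.properties \
      --master "local[*]" \
      --class datastax.astra.migrate.MigratePartitionsFromFile \
      cassandra-data-migrator-1.8.jar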
