
Commit 7d44b31

Merge pull request #92 from datastax/feature/rename-splitsize-to-numsplits
Renamed property splitSize to numSplits (support backward compatibility)
2 parents: 39ab797 + 473406b
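The renamed setting lives in the spark-submit properties file. A minimal, illustrative `sparkConf.properties` fragment (values are placeholders, not recommendations; only the two property names and the 10000 default come from BaseJob.scala):

```
# New property name introduced by this change; controls how many token sub-ranges the job works in
spark.numSplits    10000

# Legacy name kept for backward compatibility; only consulted when spark.numSplits is not set
# spark.splitSize  10000
```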

File tree: 11 files changed (+58 additions, −49 deletions)


README.md

Lines changed: 36 additions & 28 deletions
@@ -6,28 +6,21 @@ Migrate and Validate Tables between Origin and Target Cassandra Clusters.
 
 > :warning: Please note this job has been tested with spark version [3.3.1](https://archive.apache.org/dist/spark/spark-3.3.1/)
 
-## Container Image
+## Install as a Container
 - Get the latest image that includes all dependencies from [DockerHub](https://hub.docker.com/r/datastax/cassandra-data-migrator)
-- If you use this route, all migration tools (`cassandra-data-migrator` + `dsbulk` + `cqlsh`) would be available in the `/assets/` folder of the container
-- OR follow the below build steps (and Prerequisite) to build the jar locally
+- All migration tools (`cassandra-data-migrator` + `dsbulk` + `cqlsh`) would be available in the `/assets/` folder of the container
 
-### Prerequisite
+## Install as a JAR file
+- Download the latest jar file from the GitHub [packages area here](https://github.com/orgs/datastax/packages?repo_name=cassandra-data-migrator)
 
+### Prerequisite
 - Install Java8 as spark binaries are compiled with it.
-- Install Maven 3.8.x
-- Install single instance of spark on a node where you want to run this job. Spark can be installed by running the following: -
-
+- Install Spark version [3.3.1](https://archive.apache.org/dist/spark/spark-3.3.1/) on a single VM (no cluster necessary) where you want to run this job. Spark can be installed by running the following: -
 ```
 wget https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
 tar -xvzf spark-3.3.1-bin-hadoop3.tgz
 ```
 
-### Build
-1. Clone this repo
-2. Move to the repo folder `cd cassandra-data-migrator`
-3. Run the build `mvn clean package`
-4. The fat jar (`cassandra-data-migrator-3.x.x.jar`) file should now be present in the `target` folder
-
 # Steps for Data-Migration:
 
 1. `sparkConf.properties` file needs to be configured as applicable for the environment

@@ -41,8 +34,14 @@ tar -xvzf spark-3.3.1-bin-hadoop3.tgz
 --class datastax.astra.migrate.Migrate cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
 ```
 
-Note: Above command also generates a log file `logfile_name.txt` to avoid log output on the console.
-
+Note:
+- Above command generates a log file `logfile_name.txt` to avoid log output on the console.
+- Add option `--driver-memory 25G --executor-memory 25G` as shown below if the table migrated is large (over 100GB)
+```
+./spark-submit --properties-file sparkConf.properties /
+--master "local[*]" --driver-memory 25G --executor-memory 25G /
+--class datastax.astra.migrate.Migrate cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
+```
 
 # Steps for Data-Validation:
 

@@ -64,19 +63,20 @@ Note: Above command also generates a log file `logfile_name.txt` to avoid log ou
 ```
 
 - Please grep for all `ERROR` from the output log files to get the list of missing and mismatched records.
-- Note that it lists differences by partition key values.
+- Note that it lists differences by primary-key values.
 - The Validation job can also be run in an AutoCorrect mode. This mode can
 - Add any missing records from origin to target
-- Fix any inconsistencies between origin and target (makes target same as origin).
+- Update any mismatched records between origin and target (makes target same as origin).
 - Enable/disable this feature using one or both of the below setting in the config file
-
 ```
 spark.target.autocorrect.missing true|false
 spark.target.autocorrect.mismatch true|false
 ```
+Note:
+- The validation job will never delete records from target i.e. it only adds or updates data on target
 
 # Migrating specific partition ranges
-- You can also use the tool to migrate specific partition ranges, use class option `--class datastax.astra.migrate.MigratePartitionsFromFile` as shown below
+- You can also use the tool to migrate specific partition ranges using class option `--class datastax.astra.migrate.MigratePartitionsFromFile` as shown below
 ```
 ./spark-submit --properties-file sparkConf.properties /
 --master "local[*]" /

@@ -90,18 +90,26 @@ When running in above mode the tool assumes a `partitions.csv` file to be presen
 2637884402540451982,4638499294009575633
 798869613692279889,8699484505161403540
 ```
-This mode is specifically useful to processes a subset of partition-ranges that may have generated errors as a result of a previous long-running job to migrate a large table.
-
-# Additional features
-- [Counter tables](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_using/useCountersConcept.html)
-- Preserve [writetimes](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__retrieving-the-datetime-a-write-occurred-p) and [TTL](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__ref-select-ttl-p)
-- Advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt))
-- Filter records from origin using writetimes, CQL conditions, token-ranges
+This mode is specifically useful to processes a subset of partition-ranges that may have failed during a previous run.
+
+# Features
+- Supports migration/validation of [Counter tables](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_using/useCountersConcept.html)
+- Preserve [writetimes](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__retrieving-the-datetime-a-write-occurred-p) and [TTLs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__ref-select-ttl-p)
+- Supports migration/validation of advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt))
+- Filter records from `Origin` using `writetimes` and/or CQL conditions and/or min/max token-range
+- Supports adding `constants` as new columns on `Target`
 - Fully containerized (Docker and K8s friendly)
 - SSL Support (including custom cipher algorithms)
-- Migrate from any Cassandra origin ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise™](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB™](https://www.datastax.com/products/datastax-astra)) to any Cassandra target ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise™](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB™](https://www.datastax.com/products/datastax-astra))
+- Migrate from any Cassandra `Origin` ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise™](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB™](https://www.datastax.com/products/datastax-astra)) to any Cassandra `Target` ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise™](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB™](https://www.datastax.com/products/datastax-astra))
+- Supports migration/validation from and to [Azure Cosmos Cassandra](https://learn.microsoft.com/en-us/azure/cosmos-db/cassandra)
 - Validate migration accuracy and performance using a smaller randomized data-set
-- Custom writetime
+- Supports adding custom fixed `writetime`
+
+# Building Jar for local development
+1. Clone this repo
+2. Move to the repo folder `cd cassandra-data-migrator`
+3. Run the build `mvn clean package` (Needs Maven 3.8.x)
+4. The fat jar (`cassandra-data-migrator-3.x.x.jar`) file should now be present in the `target` folder
 
 # Contributors
 Checkout all our wonderful contributors [here](./CONTRIBUTING.md#contributors).

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <revision>3.2.1</revision>
+        <revision>3.2.2</revision>
         <scala.version>2.12.17</scala.version>
         <scala.main.version>2.12</scala.main.version>
         <spark.version>3.3.1</spark.version>

src/main/java/datastax/astra/migrate/SplitPartitions.java

Lines changed: 10 additions & 10 deletions
@@ -30,19 +30,19 @@ public static void main(String[] args) throws IOException {
         }
     }
 
-    public static Collection<Partition> getRandomSubPartitions(int splitSize, BigInteger min, BigInteger max, int coveragePercent) {
+    public static Collection<Partition> getRandomSubPartitions(int numSplits, BigInteger min, BigInteger max, int coveragePercent) {
         logger.info("ThreadID: {} Splitting min: {} max: {}", Thread.currentThread().getId(), min, max);
-        List<Partition> partitions = getSubPartitions(splitSize, min, max, coveragePercent);
+        List<Partition> partitions = getSubPartitions(numSplits, min, max, coveragePercent);
         Collections.shuffle(partitions);
         Collections.shuffle(partitions);
         Collections.shuffle(partitions);
         Collections.shuffle(partitions);
         return partitions;
     }
 
-    public static List<Partition> getSubPartitionsFromFile(int splitSize) throws IOException {
+    public static List<Partition> getSubPartitionsFromFile(int numSplits) throws IOException {
         logger.info("ThreadID: {} Splitting partitions in file: ./partitions.csv using a split-size of {}"
-                , Thread.currentThread().getId(), splitSize);
+                , Thread.currentThread().getId(), numSplits);
         List<Partition> partitions = new ArrayList<Partition>();
         BufferedReader reader = Util.getfileReader("./partitions.csv");
         String line = null;

@@ -52,7 +52,7 @@ public static List<Partition> getSubPartitionsFromFile(int splitSize) throws IOE
             }
             String[] minMax = line.split(",");
             try {
-                partitions.addAll(getSubPartitions(splitSize, new BigInteger(minMax[0]), new BigInteger(minMax[1]), 100));
+                partitions.addAll(getSubPartitions(numSplits, new BigInteger(minMax[0]), new BigInteger(minMax[1]), 100));
             } catch (Exception e) {
                 logger.error("Skipping partition: {}", line, e);
             }

@@ -61,9 +61,9 @@ public static List<Partition> getSubPartitionsFromFile(int splitSize) throws IOE
         return partitions;
     }
 
-    public static List<PKRows> getRowPartsFromFile(int splitSize) throws IOException {
+    public static List<PKRows> getRowPartsFromFile(int numSplits) throws IOException {
         logger.info("ThreadID: {} Splitting rows in file: ./primary_key_rows.csv using a split-size of {}"
-                , Thread.currentThread().getId(), splitSize);
+                , Thread.currentThread().getId(), numSplits);
         List<String> pkRows = new ArrayList<String>();
         BufferedReader reader = Util.getfileReader("./primary_key_rows.csv");
         String pkRow = null;

@@ -73,7 +73,7 @@ public static List<PKRows> getRowPartsFromFile(int splitSize) throws IOException
             }
             pkRows.add(pkRow);
         }
-        int partSize = pkRows.size() / splitSize;
+        int partSize = pkRows.size() / numSplits;
         if (partSize == 0) {
             partSize = pkRows.size();
         }

@@ -91,12 +91,12 @@ public static <T> Stream<List<T>> batches(List<T> source, int length) {
                 n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
     }
 
-    private static List<Partition> getSubPartitions(int splitSize, BigInteger min, BigInteger max, int coveragePercent) {
+    private static List<Partition> getSubPartitions(int numSplits, BigInteger min, BigInteger max, int coveragePercent) {
         if (coveragePercent < 1 || coveragePercent > 100) {
             coveragePercent = 100;
         }
         BigInteger curMax = new BigInteger(min.toString());
-        BigInteger partitionSize = max.subtract(min).divide(BigInteger.valueOf(splitSize));
+        BigInteger partitionSize = max.subtract(min).divide(BigInteger.valueOf(numSplits));
         List<Partition> partitions = new ArrayList<Partition>();
         if (partitionSize.compareTo(new BigInteger("0")) == 0) {
             partitionSize = new BigInteger("100000");
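For context on what the renamed parameter controls: `getSubPartitions` slices the token range into `numSplits` sub-ranges of width `(max - min) / numSplits`. A standalone sketch of just that sizing step, using the full Murmur3 token range and the 10000 default (hypothetical driver code, not part of this repo):

```
import java.math.BigInteger

object SplitSizingSketch extends App {
  // Full Murmur3 token range, matching the spark.origin.minPartition/maxPartition defaults in BaseJob.scala
  val min = new BigInteger("-9223372036854775808")
  val max = new BigInteger("9223372036854775807")
  val numSplits = 10000 // default value of spark.numSplits

  // Same division performed inside SplitPartitions.getSubPartitions
  val partitionSize = max.subtract(min).divide(BigInteger.valueOf(numSplits))
  println(s"Each of the $numSplits splits covers roughly $partitionSize tokens")
}
```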

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@ class AbstractJob extends BaseJob {
 
   abstractLogger.info("PARAM -- Min Partition: " + minPartition)
   abstractLogger.info("PARAM -- Max Partition: " + maxPartition)
-  abstractLogger.info("PARAM -- Split Size: " + splitSize)
+  abstractLogger.info("PARAM -- Number of Splits : " + numSplits)
   abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent)
   abstractLogger.info("PARAM -- Origin SSL Enabled: {}", sourceSSLEnabled);
   abstractLogger.info("PARAM -- Target SSL Enabled: {}", destinationSSLEnabled);

src/main/scala/datastax/astra/migrate/BaseJob.scala

Lines changed: 2 additions & 1 deletion
@@ -49,7 +49,8 @@ class BaseJob extends App {
   val minPartition = new BigInteger(Util.getSparkPropOr(sc, "spark.origin.minPartition", "-9223372036854775808"))
   val maxPartition = new BigInteger(Util.getSparkPropOr(sc, "spark.origin.maxPartition", "9223372036854775807"))
   val coveragePercent = Util.getSparkPropOr(sc, "spark.coveragePercent", "100")
-  val splitSize = Integer.parseInt(Util.getSparkPropOr(sc, "spark.splitSize", "10000"))
+  val splitSizeBackwardCompatibility = Util.getSparkPropOr(sc, "spark.splitSize", "10000")
+  val numSplits = Integer.parseInt(Util.getSparkPropOr(sc, "spark.numSplits", splitSizeBackwardCompatibility))
 
   protected def exitSpark() = {
     spark.stop()
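A minimal sketch of the fallback this change adds, with a stubbed property lookup standing in for `Util.getSparkPropOr` (the stub and the 20000 value are hypothetical; only the two property names and the 10000 default come from BaseJob.scala):

```
object NumSplitsFallbackSketch extends App {
  // Stand-in for sparkConf.properties; an older config may still set only the legacy key
  val props = Map("spark.splitSize" -> "20000")

  def getPropOr(key: String, default: String): String = props.getOrElse(key, default)

  // Same two-step lookup as BaseJob: spark.numSplits wins, else spark.splitSize, else 10000
  val splitSizeBackwardCompatibility = getPropOr("spark.splitSize", "10000")
  val numSplits = Integer.parseInt(getPropOr("spark.numSplits", splitSizeBackwardCompatibility))

  println(s"numSplits resolved to $numSplits") // 20000 for this legacy-style config
}
```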

src/main/scala/datastax/astra/migrate/DiffData.scala

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@ object DiffData extends AbstractJob {
   exitSpark
 
   private def diffTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector, config: SparkConf) = {
-    val partitions = SplitPartitions.getRandomSubPartitions(splitSize, minPartition, maxPartition, Integer.parseInt(coveragePercent))
+    val partitions = SplitPartitions.getRandomSubPartitions(numSplits, minPartition, maxPartition, Integer.parseInt(coveragePercent))
     logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
     val parts = sContext.parallelize(partitions.toSeq, partitions.size);
     logger.info("Spark parallelize created : " + parts.count() + " parts!");

src/main/scala/datastax/astra/migrate/Migrate.scala

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@ object Migrate extends AbstractJob {
   exitSpark
 
   private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector, config: SparkConf) = {
-    val partitions = SplitPartitions.getRandomSubPartitions(splitSize, minPartition, maxPartition, Integer.parseInt(coveragePercent))
+    val partitions = SplitPartitions.getRandomSubPartitions(numSplits, minPartition, maxPartition, Integer.parseInt(coveragePercent))
     logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
     val parts = sContext.parallelize(partitions.toSeq, partitions.size);
     logger.info("Spark parallelize created : " + parts.count() + " parts!");

src/main/scala/datastax/astra/migrate/MigratePartitionsFromFile.scala

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@ object MigratePartitionsFromFile extends AbstractJob {
   exitSpark
 
   private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector) = {
-    val partitions = SplitPartitions.getSubPartitionsFromFile(splitSize)
+    val partitions = SplitPartitions.getSubPartitionsFromFile(numSplits)
     logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
     val parts = sContext.parallelize(partitions.toSeq, partitions.size);
     logger.info("Spark parallelize created : " + parts.count() + " parts!");

src/main/scala/datastax/astra/migrate/MigrateRowsFromFile.scala

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@ object MigrateRowsFromFile extends AbstractJob {
   exitSpark
 
   private def migrateTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector) = {
-    val listOfPKRows = SplitPartitions.getRowPartsFromFile(splitSize)
+    val listOfPKRows = SplitPartitions.getRowPartsFromFile(numSplits)
     logger.info("PARAM Calculated -- Number of PKRows: " + listOfPKRows.size())
 
     sourceConnection.withSessionDo(sourceSession =>

src/main/scala/datastax/astra/migrate/OriginData.scala

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ object OriginData extends BaseJob {
   }
 
   private def analyzeSourceTable(sourceConnection: CassandraConnector) = {
-    val partitions = SplitPartitions.getRandomSubPartitions(splitSize, minPartition, maxPartition, Integer.parseInt(coveragePercent))
+    val partitions = SplitPartitions.getRandomSubPartitions(numSplits, minPartition, maxPartition, Integer.parseInt(coveragePercent))
     logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
     val parts = sContext.parallelize(partitions.toSeq, partitions.size);
     logger.info("Spark parallelize created : " + parts.count() + " parts!");
