
Commit a6f83cf

Merge branch 'main' into feature/logging-fix
* main:
  Update README removing spark properties not in use
  initial commit
2 parents d069490 + 1c9ed15 commit a6f83cf

File tree

- README.md
- src/main/java/datastax/astra/migrate/AbstractJobSession.java
- src/main/java/datastax/astra/migrate/BaseJobSession.java
- src/main/scala/datastax/astra/migrate/AbstractJob.scala
- src/main/scala/datastax/astra/migrate/BaseJob.scala

5 files changed: +117 -91 lines changed

README.md

Lines changed: 6 additions & 6 deletions
@@ -2,7 +2,7 @@
 
 Spark jobs in this repo can be used for data migration and data validation.
 
-> Please note: This job has been tested with spark version [2.4.8](https://downloads.apache.org/spark/spark-2.4.8/)
+> :warning: Please note this job has been tested with spark version [2.4.8](https://downloads.apache.org/spark/spark-2.4.8/)
 
 ## Prerequisite
 
@@ -16,7 +16,7 @@ tar -xvzf <spark downloaded file name>
 
 # Steps:
 
-1. sparkConf.properties file needs to be configured as applicable for the environment
+1. `sparkConf.properties` file needs to be configured as applicable for the environment
 > A sample Spark conf file configuration can be [found here](./src/resources/sparkConf.properties)
 2. Place the conf file where it can be accessed while running the job via spark-submit.
 3. Generate a fat jar (`cassandra-data-migrator-1.x.jar`) using command `mvn clean package`
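Step 1 above refers to the sparkConf.properties file. As a rough sketch only (all values are placeholders; the keys are taken from the properties read in BaseJob.scala and AbstractJobSession.java below, and the linked sample file remains the authoritative reference), a minimal configuration might look like:

```properties
# Minimal sketch -- every value below is a placeholder for your environment.
spark.source.isAstra                    false
spark.source.host                       10.0.0.1
spark.source.username                   source_user
spark.source.password                   source_pass
spark.source.read.consistency.level    LOCAL_QUORUM

spark.destination.isAstra               true
spark.destination.scb                   /path/to/secure-connect-target.zip
spark.destination.username              astra_client_id
spark.destination.password              astra_client_secret

spark.splitSize                         10000
spark.coveragePercent                   100
```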
@@ -63,10 +63,10 @@ spark.destination.autocorrect.mismatch true|false
 ```
 
 # Additional features
-- Counter tables
-- Preserve writetimes and TTL
-- Advanced DataTypes (Sets, Lists, Maps, UDTs)
+- [Counter tables](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_using/useCountersConcept.html)
+- Preserve [writetimes](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__retrieving-the-datetime-a-write-occurred-p) and [TTL](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__ref-select-ttl-p)
+- Advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt))
 - Filter records from source using writetime
 - SSL Support (including custom cipher algorithms)
-- Migrate from any Cassandra source (Cassandra/DSE/Astra) to any Cassandra target (Cassandra/DSE/Astra)
+- Migrate from any Cassandra source ([Apache Cassandra](https://cassandra.apache.org)/[DataStax Enterprise (DSE)](https://www.datastax.com/products/datastax-enterprise)/[DataStax Astra DB](https://www.datastax.com/products/datastax-astra)) to any Cassandra target ([Apache Cassandra](https://cassandra.apache.org)/[DataStax Enterprise (DSE)](https://www.datastax.com/products/datastax-enterprise)/[DataStax Astra DB](https://www.datastax.com/products/datastax-astra))
 - Validate migration accuracy and performance using a smaller randomized data-set
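The "Preserve writetimes and TTL" feature listed above comes down to standard CQL: read writetime()/ttl() for a non-key column on the source and replay the row on the target with USING TIMESTAMP ... AND TTL .... A minimal Java-driver sketch of that idea (the demo_ks.demo_tbl schema and its columns are made-up names, not taken from this repo):

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;

public class PreserveWritetimeTtlSketch {

    // Copy one row from source to target, keeping its original write timestamp and TTL.
    public static void copyRow(CqlSession source, CqlSession target, String id) {
        Row row = source.execute(SimpleStatement.newInstance(
                "SELECT id, val, writetime(val) AS wt, ttl(val) AS t FROM demo_ks.demo_tbl WHERE id = ?",
                id)).one();
        if (row == null) {
            return;
        }

        PreparedStatement insert = target.prepare(
                "INSERT INTO demo_ks.demo_tbl (id, val) VALUES (?, ?) USING TIMESTAMP ? AND TTL ?");

        long writetimeMicros = row.getLong("wt");               // microseconds since epoch
        int ttlSeconds = row.isNull("t") ? 0 : row.getInt("t"); // TTL 0 means "never expire"
        target.execute(insert.bind(id, row.getString("val"), writetimeMicros, ttlSeconds));
    }
}
```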

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 4 additions & 40 deletions
@@ -14,46 +14,10 @@
 import java.util.Map;
 import java.util.Set;
 
-public abstract class AbstractJobSession {
-
-    // Read/Write Rate limiter
-    // Determine the total throughput for the entire cluster in terms of wries/sec,
-    // reads/sec
-    // then do the following to set the values as they are only applicable per JVM
-    // (hence spark Executor)...
-    // Rate = Total Throughput (write/read per sec) / Total Executors
-    protected final RateLimiter readLimiter;
-    protected final RateLimiter writeLimiter;
-    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
-    protected PreparedStatement sourceSelectStatement;
-    protected String sourceSelectCondition;
-    protected PreparedStatement astraSelectStatement;
-    protected PreparedStatement astraInsertStatement;
-    protected Integer maxRetries = 10;
-
-    protected CqlSession sourceSession;
-    protected CqlSession astraSession;
-    protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
-    protected List<MigrateDataType> idColTypes = new ArrayList<MigrateDataType>();
-    protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
-
-    protected Integer batchSize = 1;
-    protected Integer printStatsAfter = 100000;
-
-    protected Boolean isPreserveTTLWritetime = Boolean.FALSE;
-    protected Boolean writeTimeStampFilter = Boolean.FALSE;
-    protected Long minWriteTimeStampFilter = 0l;
-    protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
-
-    protected List<Integer> writeTimeStampCols = new ArrayList<Integer>();
-    protected List<Integer> ttlCols = new ArrayList<Integer>();
-    protected Boolean isCounterTable;
-
-    protected String sourceKeyspaceTable;
-    protected String astraKeyspaceTable;
-
-    protected Boolean hasRandomPartitioner;
+public class AbstractJobSession extends BaseJobSession {
 
+    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
+
     protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
         this.sourceSession = sourceSession;
         this.astraSession = astraSession;
@@ -121,7 +85,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         selectColTypes = getTypes(sparkConf.get("spark.query.types"));
         String idCols = sparkConf.get("spark.query.destination.id", "");
         idColTypes = selectColTypes.subList(0, idCols.split(",").length);
-        sourceSelectCondition = sparkConf.get("spark.query.condition", "");
+        String sourceSelectCondition = sparkConf.get("spark.query.condition", "");
         sourceSelectStatement = sourceSession.prepare(
                 "select " + selectCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
                         + ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING");

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
+package datastax.astra.migrate;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class BaseJobSession {
+
+    protected PreparedStatement sourceSelectStatement;
+    protected PreparedStatement astraSelectStatement;
+    protected PreparedStatement astraInsertStatement;
+
+    // Read/Write Rate limiter
+    // Determine the total throughput for the entire cluster in terms of wries/sec,
+    // reads/sec
+    // then do the following to set the values as they are only applicable per JVM
+    // (hence spark Executor)...
+    // Rate = Total Throughput (write/read per sec) / Total Executors
+    protected RateLimiter readLimiter;
+    protected RateLimiter writeLimiter;
+    protected Integer maxRetries = 10;
+
+    protected CqlSession sourceSession;
+    protected CqlSession astraSession;
+    protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
+    protected List<MigrateDataType> idColTypes = new ArrayList<MigrateDataType>();
+    protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
+
+    protected Integer batchSize = 1;
+    protected Integer printStatsAfter = 100000;
+
+    protected Boolean isPreserveTTLWritetime = Boolean.FALSE;
+    protected Boolean writeTimeStampFilter = Boolean.FALSE;
+    protected Long minWriteTimeStampFilter = 0l;
+    protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
+
+    protected List<Integer> writeTimeStampCols = new ArrayList<Integer>();
+    protected List<Integer> ttlCols = new ArrayList<Integer>();
+    protected Boolean isCounterTable;
+
+    protected String sourceKeyspaceTable;
+    protected String astraKeyspaceTable;
+
+    protected Boolean hasRandomPartitioner;
+
+}
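The comment block in BaseJobSession spells out the sizing rule: Rate = total cluster throughput / total executors, because each RateLimiter only throttles the JVM it lives in (one per Spark executor). A small sketch of that arithmetic with made-up numbers (the real limits come from the job's Spark properties, which are not part of this diff):

```java
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;

public class RateLimiterSketch {

    public static void main(String[] args) {
        // Hypothetical numbers: the cluster can absorb 40,000 writes/sec and the
        // job runs on 8 Spark executors (one JVM each).
        double clusterWritesPerSec = 40_000;
        int numExecutors = 8;

        // Rate = Total Throughput (write/read per sec) / Total Executors,
        // since each RateLimiter only limits its own JVM.
        double perExecutorRate = clusterWritesPerSec / numExecutors; // 5,000 permits/sec

        RateLimiter writeLimiter = RateLimiter.create(perExecutorRate);
        writeLimiter.acquire(); // blocks as needed to stay under ~5,000 writes/sec in this JVM
        System.out.println("Per-executor write rate: " + writeLimiter.getRate());
    }
}
```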

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 1 addition & 45 deletions
@@ -7,46 +7,7 @@ import org.slf4j.LoggerFactory
 import java.math.BigInteger
 import java.lang.Long
 
-class AbstractJob extends App {
-
-  val abstractLogger = LoggerFactory.getLogger(this.getClass.getName)
-  val spark = SparkSession.builder
-    .appName("Datastax Data Validation")
-    .getOrCreate()
-
-  val sc = spark.sparkContext
-
-  val sourceIsAstra = sc.getConf.get("spark.source.isAstra", "false")
-  val sourceScbPath = sc.getConf.get("spark.source.scb", "")
-  val sourceHost = sc.getConf.get("spark.source.host", "")
-  val sourceUsername = sc.getConf.get("spark.source.username", "")
-  val sourcePassword = sc.getConf.get("spark.source.password", "")
-  val sourceReadConsistencyLevel = sc.getConf.get("spark.source.read.consistency.level", "LOCAL_QUORUM")
-  val sourceTrustStorePath = sc.getConf.get("spark.source.trustStore.path", "")
-  val sourceTrustStorePassword = sc.getConf.get("spark.source.trustStore.password", "")
-  val sourceTrustStoreType = sc.getConf.get("spark.source.trustStore.type", "JKS")
-  val sourceKeyStorePath = sc.getConf.get("spark.source.keyStore.path", "")
-  val sourceKeyStorePassword = sc.getConf.get("spark.source.keyStore.password", "")
-  val sourceEnabledAlgorithms = sc.getConf.get("spark.source.enabledAlgorithms", "")
-
-  val destinationIsAstra = sc.getConf.get("spark.destination.isAstra", "true")
-  val destinationScbPath = sc.getConf.get("spark.destination.scb", "")
-  val destinationHost = sc.getConf.get("spark.destination.host", "")
-  val destinationUsername = sc.getConf.get("spark.destination.username")
-  val destinationPassword = sc.getConf.get("spark.destination.password")
-  val destinationReadConsistencyLevel = sc.getConf.get("spark.destination.read.consistency.level", "LOCAL_QUORUM")
-  val destinationTrustStorePath = sc.getConf.get("spark.destination.trustStore.path", "")
-  val destinationTrustStorePassword = sc.getConf.get("spark.destination.trustStore.password", "")
-  val destinationTrustStoreType = sc.getConf.get("spark.destination.trustStore.type", "JKS")
-  val destinationKeyStorePath = sc.getConf.get("spark.destination.keyStore.path", "")
-  val destinationKeyStorePassword = sc.getConf.get("spark.destination.keyStore.password", "")
-  val destinationEnabledAlgorithms = sc.getConf.get("spark.destination.enabledAlgorithms", "")
-
-  val minPartition = new BigInteger(sc.getConf.get("spark.source.minPartition","-9223372036854775808"))
-  val maxPartition = new BigInteger(sc.getConf.get("spark.source.maxPartition","9223372036854775807"))
-  val coveragePercent = sc.getConf.get("spark.coveragePercent", "100")
-  val splitSize = sc.getConf.get("spark.splitSize", "10000")
-  val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition,Integer.parseInt(coveragePercent))
+class AbstractJob extends BaseJob {
 
   abstractLogger.info("PARAM -- Min Partition: " + minPartition)
   abstractLogger.info("PARAM -- Max Partition: " + maxPartition)
@@ -60,11 +21,6 @@ class AbstractJob extends App {
   var destinationConnection = getConnection(false, destinationIsAstra, destinationScbPath, destinationHost, destinationUsername, destinationPassword, destinationReadConsistencyLevel,
     destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword, destinationEnabledAlgorithms);
 
-  protected def exitSpark() = {
-    spark.stop()
-    sys.exit(0)
-  }
-
   private def getConnection(isSource: Boolean, isAstra: String, scbPath: String, host: String, username: String, password: String, readConsistencyLevel: String,
                             trustStorePath: String, trustStorePassword: String, trustStoreType: String,
                             keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String): CassandraConnector = {

src/main/scala/datastax/astra/migrate/BaseJob.scala

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+package datastax.astra.migrate
+
+import org.apache.spark.sql.SparkSession
+import org.slf4j.LoggerFactory
+
+import java.math.BigInteger
+import java.lang.Long
+
+class BaseJob extends App {
+
+  val abstractLogger = LoggerFactory.getLogger(this.getClass.getName)
+  val spark = SparkSession.builder
+    .appName("Datastax Data Validation")
+    .getOrCreate()
+
+  val sc = spark.sparkContext
+
+  val sourceIsAstra = sc.getConf.get("spark.source.isAstra", "false")
+  val sourceScbPath = sc.getConf.get("spark.source.scb", "")
+  val sourceHost = sc.getConf.get("spark.source.host", "")
+  val sourceUsername = sc.getConf.get("spark.source.username", "")
+  val sourcePassword = sc.getConf.get("spark.source.password", "")
+  val sourceReadConsistencyLevel = sc.getConf.get("spark.source.read.consistency.level", "LOCAL_QUORUM")
+  val sourceTrustStorePath = sc.getConf.get("spark.source.trustStore.path", "")
+  val sourceTrustStorePassword = sc.getConf.get("spark.source.trustStore.password", "")
+  val sourceTrustStoreType = sc.getConf.get("spark.source.trustStore.type", "JKS")
+  val sourceKeyStorePath = sc.getConf.get("spark.source.keyStore.path", "")
+  val sourceKeyStorePassword = sc.getConf.get("spark.source.keyStore.password", "")
+  val sourceEnabledAlgorithms = sc.getConf.get("spark.source.enabledAlgorithms", "")
+
+  val destinationIsAstra = sc.getConf.get("spark.destination.isAstra", "true")
+  val destinationScbPath = sc.getConf.get("spark.destination.scb", "")
+  val destinationHost = sc.getConf.get("spark.destination.host", "")
+  val destinationUsername = sc.getConf.get("spark.destination.username")
+  val destinationPassword = sc.getConf.get("spark.destination.password")
+  val destinationReadConsistencyLevel = sc.getConf.get("spark.destination.read.consistency.level", "LOCAL_QUORUM")
+  val destinationTrustStorePath = sc.getConf.get("spark.destination.trustStore.path", "")
+  val destinationTrustStorePassword = sc.getConf.get("spark.destination.trustStore.password", "")
+  val destinationTrustStoreType = sc.getConf.get("spark.destination.trustStore.type", "JKS")
+  val destinationKeyStorePath = sc.getConf.get("spark.destination.keyStore.path", "")
+  val destinationKeyStorePassword = sc.getConf.get("spark.destination.keyStore.password", "")
+  val destinationEnabledAlgorithms = sc.getConf.get("spark.destination.enabledAlgorithms", "")
+
+  val minPartition = new BigInteger(sc.getConf.get("spark.source.minPartition","-9223372036854775808"))
+  val maxPartition = new BigInteger(sc.getConf.get("spark.source.maxPartition","9223372036854775807"))
+  val coveragePercent = sc.getConf.get("spark.coveragePercent", "100")
+  val splitSize = sc.getConf.get("spark.splitSize", "10000")
+  val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition,Integer.parseInt(coveragePercent))
+
+  protected def exitSpark() = {
+    spark.stop()
+    sys.exit(0)
+  }
+
+}
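BaseJob hands spark.splitSize, the min/max partition bounds, and spark.coveragePercent to SplitPartitions.getRandomSubPartitions, which produces the randomized subset of token sub-ranges behind the "smaller randomized data-set" validation mentioned in the README. The actual SplitPartitions code is not part of this diff; the sketch below only illustrates the general idea under that assumption and is not the project's implementation:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SplitPartitionsSketch {

    // Cut [min, max] into roughly `splitSize` contiguous slices, then keep a random coveragePercent% of them.
    static List<BigInteger[]> randomSubPartitions(long splitSize, BigInteger min,
                                                  BigInteger max, int coveragePercent) {
        BigInteger step = max.subtract(min).divide(BigInteger.valueOf(splitSize)).add(BigInteger.ONE);
        List<BigInteger[]> slices = new ArrayList<>();
        for (BigInteger start = min; start.compareTo(max) <= 0; start = start.add(step)) {
            BigInteger end = start.add(step).subtract(BigInteger.ONE).min(max);
            slices.add(new BigInteger[]{start, end});
        }
        Collections.shuffle(slices);
        int keep = Math.max(1, slices.size() * coveragePercent / 100);
        return slices.subList(0, keep);
    }

    public static void main(String[] args) {
        List<BigInteger[]> parts = randomSubPartitions(
                10000L,
                new BigInteger("-9223372036854775808"),  // default spark.source.minPartition
                new BigInteger("9223372036854775807"),   // default spark.source.maxPartition
                10);                                     // 10% coverage => sampled validation run
        System.out.println(parts.size() + " token sub-ranges selected");
    }
}
```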
