
Commit 350e998

Merge pull request #8 from datastax/feature/basejobsession
initial commit - basejobsession implementation
2 parents: 19a712d + 8a6c928

4 files changed, 110 insertions(+), 84 deletions(-)


src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 1 addition & 39 deletions
@@ -14,46 +14,8 @@
 import java.util.Map;
 import java.util.Set;

-public abstract class AbstractJobSession {
+public class AbstractJobSession extends BaseJobSession {

-    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
-
-    protected PreparedStatement sourceSelectStatement;
-    protected String sourceSelectCondition;
-
-    protected PreparedStatement astraSelectStatement;
-
-    // Read/Write Rate limiter
-    // Determine the total throughput for the entire cluster in terms of wries/sec,
-    // reads/sec
-    // then do the following to set the values as they are only applicable per JVM
-    // (hence spark Executor)...
-    // Rate = Total Throughput (write/read per sec) / Total Executors
-    protected final RateLimiter readLimiter;
-    protected final RateLimiter writeLimiter;
-    protected Integer maxRetries = 10;
-
-    protected CqlSession sourceSession;
-    protected CqlSession astraSession;
-    protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
-    protected List<MigrateDataType> idColTypes = new ArrayList<MigrateDataType>();
-
-    protected Integer batchSize = 1;
-    protected Integer printStatsAfter = 100000;
-
-    protected Boolean isPreserveTTLWritetime = Boolean.FALSE;
-    protected Boolean writeTimeStampFilter = Boolean.FALSE;
-    protected Long minWriteTimeStampFilter = 0l;
-    protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
-
-    protected List<Integer> writeTimeStampCols = new ArrayList<Integer>();
-    protected List<Integer> ttlCols = new ArrayList<Integer>();
-    protected Boolean isCounterTable;
-
-    protected String sourceKeyspaceTable;
-    protected String astraKeyspaceTable;
-
-    protected Boolean hasRandomPartitioner;

     protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
         this.sourceSession = sourceSession;
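
After this change, AbstractJobSession no longer declares any of its own state; the fields it used (logger, rate limiters, batch sizing, keyspace/table names, and so on) are inherited from BaseJobSession. A minimal hypothetical sketch (ExampleJobSession is not part of this commit) showing that descendants still see the moved fields through inheritance:

// Hypothetical subclass, not in this commit: illustrates that fields moved
// into BaseJobSession remain visible to subclasses via protected/public access.
package datastax.astra.migrate;

public class ExampleJobSession extends BaseJobSession {
    public void logBatchSize() {
        // logger and batchSize are inherited from BaseJobSession
        logger.info("Configured batch size: " + batchSize);
    }
}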
src/main/java/datastax/astra/migrate/BaseJobSession.java (new file)

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+package datastax.astra.migrate;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class BaseJobSession {
+
+    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
+
+    protected PreparedStatement sourceSelectStatement;
+    protected String sourceSelectCondition;
+
+    protected PreparedStatement astraSelectStatement;
+
+    // Read/Write Rate limiter
+    // Determine the total throughput for the entire cluster in terms of wries/sec,
+    // reads/sec
+    // then do the following to set the values as they are only applicable per JVM
+    // (hence spark Executor)...
+    // Rate = Total Throughput (write/read per sec) / Total Executors
+    protected RateLimiter readLimiter;
+    protected RateLimiter writeLimiter;
+    protected Integer maxRetries = 10;
+
+    protected CqlSession sourceSession;
+    protected CqlSession astraSession;
+    protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
+    protected List<MigrateDataType> idColTypes = new ArrayList<MigrateDataType>();
+
+    protected Integer batchSize = 1;
+    protected Integer printStatsAfter = 100000;
+
+    protected Boolean isPreserveTTLWritetime = Boolean.FALSE;
+    protected Boolean writeTimeStampFilter = Boolean.FALSE;
+    protected Long minWriteTimeStampFilter = 0l;
+    protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
+
+    protected List<Integer> writeTimeStampCols = new ArrayList<Integer>();
+    protected List<Integer> ttlCols = new ArrayList<Integer>();
+    protected Boolean isCounterTable;
+
+    protected String sourceKeyspaceTable;
+    protected String astraKeyspaceTable;
+
+    protected Boolean hasRandomPartitioner;
+
+}
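
The comment block above spells out the intended throttling formula: per-JVM rate = total cluster throughput / total executors. A small sketch of how that per-executor limiter would be built with the shaded Guava RateLimiter; totalWritesPerSec and numExecutors are assumed illustrative values, not configuration keys from this commit:

// Illustrative only: the numbers are assumed, not settings from this commit.
// Shows the per-JVM rate derivation described in the comment above.
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;

public class RateLimiterSketch {
    public static void main(String[] args) {
        double totalWritesPerSec = 40000.0; // cluster-wide write budget (assumed)
        int numExecutors = 8;               // Spark executors sharing it (assumed)
        // Rate = Total Throughput (write/read per sec) / Total Executors
        RateLimiter writeLimiter = RateLimiter.create(totalWritesPerSec / numExecutors);
        writeLimiter.acquire(1); // blocks until a write permit is available
        System.out.println("Per-executor limit: " + writeLimiter.getRate() + " writes/sec");
    }
}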

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 1 addition & 45 deletions
@@ -7,58 +7,14 @@ import org.slf4j.LoggerFactory
 import java.math.BigInteger
 import java.lang.Long

-class AbstractJob extends App {
-
-  val abstractLogger = LoggerFactory.getLogger(this.getClass.getName)
-  val spark = SparkSession.builder
-    .appName("Datastax Data Validation")
-    .getOrCreate()
-
-  val sc = spark.sparkContext
-
-  val sourceIsAstra = sc.getConf.get("spark.source.isAstra", "false")
-  val sourceScbPath = sc.getConf.get("spark.source.scb", "")
-  val sourceHost = sc.getConf.get("spark.source.host", "")
-  val sourceUsername = sc.getConf.get("spark.source.username", "")
-  val sourcePassword = sc.getConf.get("spark.source.password", "")
-  val sourceReadConsistencyLevel = sc.getConf.get("spark.source.read.consistency.level", "LOCAL_QUORUM")
-  val sourceTrustStorePath = sc.getConf.get("spark.source.trustStore.path", "")
-  val sourceTrustStorePassword = sc.getConf.get("spark.source.trustStore.password", "")
-  val sourceTrustStoreType = sc.getConf.get("spark.source.trustStore.type", "JKS")
-  val sourceKeyStorePath = sc.getConf.get("spark.source.keyStore.path", "")
-  val sourceKeyStorePassword = sc.getConf.get("spark.source.keyStore.password", "")
-  val sourceEnabledAlgorithms = sc.getConf.get("spark.source.enabledAlgorithms", "")
-
-  val destinationIsAstra = sc.getConf.get("spark.destination.isAstra", "true")
-  val destinationScbPath = sc.getConf.get("spark.destination.scb", "")
-  val destinationHost = sc.getConf.get("spark.destination.host", "")
-  val destinationUsername = sc.getConf.get("spark.destination.username")
-  val destinationPassword = sc.getConf.get("spark.destination.password")
-  val destinationReadConsistencyLevel = sc.getConf.get("spark.destination.read.consistency.level", "LOCAL_QUORUM")
-  val destinationTrustStorePath = sc.getConf.get("spark.destination.trustStore.path", "")
-  val destinationTrustStorePassword = sc.getConf.get("spark.destination.trustStore.password", "")
-  val destinationTrustStoreType = sc.getConf.get("spark.destination.trustStore.type", "JKS")
-  val destinationKeyStorePath = sc.getConf.get("spark.destination.keyStore.path", "")
-  val destinationKeyStorePassword = sc.getConf.get("spark.destination.keyStore.password", "")
-  val destinationEnabledAlgorithms = sc.getConf.get("spark.destination.enabledAlgorithms", "")
-
-  val minPartition = new BigInteger(sc.getConf.get("spark.source.minPartition","-9223372036854775808"))
-  val maxPartition = new BigInteger(sc.getConf.get("spark.source.maxPartition","9223372036854775807"))
-  val coveragePercent = sc.getConf.get("spark.coveragePercent", "100")
-  val splitSize = sc.getConf.get("spark.splitSize", "10000")
-  val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition,Integer.parseInt(coveragePercent))
+class AbstractJob extends BaseJob {

   var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword, sourceReadConsistencyLevel,
     sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);

   var destinationConnection = getConnection(false, destinationIsAstra, destinationScbPath, destinationHost, destinationUsername, destinationPassword, destinationReadConsistencyLevel,
     destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword, destinationEnabledAlgorithms);

-  protected def exitSpark() = {
-    spark.stop()
-    sys.exit(0)
-  }
-
   private def getConnection(isSource: Boolean, isAstra: String, scbPath: String, host: String, username: String, password: String, readConsistencyLevel: String,
     trustStorePath: String, trustStorePassword: String, trustStoreType: String,
     keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String): CassandraConnector = {
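
The getConnection helper (body not shown in this hunk) builds a CassandraConnector from the Spark conf, branching on isAstra. For orientation only, a hypothetical sketch, not this commit's code, of the connection pattern the spark.*.scb / username / password settings map to when the target is Astra, using the Java driver's secure connect bundle support:

// Hypothetical sketch (not part of this commit): direct Java-driver connection
// to Astra. scbPath/username/password stand in for the Spark conf values above.
import com.datastax.oss.driver.api.core.CqlSession;
import java.nio.file.Paths;

public class AstraConnectSketch {
    public static void main(String[] args) {
        String scbPath = args[0];  // e.g. value of spark.source.scb
        String username = args[1]; // spark.source.username
        String password = args[2]; // spark.source.password
        try (CqlSession session = CqlSession.builder()
                .withCloudSecureConnectBundle(Paths.get(scbPath))
                .withAuthCredentials(username, password)
                .build()) {
            System.out.println("Connected: " + session.getName());
        }
    }
}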
src/main/scala/datastax/astra/migrate/BaseJob.scala (new file)

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+package datastax.astra.migrate
+
+import org.apache.spark.sql.SparkSession
+import org.slf4j.LoggerFactory
+
+import java.math.BigInteger
+import java.lang.Long
+
+class BaseJob extends App {
+
+  val abstractLogger = LoggerFactory.getLogger(this.getClass.getName)
+  val spark = SparkSession.builder
+    .appName("Datastax Data Validation")
+    .getOrCreate()
+
+  val sc = spark.sparkContext
+
+  val sourceIsAstra = sc.getConf.get("spark.source.isAstra", "false")
+  val sourceScbPath = sc.getConf.get("spark.source.scb", "")
+  val sourceHost = sc.getConf.get("spark.source.host", "")
+  val sourceUsername = sc.getConf.get("spark.source.username", "")
+  val sourcePassword = sc.getConf.get("spark.source.password", "")
+  val sourceReadConsistencyLevel = sc.getConf.get("spark.source.read.consistency.level", "LOCAL_QUORUM")
+  val sourceTrustStorePath = sc.getConf.get("spark.source.trustStore.path", "")
+  val sourceTrustStorePassword = sc.getConf.get("spark.source.trustStore.password", "")
+  val sourceTrustStoreType = sc.getConf.get("spark.source.trustStore.type", "JKS")
+  val sourceKeyStorePath = sc.getConf.get("spark.source.keyStore.path", "")
+  val sourceKeyStorePassword = sc.getConf.get("spark.source.keyStore.password", "")
+  val sourceEnabledAlgorithms = sc.getConf.get("spark.source.enabledAlgorithms", "")
+
+  val destinationIsAstra = sc.getConf.get("spark.destination.isAstra", "true")
+  val destinationScbPath = sc.getConf.get("spark.destination.scb", "")
+  val destinationHost = sc.getConf.get("spark.destination.host", "")
+  val destinationUsername = sc.getConf.get("spark.destination.username")
+  val destinationPassword = sc.getConf.get("spark.destination.password")
+  val destinationReadConsistencyLevel = sc.getConf.get("spark.destination.read.consistency.level", "LOCAL_QUORUM")
+  val destinationTrustStorePath = sc.getConf.get("spark.destination.trustStore.path", "")
+  val destinationTrustStorePassword = sc.getConf.get("spark.destination.trustStore.password", "")
+  val destinationTrustStoreType = sc.getConf.get("spark.destination.trustStore.type", "JKS")
+  val destinationKeyStorePath = sc.getConf.get("spark.destination.keyStore.path", "")
+  val destinationKeyStorePassword = sc.getConf.get("spark.destination.keyStore.password", "")
+  val destinationEnabledAlgorithms = sc.getConf.get("spark.destination.enabledAlgorithms", "")
+
+  val minPartition = new BigInteger(sc.getConf.get("spark.source.minPartition","-9223372036854775808"))
+  val maxPartition = new BigInteger(sc.getConf.get("spark.source.maxPartition","9223372036854775807"))
+  val coveragePercent = sc.getConf.get("spark.coveragePercent", "100")
+  val splitSize = sc.getConf.get("spark.splitSize", "10000")
+  val partitions = SplitPartitions.getRandomSubPartitions(BigInteger.valueOf(Long.parseLong(splitSize)), minPartition, maxPartition,Integer.parseInt(coveragePercent))
+
+  protected def exitSpark() = {
+    spark.stop()
+    sys.exit(0)
+  }
+
+}
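
The minPartition/maxPartition defaults are the bounds of the Murmur3 token range (-2^63 to 2^63-1), which SplitPartitions.getRandomSubPartitions divides into splitSize pieces. Its actual algorithm (random selection down to coveragePercent) is not shown in this diff; the sketch below only illustrates the underlying range-splitting idea, with all values assumed from the defaults above:

// Hypothetical sketch (not SplitPartitions itself): divide the full Murmur3
// token range into roughly splitSize equal sub-ranges.
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class TokenSplitSketch {
    public static void main(String[] args) {
        BigInteger min = new BigInteger("-9223372036854775808"); // Murmur3 lower bound
        BigInteger max = new BigInteger("9223372036854775807");  // Murmur3 upper bound
        BigInteger splits = BigInteger.valueOf(10000);           // spark.splitSize default
        BigInteger step = max.subtract(min).divide(splits);
        List<BigInteger[]> ranges = new ArrayList<>();
        for (BigInteger lo = min; lo.compareTo(max) < 0; lo = lo.add(step)) {
            BigInteger hi = lo.add(step).min(max); // clamp the final sub-range
            ranges.add(new BigInteger[]{lo, hi});
        }
        System.out.println(ranges.size() + " sub-ranges of width " + step);
    }
}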
