
Commit 69bb9ec

Merge pull request #42 from datastax/feature/config-should-represent-base-use-case
Refactored config
2 parents 630de98 + 85b35ab commit 69bb9ec

File tree

10 files changed: +115 -59 lines changed


.github/workflows/maven.yml

Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
+# This workflow will build a Java project with Maven, and cache/restore any dependencies to improve the workflow execution time
+# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-java-with-maven
+
+# This workflow uses actions that are not certified by GitHub.
+# They are provided by a third-party and are governed by
+# separate terms of service, privacy policy, and support
+# documentation.
+
+name: Java CI with Maven
+
+on: [push, pull_request]
+
+jobs:
+  build:
+
+    runs-on: ubuntu-latest
+
+    steps:
+    - uses: actions/checkout@v3
+    - name: Set up JDK 8
+      uses: actions/setup-java@v3
+      with:
+        java-version: '8'
+        distribution: 'temurin'
+        cache: maven
+    - name: Build with Maven
+      run: mvn -B package --file pom.xml
+
+    # Optional: Uploads the full dependency graph to GitHub to improve the quality of Dependabot alerts this repository can receive
+    - name: Update dependency graph
+      uses: advanced-security/maven-dependency-submission-action@571e99aab1055c2e71a1e2309b9691de18d6b7d6
+

README.md

Lines changed: 5 additions & 5 deletions
@@ -1,6 +1,6 @@
 # cassandra-data-migrator

-Spark jobs in this repo can be used for data migration and data validation.
+Migrate and Validate Tables between Origin and Target Cassandra Clusters.

 > :warning: Please note this job has been tested with spark version [2.4.8](https://archive.apache.org/dist/spark/spark-2.4.8/)

@@ -20,12 +20,12 @@ wget https://downloads.apache.org/spark/spark-2.4.8/
 tar -xvzf <spark downloaded file name>
 ```

-# Steps:
+# Steps for Data-Migration:

 1. `sparkConf.properties` file needs to be configured as applicable for the environment
 > A sample Spark conf file configuration can be [found here](./src/resources/sparkConf.properties)
 2. Place the conf file where it can be accessed while running the job via spark-submit.
-3. Run the 'Data Migration' job using `spark-submit` command as shown below:
+3. Run the below job using `spark-submit` command as shown below:

 ```
 ./spark-submit --properties-file sparkConf.properties /
@@ -36,7 +36,7 @@ tar -xvzf <spark downloaded file name>
 Note: Above command also generates a log file `logfile_name.txt` to avoid log output on the console.


-# Data-validation job
+# Steps for Data-Validation:

 - To run the job in Data validation mode, use class option `--class datastax.astra.migrate.DiffData` as shown below

@@ -90,6 +90,6 @@ This mode is specifically useful to processes a subset of partition-ranges that
 - Advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt))
 - Filter records from origin using writetime
 - SSL Support (including custom cipher algorithms)
-- Migrate from any Cassandra origin ([Apache Cassandra](https://cassandra.apache.org)/[DataStax Enterprise (DSE)](https://www.datastax.com/products/datastax-enterprise)/[DataStax Astra DB](https://www.datastax.com/products/datastax-astra)) to any Cassandra target ([Apache Cassandra](https://cassandra.apache.org)/[DataStax Enterprise (DSE)](https://www.datastax.com/products/datastax-enterprise)/[DataStax Astra DB](https://www.datastax.com/products/datastax-astra))
+- Migrate from any Cassandra origin ([Apache Cassandra](https://cassandra.apache.org) / [DataStax Enterprise](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB](https://www.datastax.com/products/datastax-astra)) to any Cassandra target ([Apache Cassandra](https://cassandra.apache.org) / [DataStax Enterprise](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB](https://www.datastax.com/products/datastax-astra))
 - Validate migration accuracy and performance using a smaller randomized data-set
 - Custom writetime

cassandra-data-migrator.iml

Lines changed: 12 additions & 1 deletion
@@ -197,6 +197,17 @@
     <orderEntry type="library" name="Maven: com.github.spotbugs:spotbugs-annotations:3.1.12" level="project" />
     <orderEntry type="library" name="Maven: com.datastax.oss:java-driver-mapper-runtime:4.10.0" level="project" />
     <orderEntry type="library" name="Maven: com.datastax.oss:java-driver-query-builder:4.10.0" level="project" />
+    <orderEntry type="library" name="Maven: com.github.jnr:jnr-posix:3.1.15" level="project" />
+    <orderEntry type="library" name="Maven: com.github.jnr:jnr-ffi:2.2.11" level="project" />
+    <orderEntry type="library" name="Maven: com.github.jnr:jffi:1.3.9" level="project" />
+    <orderEntry type="library" scope="RUNTIME" name="Maven: com.github.jnr:jffi:native:1.3.9" level="project" />
+    <orderEntry type="library" name="Maven: org.ow2.asm:asm-commons:9.2" level="project" />
+    <orderEntry type="library" name="Maven: org.ow2.asm:asm-analysis:9.2" level="project" />
+    <orderEntry type="library" name="Maven: org.ow2.asm:asm-tree:9.2" level="project" />
+    <orderEntry type="library" name="Maven: org.ow2.asm:asm-util:9.2" level="project" />
+    <orderEntry type="library" name="Maven: com.github.jnr:jnr-a64asm:1.0.0" level="project" />
+    <orderEntry type="library" name="Maven: com.github.jnr:jnr-x86asm:1.0.2" level="project" />
+    <orderEntry type="library" name="Maven: com.github.jnr:jnr-constants:0.10.3" level="project" />
     <orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-api:2.19.0" level="project" />
     <orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-core:2.19.0" level="project" />
     <orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-to-slf4j:2.19.0" level="project" />
@@ -248,7 +259,7 @@
     <orderEntry type="library" scope="TEST" name="Maven: org.caffinitas.ohc:ohc-core-j8:0.4.4" level="project" />
     <orderEntry type="library" scope="TEST" name="Maven: com.github.ben-manes.caffeine:caffeine:2.2.6" level="project" />
     <orderEntry type="library" scope="TEST" name="Maven: org.jctools:jctools-core:1.2.1" level="project" />
-    <orderEntry type="library" scope="TEST" name="Maven: org.ow2.asm:asm:5.0.4" level="project" />
+    <orderEntry type="library" name="Maven: org.ow2.asm:asm:5.0.4" level="project" />
     <orderEntry type="library" scope="TEST" name="Maven: javax.inject:javax.inject:1" level="project" />
     <orderEntry type="library" name="Maven: org.hdrhistogram:HdrHistogram:2.1.9" level="project" />
     <orderEntry type="library" scope="TEST" name="Maven: de.jflex:jflex:1.6.0" level="project" />

pom.xml

Lines changed: 6 additions & 1 deletion
@@ -3,7 +3,7 @@

     <groupId>datastax.astra.migrate</groupId>
     <artifactId>cassandra-data-migrator</artifactId>
-    <version>2.8</version>
+    <version>2.9</version>
     <packaging>jar</packaging>

     <properties>
@@ -72,6 +72,11 @@
             <artifactId>spark-cassandra-connector_${scala.main.version}</artifactId>
             <version>${connector.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.github.jnr</groupId>
+            <artifactId>jnr-posix</artifactId>
+            <version>3.1.15</version>
+        </dependency>

         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
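
The diff does not say why `jnr-posix` is needed; given the matching `.iml` entries above, it plausibly backs native POSIX calls somewhere in the driver stack, but that is an assumption. For orientation only, a minimal smoke test of the library's entry point:

```scala
import jnr.posix.{POSIX, POSIXFactory}

// Illustrative only; the migrator's own call sites are not part of this diff.
object PosixSmokeTest extends App {
  // POSIXFactory picks a native backend when one is available and
  // falls back to a pure-Java emulation otherwise.
  val posix: POSIX = POSIXFactory.getPOSIX
  println(s"pid = ${posix.getpid}")
}
```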

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 8 additions & 1 deletion
@@ -11,6 +11,7 @@
 import org.slf4j.LoggerFactory;

 import java.time.Duration;
+import java.time.Instant;
 import java.util.Map;
 import java.util.stream.IntStream;

@@ -84,9 +85,15 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         logger.info("PARAM -- Destination Keyspace Table: {}", astraKeyspaceTable);
         logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());
         logger.info("PARAM -- WriteRateLimit: {}", writeLimiter.getRate());
-        logger.info("PARAM -- TTLCols: {}" + ttlCols);
+        logger.info("PARAM -- TTLCols: {}", ttlCols);
         logger.info("PARAM -- WriteTimestampFilterCols: {}", writeTimeStampCols);
         logger.info("PARAM -- WriteTimestampFilter: {}", writeTimeStampFilter);
+        if (writeTimeStampFilter) {
+            logger.info("PARAM -- minWriteTimeStampFilter: {} datetime is {}", minWriteTimeStampFilter,
+                    Instant.ofEpochMilli(minWriteTimeStampFilter / 1000));
+            logger.info("PARAM -- maxWriteTimeStampFilter: {} datetime is {}", maxWriteTimeStampFilter,
+                    Instant.ofEpochMilli(maxWriteTimeStampFilter / 1000));
+        }

         String selectCols = Util.getSparkProp(sc, "spark.query.origin");
         String partionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
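
The new conditional logging turns the writetime filter bounds into readable datetimes: Cassandra writetime values are microseconds since the epoch, while `Instant.ofEpochMilli` expects milliseconds, hence the division by 1000. A minimal sketch of the same conversion (the object and method names here are illustrative):

```scala
import java.time.Instant

object WritetimeDemo extends App {
  // Convert a Cassandra WRITETIME (microseconds since the epoch)
  // to a java.time.Instant by reducing to milliseconds first.
  def writetimeToInstant(writetimeMicros: Long): Instant =
    Instant.ofEpochMilli(writetimeMicros / 1000)

  println(writetimeToInstant(1672531200000000L)) // 2023-01-01T00:00:00Z
}
```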

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 3 additions & 2 deletions
@@ -61,11 +61,12 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                 if (filterData) {
                     String col = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
                     if (col.trim().equalsIgnoreCase(filterColValue)) {
-                        logger.warn("Skipping row and filtering out: " + getKey(sourceRow));
+                        logger.warn("Skipping row and filtering out: {}", getKey(sourceRow));
                         skippedCounter.incrementAndGet();
                         continue;
                     }
                 }
+
                 if (writeTimeStampFilter) {
                     // only process rows greater than writeTimeStampFilter
                     Long sourceWriteTimeStamp = getLargestWriteTimeStamp(sourceRow);
@@ -111,7 +112,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                 if (filterData) {
                     String colValue = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
                     if (colValue.trim().equalsIgnoreCase(filterColValue)) {
-                        logger.warn("Skipping row and filtering out: " + getKey(sourceRow));
+                        logger.warn("Skipping row and filtering out: {}", getKey(sourceRow));
                         skippedCounter.incrementAndGet();
                         continue;
                     }
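
Both hunks replace string concatenation with SLF4J's parameterized form, matching the `TTLCols` fix above: with a `{}` placeholder the message is assembled only when the WARN level is actually enabled, and a placeholder is never printed literally. A small before-and-after sketch (the logger name and key value are illustrative):

```scala
import org.slf4j.LoggerFactory

object LoggingDemo extends App {
  val logger = LoggerFactory.getLogger("CopyJobSession")
  val key = "pk1 %% ck1" // hypothetical row key, for illustration only

  // Before: eager concatenation builds the message even when WARN is off,
  // and a stray "{}" (as in the TTLCols bug) ends up in the output verbatim.
  logger.warn("Skipping row and filtering out: " + key)

  // After: the placeholder is substituted lazily, only when WARN is enabled.
  logger.warn("Skipping row and filtering out: {}", key)
}
```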

src/main/scala/datastax/astra/migrate/AbstractJob.scala

Lines changed: 5 additions & 5 deletions
@@ -10,13 +10,13 @@ class AbstractJob extends BaseJob {
   abstractLogger.info("PARAM -- Split Size: " + splitSize)
   abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent)

-  var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword,
+  var sourceConnection = getConnection(true, sourceScbPath, sourceHost, sourceUsername, sourcePassword,
     sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);

-  var destinationConnection = getConnection(false, destinationIsAstra, destinationScbPath, destinationHost, destinationUsername, destinationPassword,
+  var destinationConnection = getConnection(false, destinationScbPath, destinationHost, destinationUsername, destinationPassword,
     destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword, destinationEnabledAlgorithms);

-  private def getConnection(isSource: Boolean, isAstra: String, scbPath: String, host: String, username: String, password: String,
+  private def getConnection(isSource: Boolean, scbPath: String, host: String, username: String, password: String,
                             trustStorePath: String, trustStorePassword: String, trustStoreType: String,
                             keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String): CassandraConnector = {
     var connType: String = "Source"
@@ -25,15 +25,15 @@ class AbstractJob extends BaseJob {
     }

     var config: SparkConf = sContext.getConf
-    if ("true".equals(isAstra)) {
+    if (scbPath.nonEmpty) {
       abstractLogger.info(connType + ": Connecting to Astra using SCB: " + scbPath);

       return CassandraConnector(config
         .set("spark.cassandra.auth.username", username)
         .set("spark.cassandra.auth.password", password)
         .set("spark.cassandra.input.consistency.level", consistencyLevel)
         .set("spark.cassandra.connection.config.cloud.path", scbPath))
-    } else if (null != trustStorePath && !trustStorePath.trim.isEmpty) {
+    } else if (trustStorePath.nonEmpty) {
       abstractLogger.info(connType + ": Connecting to Cassandra (or DSE) with SSL host: " + host);

       // Use defaults when not provided
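
This is the heart of the refactor: with the `isAstra` flag removed, a non-empty secure connect bundle path alone selects the Astra branch, a non-empty truststore path selects SSL, and everything else falls through to a plain connection. A minimal sketch of that decision order (names are illustrative, not the project's API):

```scala
object ConnectionModeDemo extends App {
  // Hypothetical condensation of the refactored branching in getConnection.
  def connectionMode(scbPath: String, trustStorePath: String): String =
    if (scbPath.nonEmpty) "astra-scb"       // an SCB path implies Astra
    else if (trustStorePath.nonEmpty) "ssl" // a truststore implies SSL
    else "plain"                            // otherwise a plain connection

  assert(connectionMode("/tmp/scb.zip", "") == "astra-scb")
  assert(connectionMode("", "/tmp/trust.jks") == "ssl")
  assert(connectionMode("", "") == "plain")
}
```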

src/main/scala/datastax/astra/migrate/BaseJob.scala

Lines changed: 0 additions & 2 deletions
@@ -20,7 +20,6 @@ class BaseJob extends App {

   val consistencyLevel = Util.getSparkPropOr(sc, "spark.read.consistency.level", "LOCAL_QUORUM")

-  val sourceIsAstra = Util.getSparkPropOr(sc, "spark.origin.isAstra", "false")
   val sourceScbPath = Util.getSparkPropOrEmpty(sc, "spark.origin.scb")
   val sourceHost = Util.getSparkPropOrEmpty(sc, "spark.origin.host")
   val sourceUsername = Util.getSparkPropOrEmpty(sc, "spark.origin.username")
@@ -32,7 +31,6 @@ class BaseJob extends App {
   val sourceKeyStorePassword = Util.getSparkPropOrEmpty(sc, "spark.origin.keyStore.password")
   val sourceEnabledAlgorithms = Util.getSparkPropOrEmpty(sc, "spark.origin.enabledAlgorithms")

-  val destinationIsAstra = Util.getSparkPropOr(sc, "spark.target.isAstra", "true")
   val destinationScbPath = Util.getSparkPropOrEmpty(sc, "spark.target.scb")
   val destinationHost = Util.getSparkPropOrEmpty(sc, "spark.target.host")
   val destinationUsername = Util.getSparkProp(sc, "spark.target.username")
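
Dropping `sourceIsAstra`/`destinationIsAstra` works because `getSparkPropOrEmpty` evidently returns an empty string rather than null for unset properties, so callers can test `scbPath.nonEmpty` directly. A sketch of that assumed contract (`Util`'s real implementation is not shown in this diff):

```scala
import org.apache.spark.SparkConf

object PropDemo extends App {
  // Assumed behavior: fall back to "" rather than null, so .nonEmpty
  // is always safe to call and an unset SCB path reads as "not Astra".
  def getSparkPropOrEmpty(conf: SparkConf, key: String): String =
    conf.getOption(key).map(_.trim).getOrElse("")

  val conf = new SparkConf()
  assert(getSparkPropOrEmpty(conf, "spark.origin.scb").isEmpty)
}
```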

src/main/scala/datastax/astra/migrate/OriginData.scala

Lines changed: 4 additions & 4 deletions
@@ -9,26 +9,26 @@ object OriginData extends BaseJob {

   val logger = LoggerFactory.getLogger(this.getClass.getName)
   logger.info("Started Migration App")
-  var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword,
+  var sourceConnection = getConnection(true, sourceScbPath, sourceHost, sourceUsername, sourcePassword,
     sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);
   analyzeSourceTable(sourceConnection)
   exitSpark


-  private def getConnection(isSource: Boolean, isAstra: String, scbPath: String, host: String, username: String, password: String,
+  private def getConnection(isSource: Boolean, scbPath: String, host: String, username: String, password: String,
                             trustStorePath: String, trustStorePassword: String, trustStoreType: String,
                             keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String): CassandraConnector = {
     var connType: String = "Source"

-    if ("true".equals(isAstra)) {
+    if (scbPath.nonEmpty) {
       abstractLogger.info(connType + ": Connected to Astra!");

       return CassandraConnector(sc
         .set("spark.cassandra.auth.username", username)
         .set("spark.cassandra.auth.password", password)
         .set("spark.cassandra.input.consistency.level", consistencyLevel)
         .set("spark.cassandra.connection.config.cloud.path", scbPath))
-    } else if (null != trustStorePath && !trustStorePath.trim.isEmpty) {
+    } else if (trustStorePath.nonEmpty) {
       abstractLogger.info(connType + ": Connected to Cassandra (or DSE) with SSL!");

       // Use defaults when not provided
