
Commit 68c1587

Merge branch 'main' into feature/spark313_upgrade
2 parents 4372025 + 7a524a7 commit 68c1587

14 files changed: +147 -69 lines changed

.github/workflows/docker-publish.yml

Lines changed: 5 additions & 5 deletions
@@ -11,8 +11,8 @@ on:
       - 'Dockerfile'
       - 'LICENSE.md'
     tags:
-      - 'v*.*.*'
-      - 'v*.*.*-*'
+      - '*.*.*'
+      - '*.*.*-*'
 
 jobs:
   build_and_publish:
@@ -36,15 +36,15 @@ jobs:
           type=semver,pattern={{major}}.{{minor}}.x
           type=semver,pattern={{major}}.x
       - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@v1
+        uses: docker/setup-buildx-action@v2
       - name: Login to DockerHub
-        uses: docker/login-action@v1
+        uses: docker/login-action@v2
         with:
           username: ${{ secrets.DOCKER_HUB_USERNAME }}
           password: ${{ secrets.DOCKER_HUB_PASSWORD }}
       - name: Build and push
         id: docker_build
-        uses: docker/build-push-action@v2
+        uses: docker/build-push-action@v3
         with:
           file: Dockerfile
           context: .

.github/workflows/docker-push-sha-commit.yml

Lines changed: 3 additions & 3 deletions
@@ -22,15 +22,15 @@ jobs:
         tags: |
           type=sha
       - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@v1
+        uses: docker/setup-buildx-action@v2
       - name: Login to DockerHub
-        uses: docker/login-action@v1
+        uses: docker/login-action@v2
         with:
           username: ${{ secrets.DOCKER_HUB_USERNAME }}
           password: ${{ secrets.DOCKER_HUB_PASSWORD }}
       - name: Build and push
         id: docker_build
-        uses: docker/build-push-action@v2
+        uses: docker/build-push-action@v3
         with:
           file: Dockerfile
           context: .

Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@ RUN mkdir -p /assets/ && cd /assets && \
     tar -xzf ./spark-2.4.8-bin-hadoop2.7.tgz && \
     rm ./spark-2.4.8-bin-hadoop2.7.tgz
 
-RUN apt-get update && apt-get install -y openssh-server vim --no-install-recommends && \
+RUN apt-get update && apt-get install -y openssh-server vim python3 --no-install-recommends && \
     rm -rf /var/lib/apt/lists/* && \
     service ssh start
 

README.md

Lines changed: 19 additions & 12 deletions
@@ -4,22 +4,28 @@ Migrate and Validate Tables between Origin and Target Cassandra Clusters.
 
 > :warning: Please note this job has been tested with spark version [2.4.8](https://archive.apache.org/dist/spark/spark-2.4.8/)
 
-## Build
-1. Clone this repo
-2. Move to the repo folder `cd cassandra-data-migrator`
-3. Run the build `mvn clean package`
-4. The fat jar (`cassandra-data-migrator-2.x.jar`) file should now be present in the `target` folder
+## Container Image
+- Get the latest image that includes all dependencies from [DockerHub](https://hub.docker.com/r/datastax/cassandra-data-migrator)
+- If you use this route, all migration tools (`cassandra-data-migrator` + `dsbulk` + `cqlsh`) would be available in the `/assets/` folder of the container
+- OR follow the below build steps (and Prerequisite) to build the jar locally
 
-## Prerequisite
+### Prerequisite
 
-Install Java8 as spark binaries are compiled with it.
-Install single instance of spark on a node where you want to run this job. Spark can be installed by running the following: -
+- Install Java8 as spark binaries are compiled with it.
+- Install Maven 3.8.x
+- Install single instance of spark on a node where you want to run this job. Spark can be installed by running the following: -
 
 ```
 wget https://downloads.apache.org/spark/spark-2.4.8/
 tar -xvzf <spark downloaded file name>
 ```
 
+### Build
+1. Clone this repo
+2. Move to the repo folder `cd cassandra-data-migrator`
+3. Run the build `mvn clean package`
+4. The fat jar (`cassandra-data-migrator-2.x.x.jar`) file should now be present in the `target` folder
+
 # Steps for Data-Migration:
 
 1. `sparkConf.properties` file needs to be configured as applicable for the environment
@@ -30,7 +36,7 @@ tar -xvzf <spark downloaded file name>
 ```
 ./spark-submit --properties-file sparkConf.properties /
 --master "local[*]" /
---class datastax.astra.migrate.Migrate cassandra-data-migrator-2.x.jar &> logfile_name.txt
+--class datastax.astra.migrate.Migrate cassandra-data-migrator-2.x.x.jar &> logfile_name.txt
 ```
 
 Note: Above command also generates a log file `logfile_name.txt` to avoid log output on the console.
@@ -43,7 +49,7 @@ Note: Above command also generates a log file `logfile_name.txt` to avoid log ou
 ```
 ./spark-submit --properties-file sparkConf.properties /
 --master "local[*]" /
---class datastax.astra.migrate.DiffData cassandra-data-migrator-2.x.jar &> logfile_name.txt
+--class datastax.astra.migrate.DiffData cassandra-data-migrator-2.x.x.jar &> logfile_name.txt
 ```
 
 - Validation job will report differences as “ERRORS” in the log file as shown below
@@ -72,7 +78,7 @@ spark.target.autocorrect.mismatch true|false
 ```
 ./spark-submit --properties-file sparkConf.properties /
 --master "local[*]" /
---class datastax.astra.migrate.MigratePartitionsFromFile cassandra-data-migrator-2.x.jar &> logfile_name.txt
+--class datastax.astra.migrate.MigratePartitionsFromFile cassandra-data-migrator-2.x.x.jar &> logfile_name.txt
 ```
 
 When running in above mode the tool assumes a `partitions.csv` file to be present in the current folder in the below format, where each line (`min,max`) represents a partition-range
@@ -88,7 +94,8 @@ This mode is specifically useful to processes a subset of partition-ranges that
 - [Counter tables](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_using/useCountersConcept.html)
 - Preserve [writetimes](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__retrieving-the-datetime-a-write-occurred-p) and [TTL](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__ref-select-ttl-p)
 - Advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt))
-- Filter records from origin using writetime
+- Filter records from origin using writetimes, CQL conditions, token-ranges
+- Fully containerized (Docker and K8s friendly)
 - SSL Support (including custom cipher algorithms)
 - Migrate from any Cassandra origin ([Apache Cassandra](https://cassandra.apache.org) / [DataStax Enterprise](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB](https://www.datastax.com/products/datastax-astra)) to any Cassandra target ([Apache Cassandra](https://cassandra.apache.org) / [DataStax Enterprise](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB](https://www.datastax.com/products/datastax-astra))
 - Validate migration accuracy and performance using a smaller randomized data-set
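The README hunk above references the `partitions.csv` input consumed by `MigratePartitionsFromFile`. A minimal sketch of a reader for that file, assuming only the `min,max`-per-line format the README states (class name and printing are illustrative, not part of this commit):

```java
import java.io.IOException;
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class PartitionsFileDemo {
    public static void main(String[] args) throws IOException {
        // Each line holds "min,max" token bounds, per the README; the file
        // name matches the one the tool expects in the current folder.
        List<String> lines = Files.readAllLines(Paths.get("partitions.csv"));
        for (String line : lines) {
            String[] parts = line.split(",");
            BigInteger min = new BigInteger(parts[0].trim());
            BigInteger max = new BigInteger(parts[1].trim());
            System.out.println("partition-range [" + min + ", " + max + "]");
        }
    }
}
```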

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 
     <groupId>datastax.astra.migrate</groupId>
     <artifactId>cassandra-data-migrator</artifactId>
-    <version>2.10</version>
+    <version>2.11.0</version>
     <packaging>jar</packaging>
 
     <properties>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 8 additions & 2 deletions
@@ -24,6 +24,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
     }
 
     protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
+        super(sc);
         this.sourceSession = sourceSession;
         this.astraSession = astraSession;
 
@@ -79,6 +80,8 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
             customWritetime = Long.parseLong(customWriteTimeStr);
         }
 
+        logger.info("PARAM -- Read Consistency: {}", readConsistencyLevel);
+        logger.info("PARAM -- Write Consistency: {}", writeConsistencyLevel);
         logger.info("PARAM -- Write Batch Size: {}", batchSize);
         logger.info("PARAM -- Read Fetch Size: {}", fetchSizeInRows);
         logger.info("PARAM -- Source Keyspace Table: {}", sourceKeyspaceTable);
@@ -98,6 +101,9 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         String selectCols = Util.getSparkProp(sc, "spark.query.origin");
         String partionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
         String sourceSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");
+        if (!sourceSelectCondition.isEmpty() && !sourceSelectCondition.trim().toUpperCase().startsWith("AND")) {
+            sourceSelectCondition = " AND " + sourceSelectCondition;
+        }
 
         final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
         String[] allCols = selectCols.split(",");
@@ -172,7 +178,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
     }
 
     public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow, Row astraRow) {
-        BoundStatement boundInsertStatement = insertStatement.bind();
+        BoundStatement boundInsertStatement = insertStatement.bind().setConsistencyLevel(writeConsistencyLevel);
 
         if (isCounterTable) {
             for (int index = 0; index < selectColTypes.size(); index++) {
@@ -232,7 +238,7 @@ public long getLargestWriteTimeStamp(Row sourceRow) {
     }
 
     public BoundStatement selectFromAstra(PreparedStatement selectStatement, Row sourceRow) {
-        BoundStatement boundSelectStatement = selectStatement.bind();
+        BoundStatement boundSelectStatement = selectStatement.bind().setConsistencyLevel(readConsistencyLevel);
         for (int index = 0; index < idColTypes.size(); index++) {
             MigrateDataType dataType = idColTypes.get(index);
             boundSelectStatement = boundSelectStatement.set(index, getData(dataType, index, sourceRow),
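The new `spark.query.condition` handling above quietly normalizes user input before it is appended to the generated SELECT. A standalone sketch of that normalization, with hypothetical sample conditions:

```java
public class ConditionPrefixDemo {
    // Mirrors the check added in AbstractJobSession: a non-empty condition
    // that does not already start with "AND" gets one prepended, so it can
    // be appended verbatim to the WHERE clause of the generated query.
    static String normalize(String sourceSelectCondition) {
        if (!sourceSelectCondition.isEmpty()
                && !sourceSelectCondition.trim().toUpperCase().startsWith("AND")) {
            sourceSelectCondition = " AND " + sourceSelectCondition;
        }
        return sourceSelectCondition;
    }

    public static void main(String[] args) {
        // Sample inputs are illustrative only.
        System.out.println(normalize("writetime(col1) > 0")); // " AND writetime(col1) > 0"
        System.out.println(normalize("AND col1 = 'x'"));      // unchanged
        System.out.println(normalize(""));                    // unchanged (empty)
    }
}
```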

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 9 additions & 0 deletions
@@ -1,9 +1,11 @@
 package datastax.astra.migrate;
 
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
+import org.apache.spark.SparkConf;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -15,6 +17,8 @@ public abstract class BaseJobSession {
     protected PreparedStatement sourceSelectStatement;
     protected PreparedStatement astraSelectStatement;
     protected PreparedStatement astraInsertStatement;
+    protected ConsistencyLevel readConsistencyLevel;
+    protected ConsistencyLevel writeConsistencyLevel;
 
     // Read/Write Rate limiter
     // Determine the total throughput for the entire cluster in terms of wries/sec,
@@ -55,6 +59,11 @@ public abstract class BaseJobSession {
     protected Integer filterColIndex;
     protected String filterColValue;
 
+    protected BaseJobSession(SparkConf sc) {
+        readConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.read"));
+        writeConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.write"));
+    }
+
     public String getKey(Row sourceRow) {
         StringBuffer key = new StringBuffer();
         for (int index = 0; index < idColTypes.size(); index++) {
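The new constructor relies on `Util.mapToConsistencyLevel`, which this commit does not show. A plausible sketch, assuming the mapper accepts a driver level name such as `LOCAL_ONE` and falls back to `LOCAL_QUORUM` (the level previously hard-coded in DiffJobSession) when the property is empty; both the default and the parsing strategy are assumptions, not confirmed by the diff:

```java
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;

public class UtilSketch {
    // Hypothetical mapper: not part of this commit's diff. Maps a property
    // value like "LOCAL_ONE" to a driver ConsistencyLevel, defaulting to
    // LOCAL_QUORUM when the property is blank.
    public static ConsistencyLevel mapToConsistencyLevel(String level) {
        if (level == null || level.trim().isEmpty()) {
            return ConsistencyLevel.LOCAL_QUORUM;
        }
        return DefaultConsistencyLevel.valueOf(level.trim().toUpperCase());
    }
}
```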

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 4 additions & 2 deletions
@@ -47,8 +47,10 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
         for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
 
             try {
-                ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(),
-                        hasRandomPartitioner ? max : max.longValueExact()).setPageSize(fetchSizeInRows));
+                ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ?
+                        min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact())
+                        .setConsistencyLevel(readConsistencyLevel).setPageSize(fetchSizeInRows));
+
                 Collection<CompletionStage<AsyncResultSet>> writeResults = new ArrayList<CompletionStage<AsyncResultSet>>();
 
                 // cannot do batching if the writeFilter is greater than 0 or
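In driver 4.x, statements are immutable: `setConsistencyLevel` and `setPageSize` each return a new `BoundStatement` rather than mutating in place, which is why the diff chains them onto `bind(...)` before `execute`. A minimal sketch of the pattern (method and parameter names here are placeholders, not names from this repository):

```java
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;

public class ChainingDemo {
    // Illustrative range read; any prepared two-argument query would do.
    static ResultSet rangeRead(CqlSession session, PreparedStatement ps,
                               long min, long max, int fetchSize,
                               ConsistencyLevel cl) {
        // Each setter returns a NEW BoundStatement; dropping the return
        // value would silently keep the driver defaults.
        BoundStatement bs = ps.bind(min, max)
                .setConsistencyLevel(cl)
                .setPageSize(fetchSize);
        return session.execute(bs);
    }
}
```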

src/main/java/datastax/astra/migrate/CopyPKJobSession.java

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ public void getRowAndInsert(List<SplitPartitions.PKRows> rowsList) {
                 readCounter.incrementAndGet();
                 String[] pkFields = row.split(" %% ");
                 int idx = 0;
-                BoundStatement bspk = sourceSelectStatement.bind();
+                BoundStatement bspk = sourceSelectStatement.bind().setConsistencyLevel(readConsistencyLevel);
                 for (MigrateDataType tp : idColTypes) {
                     bspk = bspk.set(idx, convert(tp.typeClass, pkFields[idx]), tp.typeClass);
                     idx++;

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 3 additions & 4 deletions
@@ -1,6 +1,5 @@
 package datastax.astra.migrate;
 
-import com.datastax.oss.driver.api.core.ConsistencyLevel;
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
 import com.datastax.oss.driver.api.core.cql.ResultSet;
@@ -61,9 +60,9 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
 
             try {
                 // cannot do batching if the writeFilter is greater than 0
-                ResultSet resultSet = sourceSession.execute(
-                        sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact())
-                                .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setPageSize(fetchSizeInRows));
+                ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ?
+                        min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact())
+                        .setConsistencyLevel(readConsistencyLevel).setPageSize(fetchSizeInRows));
 
                 Map<Row, CompletionStage<AsyncResultSet>> srcToTargetRowMap = new HashMap<Row, CompletionStage<AsyncResultSet>>();
                 StreamSupport.stream(resultSet.spliterator(), false).forEach(srcRow -> {
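Together with the BaseJobSession change, this drops the hard-coded `LOCAL_QUORUM` in favor of the two new properties. A minimal sketch of supplying them programmatically, assuming they can be set like any other entry in `sparkConf.properties` (the values shown are illustrative; the property names come from the BaseJobSession diff above):

```java
import org.apache.spark.SparkConf;

public class ConsistencyConfigDemo {
    public static void main(String[] args) {
        // Property names added in this commit; values are examples only.
        SparkConf sc = new SparkConf()
                .set("spark.consistency.read", "LOCAL_ONE")
                .set("spark.consistency.write", "LOCAL_QUORUM");
        System.out.println(sc.get("spark.consistency.read"));
    }
}
```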
