
Commit e9e9c89 (merge commit, 2 parents: d98e951 + bf40068)

Pulled in the latest changes

- Addressed merge conflicts
- Updated docs
- Removed stale script

8 files changed (+60, -472 lines)

Dockerfile

Lines changed: 2 additions & 2 deletions

@@ -18,9 +18,9 @@ RUN apt-get update && apt-get install -y openssh-server vim python3 --no-install
 service ssh start

 # Copy CDM jar & template files
-ARG MAVEN_VERSION=3.8.6
+ARG MAVEN_VERSION=3.8.7
 ARG USER_HOME_DIR="/root"
-ARG BASE_URL=https://apache.osuosl.org/maven/maven-3/${MAVEN_VERSION}/binaries
+ARG BASE_URL=https://dlcdn.apache.org/maven/maven-3/${MAVEN_VERSION}/binaries
 ENV MAVEN_HOME /usr/share/maven
 ENV MAVEN_CONFIG "$USER_HOME_DIR/.m2"
 COPY ./src /assets/src

README.md

Lines changed: 6 additions & 6 deletions

@@ -2,7 +2,7 @@

 Migrate and Validate Tables between Origin and Target Cassandra Clusters.

-> :warning: Please note this job has been tested with spark version [2.4.8](https://archive.apache.org/dist/spark/spark-2.4.8/)
+> :warning: Please note this job has been tested with spark version [3.3.1](https://archive.apache.org/dist/spark/spark-3.3.1/)

 ## Container Image
 - Get the latest image that includes all dependencies from [DockerHub](https://hub.docker.com/r/datastax/cassandra-data-migrator)
@@ -16,15 +16,15 @@ Migrate and Validate Tables between Origin and Target Cassandra Clusters.
 - Install single instance of spark on a node where you want to run this job. Spark can be installed by running the following: -

 ```
-wget https://downloads.apache.org/spark/spark-2.4.8/
+wget https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
 tar -xvzf <spark downloaded file name>
 ```

 ### Build
 1. Clone this repo
 2. Move to the repo folder `cd cassandra-data-migrator`
 3. Run the build `mvn clean package`
-4. The fat jar (`cassandra-data-migrator-2.x.x.jar`) file should now be present in the `target` folder
+4. The fat jar (`cassandra-data-migrator-3.x.x.jar`) file should now be present in the `target` folder

 # Steps for Data-Migration:

@@ -36,7 +36,7 @@ tar -xvzf <spark downloaded file name>
 ```
 ./spark-submit --properties-file sparkConf.properties /
 --master "local[*]" /
---class datastax.astra.migrate.Migrate cassandra-data-migrator-2.x.x.jar &> logfile_name.txt
+--class datastax.astra.migrate.Migrate cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
 ```

 Note: Above command also generates a log file `logfile_name.txt` to avoid log output on the console.
@@ -49,7 +49,7 @@ Note: Above command also generates a log file `logfile_name.txt` to avoid log ou
 ```
 ./spark-submit --properties-file sparkConf.properties /
 --master "local[*]" /
---class datastax.astra.migrate.DiffData cassandra-data-migrator-2.x.x.jar &> logfile_name.txt
+--class datastax.astra.migrate.DiffData cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
 ```

 - Validation job will report differences as “ERRORS” in the log file as shown below
@@ -78,7 +78,7 @@ spark.target.autocorrect.mismatch true|false
 ```
 ./spark-submit --properties-file sparkConf.properties /
 --master "local[*]" /
---class datastax.astra.migrate.MigratePartitionsFromFile cassandra-data-migrator-2.x.x.jar &> logfile_name.txt
+--class datastax.astra.migrate.MigratePartitionsFromFile cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
 ```

 When running in above mode the tool assumes a `partitions.csv` file to be present in the current folder in the below format, where each line (`min,max`) represents a partition-range
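
For the `MigratePartitionsFromFile` mode referenced above, the README states that each line of `partitions.csv` is a `min,max` pair describing one token range. A minimal, hypothetical Java sketch of reading that format into `BigInteger` bounds — illustrative only, not the tool's actual loader (`PartitionsFileSketch` is an invented name):

```
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class PartitionsFileSketch {
    public static void main(String[] args) throws Exception {
        // each non-empty line is expected to look like: "<min>,<max>"
        List<String> lines = Files.readAllLines(Paths.get("partitions.csv"));
        for (String line : lines) {
            if (line.trim().isEmpty()) continue;
            String[] parts = line.split(",", 2);
            BigInteger min = new BigInteger(parts[0].trim());
            BigInteger max = new BigInteger(parts[1].trim());
            System.out.println("Partition range: " + min + " .. " + max);
        }
    }
}
```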

pom.xml

Lines changed: 5 additions & 5 deletions

@@ -133,7 +133,7 @@
             <plugin>
                 <groupId>net.alchim31.maven</groupId>
                 <artifactId>scala-maven-plugin</artifactId>
-                <version>3.2.2</version>
+                <version>4.8.0</version>
                 <executions>
                     <execution>
                         <phase>process-sources</phase>
@@ -148,7 +148,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
-                <version>2.4.3</version>
+                <version>3.4.1</version>
                 <executions>
                     <execution>

@@ -178,7 +178,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
-                <version>2.7</version>
+                <version>2.22.2</version>
                 <configuration>
                     <skipTests>true</skipTests>
                 </configuration>
@@ -187,7 +187,7 @@
             <plugin>
                 <groupId>org.scalatest</groupId>
                 <artifactId>scalatest-maven-plugin</artifactId>
-                <version>1.0</version>
+                <version>2.2.0</version>
                 <configuration>
                     <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
                     <junitxml>.</junitxml>
@@ -206,7 +206,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.8.1</version>
+                <version>3.10.1</version>
                 <configuration>
                     <source>1.8</source>
                     <target>1.8</target>

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 40 additions & 21 deletions

@@ -19,6 +19,7 @@ public class CopyJobSession extends AbstractJobSession {
     protected AtomicLong readCounter = new AtomicLong(0);
     protected AtomicLong skippedCounter = new AtomicLong(0);
     protected AtomicLong writeCounter = new AtomicLong(0);
+    protected AtomicLong errorCounter = new AtomicLong(0);

     protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
         super(sourceSession, astraSession, sc);
@@ -44,8 +45,13 @@ public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession as
     public void getDataAndInsert(BigInteger min, BigInteger max) {
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
         int maxAttempts = maxRetries;
-        for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
+        boolean done = false;

+        for (int retryCount = 1; retryCount <= maxAttempts && !done; retryCount++) {
+            long readCnt = 0;
+            long writeCnt = 0;
+            long skipCnt = 0;
+            long errCnt = 0;
             try {
                 ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ?
                         min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact())
@@ -59,67 +65,66 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                 if (batchSize == 1 || writeTimeStampFilter || isCounterTable) {
                     for (Row sourceRow : resultSet) {
                         readLimiter.acquire(1);
+                        readCnt++;
+                        if (readCnt % printStatsAfter == 0) {
+                            printCounts(false);
+                        }

                         if (filterData) {
                             String col = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
                             if (col.trim().equalsIgnoreCase(filterColValue)) {
                                 logger.warn("Skipping row and filtering out: {}", getKey(sourceRow));
-                                skippedCounter.incrementAndGet();
+                                skipCnt++;
                                 continue;
                             }
                         }
-
                         if (writeTimeStampFilter) {
                             // only process rows greater than writeTimeStampFilter
                             Long sourceWriteTimeStamp = getLargestWriteTimeStamp(sourceRow);
                             if (sourceWriteTimeStamp < minWriteTimeStampFilter
                                     || sourceWriteTimeStamp > maxWriteTimeStampFilter) {
-                                readCounter.incrementAndGet();
-                                skippedCounter.incrementAndGet();
+                                skipCnt++;
                                 continue;
                             }
                         }
-
                         writeLimiter.acquire(1);
-                        if (readCounter.incrementAndGet() % printStatsAfter == 0) {
-                            printCounts(false);
-                        }
+
                         Row astraRow = null;
                         if (isCounterTable) {
                             ResultSet astraReadResultSet = astraSession
                                     .execute(selectFromAstra(astraSelectStatement, sourceRow));
                             astraRow = astraReadResultSet.one();
                         }

-
                         CompletionStage<AsyncResultSet> astraWriteResultSet = astraSession
                                 .executeAsync(bindInsert(astraInsertStatement, sourceRow, astraRow));
                         writeResults.add(astraWriteResultSet);
                         if (writeResults.size() > fetchSizeInRows) {
-                            iterateAndClearWriteResults(writeResults, 1);
+                            writeCnt += iterateAndClearWriteResults(writeResults, 1);
                         }
                     }

                     // clear the write resultset
-                    iterateAndClearWriteResults(writeResults, 1);
+                    writeCnt += iterateAndClearWriteResults(writeResults, 1);
                 } else {
                     BatchStatement batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
                     for (Row sourceRow : resultSet) {
                         readLimiter.acquire(1);
-                        writeLimiter.acquire(1);
-                        if (readCounter.incrementAndGet() % printStatsAfter == 0) {
+                        readCnt++;
+                        if (readCnt % printStatsAfter == 0) {
                             printCounts(false);
                         }

                         if (filterData) {
                             String colValue = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
                             if (colValue.trim().equalsIgnoreCase(filterColValue)) {
                                 logger.warn("Skipping row and filtering out: {}", getKey(sourceRow));
-                                skippedCounter.incrementAndGet();
+                                skipCnt++;
                                 continue;
                             }
                         }

+                        writeLimiter.acquire(1);
                         batchStatement = batchStatement.add(bindInsert(astraInsertStatement, sourceRow, null));

                         // if batch threshold is met, send the writes and clear the batch
@@ -130,27 +135,37 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                         }

                         if (writeResults.size() * batchSize > fetchSizeInRows) {
-                            iterateAndClearWriteResults(writeResults, batchSize);
+                            writeCnt += iterateAndClearWriteResults(writeResults, batchSize);
                         }
                     }

                     // clear the write resultset
-                    iterateAndClearWriteResults(writeResults, batchSize);
+                    writeCnt += iterateAndClearWriteResults(writeResults, batchSize);

                     // if there are any pending writes because the batchSize threshold was not met, then write and clear them
                     if (batchStatement.size() > 0) {
                         CompletionStage<AsyncResultSet> writeResultSet = astraSession.executeAsync(batchStatement);
                         writeResults.add(writeResultSet);
-                        iterateAndClearWriteResults(writeResults, batchStatement.size());
+                        writeCnt += iterateAndClearWriteResults(writeResults, batchStatement.size());
                         batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
                     }
                 }

-                retryCount = maxAttempts;
+                readCounter.addAndGet(readCnt);
+                writeCounter.addAndGet(writeCnt);
+                skippedCounter.addAndGet(skipCnt);
+                done = true;
             } catch (Exception e) {
+                if (retryCount == maxAttempts) {
+                    readCounter.addAndGet(readCnt);
+                    writeCounter.addAndGet(writeCnt);
+                    skippedCounter.addAndGet(skipCnt);
+                    errorCounter.addAndGet(readCnt - writeCnt - skipCnt);
+                }
                 logger.error("Error occurred retry#: {}", retryCount, e);
                 logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Retry# {}",
                         Thread.currentThread().getId(), min, max, retryCount);
+                logger.error("Error stats Read#: {}, Wrote#: {}, Skipped#: {}, Error#: {}", readCnt, writeCnt, skipCnt, (readCnt - writeCnt - skipCnt));
             }
         }
     }
@@ -164,18 +179,22 @@ public synchronized void printCounts(boolean isFinal) {
         logger.info("{} Read Record Count: {}", msg, readCounter.get());
         logger.info("{} Skipped Record Count: {}", msg, skippedCounter.get());
         logger.info("{} Write Record Count: {}", msg, writeCounter.get());
+        logger.info("{} Error Record Count: {}", msg, errorCounter.get());
         if (isFinal) {
             logger.info("################################################################################################");
         }
     }

-    private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults, int incrementBy) throws Exception {
+    private int iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultSet>> writeResults, int incrementBy) throws Exception {
+        int cnt = 0;
         for (CompletionStage<AsyncResultSet> writeResult : writeResults) {
             //wait for the writes to complete for the batch. The Retry policy, if defined, should retry the write on timeouts.
             writeResult.toCompletableFuture().get().one();
-            writeCounter.addAndGet(incrementBy);
+            cnt += incrementBy;
         }
         writeResults.clear();
+
+        return cnt;
     }

 }
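The core change in this file is the retry accounting: counts now accumulate in per-attempt locals (`readCnt`, `writeCnt`, `skipCnt`) and are pushed into the shared `AtomicLong` totals only once — when the attempt succeeds (`done = true`), or on the final failed retry, where the shortfall `readCnt - writeCnt - skipCnt` is recorded in the new `errorCounter`. A minimal sketch of that pattern, with a hypothetical `processOnce()` standing in for the actual copy loop (in the real job the locals are incremented inside the row loop, so a failed attempt still carries partial counts):

```
import java.util.concurrent.atomic.AtomicLong;

public class RetrySafeCounters {

    private final AtomicLong readCounter = new AtomicLong(0);
    private final AtomicLong writeCounter = new AtomicLong(0);
    private final AtomicLong skippedCounter = new AtomicLong(0);
    private final AtomicLong errorCounter = new AtomicLong(0);

    public void runWithRetries(int maxAttempts) {
        boolean done = false;
        for (int retryCount = 1; retryCount <= maxAttempts && !done; retryCount++) {
            // fresh per-attempt counters, so a retried range is never double-counted
            long readCnt = 0, writeCnt = 0, skipCnt = 0;
            try {
                long[] result = processOnce(); // hypothetical: {reads, writes, skips}
                readCnt = result[0];
                writeCnt = result[1];
                skipCnt = result[2];

                // success: publish this attempt's counts to the shared totals exactly once
                readCounter.addAndGet(readCnt);
                writeCounter.addAndGet(writeCnt);
                skippedCounter.addAndGet(skipCnt);
                done = true;
            } catch (Exception e) {
                if (retryCount == maxAttempts) {
                    // out of retries: publish the partial counts, and treat rows
                    // read but neither written nor skipped as errors
                    readCounter.addAndGet(readCnt);
                    writeCounter.addAndGet(writeCnt);
                    skippedCounter.addAndGet(skipCnt);
                    errorCounter.addAndGet(readCnt - writeCnt - skipCnt);
                }
            }
        }
    }

    // placeholder for the actual copy logic
    private long[] processOnce() throws Exception {
        return new long[]{0, 0, 0};
    }
}
```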
