
Commit 4b6f938

Major refactor to make CDM Spark Native (#328)

* Made Partition its own class and refactored the surrounding code to work with it.
* Made CounterUnit its own class and refactored JobCounter to work with it.
* Made JobType (Migrate, Validate & Guardrail) independent of the track-run feature and renamed slices/partitions to PartitionRanges. Also gave the actual jobs access to the PartitionRange class.
* Refactored the code to be Spark Native, fixed metrics reporting, and improved the trackRun feature.
* Updated readme.
* Fixed a metrics issue when trackRun was disabled.
1 parent 42a7148 commit 4b6f938

27 files changed: +298 / -315 lines

README.md

Lines changed: 1 addition & 1 deletion

@@ -160,7 +160,7 @@ spark-submit --properties-file cdm.properties \
 - If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be the time the job was run. If you want to avoid this, we recommend setting the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
 - When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reason), it could lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. This issue can be addressed by enabling and setting a positive value for the `spark.cdm.transform.custom.writetime.incrementBy` param, which was added specifically for this purpose.
 - When you rerun a job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in the `cdm_run_info` table will cover only the current run. If the previous run was killed for some reason, its run metrics may not have been saved. If the previous run completed (was not killed) but had errors, you will also have all run metrics from that run.
-- The Spark Cluster based deployment currently has a bug. It reports '0' for all count metrics, while doing underlying tasks (Migration, Validation, etc.). We are working to address this in the upcoming releases. Also note that this issue is only with the Spark cluster deployment and not with the single VM run.
+- When running on a Spark Cluster (and not a single VM), the rate-limit values (`spark.cdm.perfops.ratelimit.origin` & `spark.cdm.perfops.ratelimit.target`) apply to each Spark worker node individually. Hence these values should be set to `effective-rate-limit-you-need` / `number-of-spark-worker-nodes`. E.g. if you need an effective rate limit of 10000 and have 4 Spark worker nodes, set the above rate-limit params to 2500.

 # Performance recommendations
 Below recommendations may only be useful when migrating large tables where the default performance is not good enough.
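The per-worker rate-limit rule in the note above is simple division; a minimal sketch (the class and method names are ours, not part of CDM):

```java
// Hypothetical helper, not part of CDM: splits a desired cluster-wide rate
// limit evenly across Spark worker nodes, per the README guidance above.
public final class RateLimitMath {
    static int perWorkerRateLimit(int effectiveRateLimit, int workerNodes) {
        return effectiveRateLimit / workerNodes;
    }

    public static void main(String[] args) {
        // 10000 effective / 4 workers -> set both ratelimit params to 2500
        System.out.println(perWorkerRateLimit(10000, 4)); // prints 2500
    }
}
```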

RELEASE.md

Lines changed: 6 additions & 1 deletion

@@ -1,6 +1,11 @@
 # Release Notes
+## [5.0.0] - 2024-11-08
+- CDM refactored to be fully Spark Native and more performant when deployed on a multi-node Spark Cluster
+- The `trackRun` feature has been expanded to record `run-info` for each part in the `CDM_RUN_DETAILS` table. Along with granular metrics, this information can be used to troubleshoot any unbalanced or problematic partitions.
+- This release has feature parity with the 4.x release and is also backward compatible while adding the above-mentioned improvements. However, we are upgrading it to 5.x as it is a major rewrite of the code to make it Spark Native.
+
 ## [4.7.0] - 2024-10-25
-- CDM refractored to work when deployed on a Spark Cluster
+- CDM refactored to work when deployed on a Spark Cluster
 - More performant for large migration efforts (multi-terabyte clusters with several billions of rows) using a Spark Cluster (instead of individual VMs)
 - No functional changes and fully backward compatible, just a refactor to support Spark Cluster deployment

src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java

Lines changed: 22 additions & 17 deletions

@@ -60,25 +60,26 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
 
         this.session.execute("CREATE TABLE IF NOT EXISTS " + cdmKsTabInfo
                 + " (table_name TEXT, run_id BIGINT, run_type TEXT, prev_run_id BIGINT, start_time TIMESTAMP, end_time TIMESTAMP, run_info TEXT, status TEXT, PRIMARY KEY (table_name, run_id))");
+        this.session.execute("CREATE TABLE IF NOT EXISTS " + cdmKsTabDetails
+                + " (table_name TEXT, run_id BIGINT, start_time TIMESTAMP, token_min BIGINT, token_max BIGINT, status TEXT, run_info TEXT, PRIMARY KEY ((table_name, run_id), token_min))");
 
         // TODO: Remove this code block after a few releases, its only added for backward compatibility
         try {
             this.session.execute("ALTER TABLE " + cdmKsTabInfo + " ADD status TEXT");
+            this.session.execute("ALTER TABLE " + cdmKsTabDetails + " ADD run_info TEXT");
         } catch (Exception e) {
             // ignore if column already exists
             logger.trace("Column 'status' already exists in table {}", cdmKsTabInfo);
         }
-        this.session.execute("CREATE TABLE IF NOT EXISTS " + cdmKsTabDetails
-                + " (table_name TEXT, run_id BIGINT, start_time TIMESTAMP, token_min BIGINT, token_max BIGINT, status TEXT, PRIMARY KEY ((table_name, run_id), token_min))");
 
         boundInitInfoStatement = bindStatement("INSERT INTO " + cdmKsTabInfo
                 + " (table_name, run_id, run_type, prev_run_id, start_time, status) VALUES (?, ?, ?, ?, dateof(now()), ?)");
         boundInitStatement = bindStatement("INSERT INTO " + cdmKsTabDetails
                 + " (table_name, run_id, token_min, token_max, status) VALUES (?, ?, ?, ?, ?)");
         boundEndInfoStatement = bindStatement("UPDATE " + cdmKsTabInfo
                 + " SET end_time = dateof(now()), run_info = ?, status = ? WHERE table_name = ? AND run_id = ?");
-        boundUpdateStatement = bindStatement(
-                "UPDATE " + cdmKsTabDetails + " SET status = ? WHERE table_name = ? AND run_id = ? AND token_min = ?");
+        boundUpdateStatement = bindStatement("UPDATE " + cdmKsTabDetails
+                + " SET status = ?, run_info = ? WHERE table_name = ? AND run_id = ? AND token_min = ?");
         boundUpdateStartStatement = bindStatement("UPDATE " + cdmKsTabDetails
                 + " SET start_time = dateof(now()), status = ? WHERE table_name = ? AND run_id = ? AND token_min = ?");
         boundSelectInfoStatement = bindStatement(
@@ -87,7 +88,8 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
                 + " WHERE table_name = ? AND run_id = ? AND status = ? ALLOW FILTERING");
     }
 
-    public Collection<PartitionRange> getPendingPartitions(long prevRunId) throws RunNotStartedException {
+    public Collection<PartitionRange> getPendingPartitions(long prevRunId, JobType jobType)
+            throws RunNotStartedException {
         if (prevRunId == 0) {
             return Collections.emptyList();
         }
@@ -105,27 +107,29 @@ public Collection<PartitionRange> getPendingPartitions(long prevRunId) throws Ru
         }
 
         final Collection<PartitionRange> pendingParts = new ArrayList<PartitionRange>();
-        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.NOT_STARTED.toString()));
-        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.STARTED.toString()));
-        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.FAIL.toString()));
-        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.DIFF.toString()));
+        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.NOT_STARTED.toString(), jobType));
+        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.STARTED.toString(), jobType));
+        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.FAIL.toString(), jobType));
+        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.DIFF.toString(), jobType));
 
         return pendingParts;
     }
 
-    protected Collection<PartitionRange> getPartitionsByStatus(long prevRunId, String status) {
-        ResultSet rs = session.execute(boundSelectStatement.setString("table_name", tableName)
-                .setLong("run_id", prevRunId).setString("status", status));
-
+    protected Collection<PartitionRange> getPartitionsByStatus(long runId, String status, JobType jobType) {
         final Collection<PartitionRange> pendingParts = new ArrayList<PartitionRange>();
-        rs.forEach(row -> {
+        getResultSetByStatus(runId, status).forEach(row -> {
             PartitionRange part = new PartitionRange(BigInteger.valueOf(row.getLong("token_min")),
-                    BigInteger.valueOf(row.getLong("token_max")));
+                    BigInteger.valueOf(row.getLong("token_max")), jobType);
             pendingParts.add(part);
         });
         return pendingParts;
     }
 
+    protected ResultSet getResultSetByStatus(long runId, String status) {
+        return session.execute(boundSelectStatement.setString("table_name", tableName).setLong("run_id", runId)
+                .setString("status", status));
+    }
+
     public void initCdmRun(long runId, long prevRunId, Collection<PartitionRange> parts, JobType jobType) {
         ResultSet rsInfo = session
                 .execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", runId));
@@ -153,13 +157,14 @@ public void endCdmRun(long runId, String runInfo) {
                 .setString("run_info", runInfo).setString("status", TrackRun.RUN_STATUS.ENDED.toString()));
     }
 
-    public void updateCdmRun(long runId, BigInteger min, TrackRun.RUN_STATUS status) {
+    public void updateCdmRun(long runId, BigInteger min, TrackRun.RUN_STATUS status, String runInfo) {
         if (TrackRun.RUN_STATUS.STARTED.equals(status)) {
             session.execute(boundUpdateStartStatement.setString("table_name", tableName).setLong("run_id", runId)
                     .setLong("token_min", min.longValue()).setString("status", status.toString()));
         } else {
             session.execute(boundUpdateStatement.setString("table_name", tableName).setLong("run_id", runId)
-                    .setLong("token_min", min.longValue()).setString("status", status.toString()));
+                    .setLong("token_min", min.longValue()).setString("status", status.toString())
+                    .setString("run_info", runInfo));
         }
     }
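To see how the new signatures fit together, here is a minimal resume-flow sketch. It assumes the packages shown in this commit; the session setup, keyspace.table name, and run id are illustrative:

```java
import java.util.Collection;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.cdm.feature.TrackRun;
import com.datastax.cdm.job.IJobSessionFactory.JobType;
import com.datastax.cdm.job.PartitionRange;

// Sketch: re-queue every partition range left NOT_STARTED, STARTED, FAIL or
// DIFF in CDM_RUN_DETAILS by a previous run, now tagged with the job type.
public class ResumeSketch {
    public static void main(String[] args) throws Exception {
        try (CqlSession session = CqlSession.builder().build()) {
            TrackRun trackRun = new TrackRun(session, "ks.table"); // illustrative keyspace.table
            long prevRunId = 1731052800000L;                       // illustrative previous run id
            Collection<PartitionRange> pending = trackRun.getPendingPartitions(prevRunId, JobType.MIGRATE);
            System.out.println(pending.size() + " partition ranges to re-process");
        }
    }
}
```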

src/main/java/com/datastax/cdm/feature/TrackRun.java

Lines changed: 5 additions & 4 deletions

@@ -39,8 +39,9 @@ public TrackRun(CqlSession session, String keyspaceTable) {
         this.runStatement = new TargetUpsertRunDetailsStatement(session, keyspaceTable);
     }
 
-    public Collection<PartitionRange> getPendingPartitions(long prevRunId) throws RunNotStartedException {
-        Collection<PartitionRange> pendingParts = runStatement.getPendingPartitions(prevRunId);
+    public Collection<PartitionRange> getPendingPartitions(long prevRunId, JobType jobType)
+            throws RunNotStartedException {
+        Collection<PartitionRange> pendingParts = runStatement.getPendingPartitions(prevRunId, jobType);
         logger.info("###################### {} partitions pending from previous run id {} ######################",
                 pendingParts.size(), prevRunId);
         return pendingParts;
@@ -51,8 +52,8 @@ public void initCdmRun(long runId, long prevRunId, Collection<PartitionRange> pa
         logger.info("###################### Run Id for this job is: {} ######################", runId);
     }
 
-    public void updateCdmRun(long runId, BigInteger min, RUN_STATUS status) {
-        runStatement.updateCdmRun(runId, min, status);
+    public void updateCdmRun(long runId, BigInteger min, RUN_STATUS status, String runInfo) {
+        runStatement.updateCdmRun(runId, min, status, runInfo);
     }
 
     public void endCdmRun(long runId, String runInfo) {

src/main/java/com/datastax/cdm/job/AbstractJobSession.java

Lines changed: 0 additions & 10 deletions

@@ -40,7 +40,6 @@ public abstract class AbstractJobSession<T> extends BaseJobSession {
     protected EnhancedSession originSession;
     protected EnhancedSession targetSession;
     protected Guardrail guardrailFeature;
-    protected JobCounter jobCounter;
     protected Long printStatsAfter;
     protected TrackRun trackRunFeature;
     protected long runId;
@@ -65,8 +64,6 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
                     KnownProperties.getDefault(KnownProperties.PRINT_STATS_AFTER));
             printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
         }
-        this.jobCounter = new JobCounter(printStatsAfter,
-                propertyHelper.getBoolean(KnownProperties.PRINT_STATS_PER_PART));
 
         rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
         rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));
@@ -127,11 +124,4 @@ public synchronized void initCdmRun(long runId, long prevRunId, Collection<Parti
         DataUtility.deleteGeneratedSCB(runId);
     }
 
-    public synchronized void printCounts(boolean isFinal) {
-        if (isFinal) {
-            jobCounter.printFinal(runId, trackRunFeature);
-        } else {
-            jobCounter.printProgress();
-        }
-    }
 }
src/main/java/com/datastax/cdm/job/CDMMetricsAccumulator.java

Lines changed: 61 additions & 0 deletions

@@ -0,0 +1,61 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.cdm.job;
+
+import org.apache.spark.util.AccumulatorV2;
+
+import com.datastax.cdm.job.IJobSessionFactory.JobType;
+
+public class CDMMetricsAccumulator extends AccumulatorV2<JobCounter, JobCounter> {
+
+    private static final long serialVersionUID = -4185304101452658315L;
+    private JobCounter jobCounter;
+
+    public CDMMetricsAccumulator(JobType jobType) {
+        jobCounter = new JobCounter(jobType);
+    }
+
+    @Override
+    public void add(JobCounter v) {
+        jobCounter.add(v);
+    }
+
+    @Override
+    public AccumulatorV2<JobCounter, JobCounter> copy() {
+        return this;
+    }
+
+    @Override
+    public boolean isZero() {
+        return jobCounter.isZero();
+    }
+
+    @Override
+    public void merge(AccumulatorV2<JobCounter, JobCounter> other) {
+        jobCounter.add(other.value());
+    }
+
+    @Override
+    public void reset() {
+        jobCounter.reset();
+    }
+
+    @Override
+    public JobCounter value() {
+        return jobCounter;
+    }
+
+}
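This new accumulator is what makes the counters Spark-native: each executor adds its local JobCounter, and Spark merges the results on the driver, which is how cluster deployments can report real counts instead of the '0' metrics the old README noted. A minimal registration sketch follows; the wiring shown is assumed, not actual CDM code:

```java
import org.apache.spark.SparkContext;

import com.datastax.cdm.job.CDMMetricsAccumulator;
import com.datastax.cdm.job.IJobSessionFactory.JobType;

public class AccumulatorSketch {
    public static void run(SparkContext sc) {
        // Register once on the driver so Spark tracks and merges task updates.
        CDMMetricsAccumulator metrics = new CDMMetricsAccumulator(JobType.MIGRATE);
        sc.register(metrics, "cdm_metrics");

        // Inside each Spark task: metrics.add(localJobCounter);
        // Spark calls merge(...) on the driver to fold in executor results.

        // After the job finishes, value() is the merged JobCounter.
        System.out.println(metrics.value());
    }
}
```

Note that `copy()` returns `this` rather than a fresh instance, so the merged state effectively lives in the single wrapped JobCounter.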

src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 9 additions & 11 deletions

@@ -51,9 +51,6 @@ public class CopyJobSession extends AbstractJobSession<PartitionRange> {
 
     protected CopyJobSession(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) {
         super(originSession, targetSession, propHelper);
-        this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.WRITE,
-                JobCounter.CounterType.SKIPPED, JobCounter.CounterType.ERROR, JobCounter.CounterType.UNFLUSHED);
-
         pkFactory = this.originSession.getPKFactory();
         isCounterTable = this.originSession.getCqlTable().isCounterTable();
         fetchSize = this.originSession.getCqlTable().getFetchSizeInRows();
@@ -69,10 +66,10 @@ protected void processPartitionRange(PartitionRange range) {
         ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
         if (null != trackRunFeature)
-            trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.STARTED);
+            trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.STARTED, "");
 
         BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED);
-        jobCounter.threadReset();
+        JobCounter jobCounter = range.getJobCounter();
 
         try {
             OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement = this.originSession
@@ -117,20 +114,21 @@ protected void processPartitionRange(PartitionRange range) {
             jobCounter.threadIncrement(JobCounter.CounterType.WRITE,
                     jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
             jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
-            if (null != trackRunFeature)
-                trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS);
+            jobCounter.globalIncrement();
+            if (null != trackRunFeature) {
+                trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS, jobCounter.getThreadCounters(true));
+            }
         } catch (Exception e) {
             jobCounter.threadIncrement(JobCounter.CounterType.ERROR,
                     jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.WRITE)
                             - jobCounter.getCount(JobCounter.CounterType.SKIPPED));
-            if (null != trackRunFeature)
-                trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL);
             logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}",
                     Thread.currentThread().getId(), min, max, e);
             logger.error("Error stats " + jobCounter.getThreadCounters(false));
-        } finally {
             jobCounter.globalIncrement();
-            printCounts(false);
+            if (null != trackRunFeature) {
+                trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL, jobCounter.getThreadCounters(true));
+            }
         }
     }
 
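The error path above derives the error count from the other counters instead of counting failures directly; a standalone restatement of that accounting (the wrapper class is ours, the counter calls are from the diff):

```java
import com.datastax.cdm.job.JobCounter;
import com.datastax.cdm.job.PartitionRange;

// Sketch of the per-range failure accounting: rows read but neither written
// nor skipped are recorded as errors, then thread-local counts are folded
// into the range's global totals.
public class ErrorAccountingSketch {
    static void recordFailure(PartitionRange range) {
        JobCounter jobCounter = range.getJobCounter(); // counter travels with the range
        jobCounter.threadIncrement(JobCounter.CounterType.ERROR,
                jobCounter.getCount(JobCounter.CounterType.READ)
                        - jobCounter.getCount(JobCounter.CounterType.WRITE)
                        - jobCounter.getCount(JobCounter.CounterType.SKIPPED));
        jobCounter.globalIncrement();
    }
}
```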

src/main/java/com/datastax/cdm/job/CopyJobSessionFactory.java

Lines changed: 0 additions & 3 deletions

@@ -36,7 +36,4 @@ public AbstractJobSession<PartitionRange> getInstance(CqlSession originSession,
         return jobSession;
     }
 
-    public JobType getJobType() {
-        return JobType.MIGRATE;
-    }
 }

src/main/java/com/datastax/cdm/job/CounterUnit.java

Lines changed: 8 additions & 9 deletions

@@ -16,35 +16,34 @@
 package com.datastax.cdm.job;
 
 import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicLong;
 
 public class CounterUnit implements Serializable {
 
     private static final long serialVersionUID = 2194336948011681878L;
-    private final AtomicLong globalCounter = new AtomicLong(0);
-    private final transient ThreadLocal<Long> threadLocalCounter = ThreadLocal.withInitial(() -> 0L);
+    private long globalCounter = 0;
+    private long threadLocalCounter = 0;
 
     public void incrementThreadCounter(long incrementBy) {
-        threadLocalCounter.set(threadLocalCounter.get() + incrementBy);
+        threadLocalCounter += incrementBy;
     }
 
     public long getThreadCounter() {
-        return threadLocalCounter.get();
+        return threadLocalCounter;
     }
 
     public void resetThreadCounter() {
-        threadLocalCounter.set(0L);
+        threadLocalCounter = 0;
     }
 
     public void setGlobalCounter(long value) {
-        globalCounter.set(value);
+        globalCounter = value;
     }
 
     public void addThreadToGlobalCounter() {
-        globalCounter.addAndGet(threadLocalCounter.get());
+        globalCounter += threadLocalCounter;
     }
 
     public long getGlobalCounter() {
-        return globalCounter.get();
+        return globalCounter;
     }
 }
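With the AtomicLong and ThreadLocal gone, a CounterUnit is now a plain serializable pair of longs, which fits the new model where each partition range owns its own counters and Spark's accumulator, not shared-memory threading, does the merging. A tiny usage sketch:

```java
import com.datastax.cdm.job.CounterUnit;

public class CounterUnitSketch {
    public static void main(String[] args) {
        CounterUnit reads = new CounterUnit();
        reads.incrementThreadCounter(42);   // accumulate while processing a range
        reads.addThreadToGlobalCounter();   // fold into the running total
        reads.resetThreadCounter();         // ready for the next range
        System.out.println(reads.getGlobalCounter()); // prints 42
    }
}
```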
