Commit 85ca927

Track each run and allow stop, resume, rerun functions from a previous unsuccessful job (#269)
* Implemented initial version of TrackRun feature
* Added track-run feature for DiffData as well
* Minor refactor
* Implemented track-run feature with stop & resume function based on a previous run-id
* Minor refactor
* Implemented operations on cdm-run-info table
* Met test coverage requirements
* Capturing run-type, validation diff status, updated some defaults & added docs
* Readme changes
1 parent 4bca5cf commit 85ca927

33 files changed: +1254 additions, -796 deletions

README.md

Lines changed: 13 additions & 35 deletions
@@ -83,50 +83,22 @@ spark.cdm.autocorrect.mismatch false|true
 Note:
 - The validation job will never delete records from target i.e. it only adds or updates data on target
 
-# Migrating or Validating specific partition ranges
-- You can also use the tool to Migrate or Validate specific partition ranges by using a partition-file with the name `./<keyspacename>.<tablename>_partitions.csv` in the below format, placed in the current folder, as input
-```
--507900353496146534,-107285462027022883
--506781526266485690,1506166634797362039
-2637884402540451982,4638499294009575633
-798869613692279889,8699484505161403540
-```
-Each line above represents a partition-range (`min,max`). Alternatively, you can also pass the partition-file via a command-line param as shown below
-
-```
-./spark-submit --properties-file cdm.properties \
---conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
---conf spark.cdm.tokenrange.partitionFile.input="/<path-to-file>/<csv-input-filename>" \
---master "local[*]" --driver-memory 25G --executor-memory 25G \
---class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
-```
-This mode is specifically useful to process a subset of partition-ranges that may have failed during a previous run.
+# Rerun (previously incomplete) Migration or Validation
+- You can rerun a Migration or Validation job to complete a previous run that stopped for any reason. This mode skips any token-ranges from the previous run that were already migrated or validated successfully. Enable it by passing the `spark.cdm.trackRun.previousRunId` param as shown below
 
-A file named `./<keyspacename>.<tablename>_partitions.csv` is auto-generated by the Migration & Validation jobs in the above format containing any failed partition ranges. No file is created if there are no failed partitions. This file can be used as an input to process any failed partitions in a subsequent run. You can also specify a different output file using the `spark.cdm.tokenrange.partitionFile.output` option.
 ```
 ./spark-submit --properties-file cdm.properties \
 --conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
---conf spark.cdm.tokenrange.partitionFile.input="/<path-to-file>/<csv-input-filename>" \
---conf spark.cdm.tokenrange.partitionFile.output="/<path-to-file>/<csv-output-filename>" \
+--conf spark.cdm.trackRun.previousRunId=<prev_run_id> \
 --master "local[*]" --driver-memory 25G --executor-memory 25G \
 --class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
 ```
-
-For the Data-Validation step, use the conf option `--conf spark.cdm.tokenrange.partitionFile.appendOnDiff` as shown below. This allows the partition range to be output whenever there are differences, not just failures.
-```
-./spark-submit --properties-file cdm.properties \
---conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
---conf spark.cdm.tokenrange.partitionFile.input="/<path-to-file>/<csv-input-filename>" \
---conf spark.cdm.tokenrange.partitionFile.output="/<path-to-file>/<csv-output-filename>" \
---conf spark.cdm.tokenrange.partitionFile.appendOnDiff=true \
---master "local[*]" --driver-memory 25G --executor-memory 25G \
---class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
-```
-
-If `spark.cdm.tokenrange.partitionFile.input` or `spark.cdm.tokenrange.partitionFile.output` are not specified, the system will use `./<keyspacename>.<tablename>_partitions.csv` as the default file.
+Note:
+- This feature replaces and improves upon an older similar feature (using the param `spark.cdm.tokenrange.partitionFile`) that is now deprecated and will be removed soon.
 
 # Perform large-field Guardrail violation checks
 - The tool can be used to identify large fields from a table that may break your cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field) `--class com.datastax.cdm.job.GuardrailCheck` as shown below
+
 ```
 ./spark-submit --properties-file cdm.properties \
 --conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
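The rerun mode above needs a `<prev_run_id>` value. One way to find it is to list earlier runs recorded in the `cdm_run_info` table on the target keyspace. The following is a minimal sketch, not part of this commit: it assumes driver configuration comes from `application.conf`, uses placeholder keyspace/table names, and relies only on the `cdm_run_info` schema created by `TargetUpsertRunDetailsStatement` later in this diff.

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Row;

public class ListCdmRuns {
    public static void main(String[] args) {
        try (CqlSession session = CqlSession.builder().build()) {
            // cdm_run_info is partitioned by table_name and clustered by run_id,
            // so this lists every tracked run for the given table.
            for (Row row : session.execute(
                    "SELECT run_id, run_type, start_time, end_time, run_info "
                            + "FROM my_keyspace.cdm_run_info WHERE table_name = 'my_table'")) {
                System.out.printf("run_id=%d type=%s start=%s end=%s info=%s%n",
                        row.getLong("run_id"), row.getString("run_type"),
                        row.getInstant("start_time"), row.getInstant("end_time"),
                        row.getString("run_info"));
            }
        }
    }
}
```

Since run ids are generated from `System.currentTimeMillis()` (see the new statement class below), the largest `run_id` is the most recent run.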
@@ -138,6 +110,8 @@ If `spark.cdm.tokenrange.partitionFile.input` or `spark.cdm.tokenrange.partition
 # Features
 - Auto-detects table schema (column names, types, keys, collections, UDTs, etc.)
 - Including [counter tables](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_using/useCountersConcept.html)
+- Rerun a job from where the previous job stopped for any reason (killed, had exceptions, etc.)
+- If you rerun a `validation` job, it will include any token-ranges that had differences in the previous run
 - Preserve [writetimes](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__retrieving-the-datetime-a-write-occurred-p) and [TTLs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__ref-select-ttl-p)
 - Supports migration/validation of advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt))
 - Filter records from `Origin` using `writetimes` and/or CQL conditions and/or a list of token-ranges
@@ -150,14 +124,18 @@ If `spark.cdm.tokenrange.partitionFile.input` or `spark.cdm.tokenrange.partition
 - Supports migration/validation from and to [Azure Cosmos Cassandra](https://learn.microsoft.com/en-us/azure/cosmos-db/cassandra)
 - Validate migration accuracy and performance using a smaller randomized data-set
 - Supports adding custom fixed `writetime`
+- Track run information (start-time, end-time, status, etc.) in tables (`cdm_run_info` and `cdm_run_details`) on the target keyspace
 - Validation - Log partition-range level exceptions, use the exceptions file as input for rerun
 
 # Things to know
+- Each run (Migration or Validation) can be tracked (when enabled). You can find a summary and details of each run in the `cdm_run_info` and `cdm_run_details` tables in the target keyspace.
 - CDM does not migrate `ttl` & `writetime` at the field-level (for optimization reasons). It instead finds the field with the highest `ttl` & the field with the highest `writetime` within an `origin` row and uses those values on the entire `target` row.
 - CDM ignores `ttl` & `writetime` on collection and UDT fields while computing the highest value
 - If a table has only collection and/or UDT non-key columns and no table-level `ttl` configuration, the target will have no `ttl`, which can lead to inconsistencies between `origin` and `target` as rows expire on `origin` due to `ttl` expiry.
 - If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be the time the job was run. Alternatively, if needed, the param `spark.cdm.transform.custom.writetime` can be used to set a static custom value for `writetime`.
-- When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reason), it could lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. This issue can be addressed by enabling and setting a positive value for the `spark.cdm.transform.custom.writetime.incrementBy` param. This param was specifically added to address this issue.
+- When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reason), it could lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. This issue can be addressed by enabling and setting a positive value for the `spark.cdm.transform.custom.writetime.incrementBy` param. This param was specifically added to address this issue.
+- When you rerun a job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in the `cdm_run_info` table will cover only the current run. If the previous run was killed for some reason, its run metrics may not have been saved. If the previous run completed (was not killed), even if with errors, you will have all run metrics from the previous run as well.
+
 
 # Building Jar for local development
 1. Clone this repo
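To see which token-ranges a rerun would actually pick up, you can query `cdm_run_details` directly. This is a hedged sketch (placeholder keyspace, table, and run id) that reuses the exact status predicate the new `TargetUpsertRunDetailsStatement` class issues below:

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Row;

public class ListPendingRanges {
    public static void main(String[] args) {
        try (CqlSession session = CqlSession.builder().build()) {
            // Ranges whose status is anything other than a clean pass are the
            // ones a rerun with spark.cdm.trackRun.previousRunId will reprocess.
            for (Row row : session.execute(
                    "SELECT token_min, token_max, status FROM my_keyspace.cdm_run_details "
                            + "WHERE table_name = 'my_table' AND run_id = 1721325000000 "
                            + "AND status IN ('NOT_STARTED', 'STARTED', 'FAIL', 'DIFF') ALLOW FILTERING")) {
                System.out.printf("[%d, %d] %s%n",
                        row.getLong("token_min"), row.getLong("token_max"),
                        row.getString("status"));
            }
        }
    }
}
```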

RELEASE.md

Lines changed: 5 additions & 0 deletions
@@ -1,4 +1,9 @@
 # Release Notes
+## [4.3.0] - 2024-07-18
+- Added the `spark.cdm.trackRun` feature to support stop and resume functionality for Migration and Validation jobs
+- Validation jobs run with the `auto-correct` feature disabled can now be rerun with the `auto-correct` feature enabled in a more optimal way that corrects only the token-ranges with validation errors during the rerun
+- Records a summary and details of each run in tables (`cdm_run_info` and `cdm_run_details`) on the `target` keyspace
+
 ## [4.2.0] - 2024-07-09
 - Upgraded `constant-column` feature to support `replace` and `remove` of constant columns
 - Fixed `constant-column` feature to support any data-types within the PK columns
- Fixed `constant-column` feature to support any data-types within the PK columns

src/main/java/com/datastax/cdm/cql/EnhancedSession.java

Lines changed: 1 addition & 1 deletion
@@ -98,7 +98,7 @@ private CqlSession initSession(PropertyHelper propertyHelper, CqlSession session
 GenericType<?> javaType = codec.getJavaType();
 if (logDebug) logger.debug("Registering Codec {} for CQL type {} and Java type {}", codec.getClass().getSimpleName(), dataType, javaType);
 try {
-    TypeCodec<?> existingCodec = registry.codecFor(dataType, javaType);
+    registry.codecFor(dataType, javaType);
 } catch (CodecNotFoundException e) {
     registry.register(codec);
 }
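The change above only drops an unused local variable; the register-if-absent pattern around it is unchanged: probe the registry, and register the codec only when the lookup throws. A standalone sketch of that pattern (class and method names are illustrative; the driver types are the java-driver 4.x API already used in this file):

```java
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;

public class CodecGuard {
    /** Registers the codec only if the registry cannot already resolve the mapping. */
    static void registerIfAbsent(MutableCodecRegistry registry, TypeCodec<?> codec) {
        DataType dataType = codec.getCqlType();
        GenericType<?> javaType = codec.getJavaType();
        try {
            registry.codecFor(dataType, javaType); // throws if no codec covers this mapping
        } catch (CodecNotFoundException e) {
            registry.register(codec);
        }
    }
}
```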

src/main/java/com/datastax/cdm/cql/statement/OriginSelectByPKStatement.java

Lines changed: 0 additions & 4 deletions
@@ -23,12 +23,8 @@
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.ResultSet;
 import com.datastax.oss.driver.api.core.cql.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class OriginSelectByPKStatement extends OriginSelectStatement {
-    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
-
     public OriginSelectByPKStatement(IPropertyHelper propertyHelper, EnhancedSession session) {
         super(propertyHelper, session);
     }

src/main/java/com/datastax/cdm/cql/statement/OriginSelectByPartitionRangeStatement.java

Lines changed: 0 additions & 4 deletions
@@ -23,14 +23,10 @@
 import com.datastax.cdm.properties.PropertyHelper;
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
 
 public class OriginSelectByPartitionRangeStatement extends OriginSelectStatement {
-    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
-
     public OriginSelectByPartitionRangeStatement(IPropertyHelper propertyHelper, EnhancedSession session) {
         super(propertyHelper, session);
     }

src/main/java/com/datastax/cdm/cql/statement/TargetSelectByPKStatement.java

Lines changed: 0 additions & 4 deletions
@@ -26,14 +26,10 @@
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.ResultSet;
 import com.datastax.oss.driver.api.core.cql.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletionStage;
 
 public class TargetSelectByPKStatement extends BaseCdmStatement {
-    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
-
     public TargetSelectByPKStatement(IPropertyHelper propertyHelper, EnhancedSession session) {
         super(propertyHelper, session);
         this.statement = buildStatement();
src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java

Lines changed: 125 additions & 0 deletions
@@ -0,0 +1,125 @@ (new file)
/*
 * Copyright DataStax, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.datastax.cdm.cql.statement;

import java.math.BigInteger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;

import com.datastax.cdm.feature.TrackRun;
import com.datastax.cdm.feature.TrackRun.RUN_TYPE;
import com.datastax.cdm.job.SplitPartitions;
import com.datastax.cdm.job.SplitPartitions.Partition;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;

public class TargetUpsertRunDetailsStatement {
    private CqlSession session;
    private String keyspaceName;
    private String tableName;
    private long runId;
    private long prevRunId;
    private BoundStatement boundInitInfoStatement;
    private BoundStatement boundInitStatement;
    private BoundStatement boundUpdateInfoStatement;
    private BoundStatement boundUpdateStatement;
    private BoundStatement boundUpdateStartStatement;
    private BoundStatement boundSelectStatement;

    public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable) {
        this.session = session;
        String[] ksTab = keyspaceTable.split("\\.");
        this.keyspaceName = ksTab[0];
        this.tableName = ksTab[1];
        String cdmKsTabInfo = this.keyspaceName + ".cdm_run_info";
        String cdmKsTabDetails = this.keyspaceName + ".cdm_run_details";

        this.session.execute("create table if not exists " + cdmKsTabInfo
                + " (table_name text, run_id bigint, run_type text, prev_run_id bigint, start_time timestamp, end_time timestamp, run_info text, primary key (table_name, run_id))");
        this.session.execute("create table if not exists " + cdmKsTabDetails
                + " (table_name text, run_id bigint, start_time timestamp, token_min bigint, token_max bigint, status text, primary key ((table_name, run_id), token_min))");

        boundInitInfoStatement = bindStatement("INSERT INTO " + cdmKsTabInfo
                + " (table_name, run_id, run_type, prev_run_id, start_time) VALUES (?, ?, ?, ?, dateof(now()))");
        boundInitStatement = bindStatement("INSERT INTO " + cdmKsTabDetails
                + " (table_name, run_id, token_min, token_max, status) VALUES (?, ?, ?, ?, ?)");
        boundUpdateInfoStatement = bindStatement("UPDATE " + cdmKsTabInfo
                + " SET end_time = dateof(now()), run_info = ? WHERE table_name = ? AND run_id = ?");
        boundUpdateStatement = bindStatement(
                "UPDATE " + cdmKsTabDetails + " SET status = ? WHERE table_name = ? AND run_id = ? AND token_min = ?");
        boundUpdateStartStatement = bindStatement("UPDATE " + cdmKsTabDetails
                + " SET start_time = dateof(now()), status = ? WHERE table_name = ? AND run_id = ? AND token_min = ?");
        boundSelectStatement = bindStatement("SELECT token_min, token_max FROM " + cdmKsTabDetails
                + " WHERE table_name = ? AND run_id = ? and status in ('NOT_STARTED', 'STARTED', 'FAIL', 'DIFF') ALLOW FILTERING");
    }

    public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId) {
        this.prevRunId = prevRunId;
        if (prevRunId == 0) {
            return new ArrayList<SplitPartitions.Partition>();
        }

        final Collection<SplitPartitions.Partition> pendingParts = new ArrayList<SplitPartitions.Partition>();
        ResultSet rs = session
                .execute(boundSelectStatement.setString("table_name", tableName).setLong("run_id", prevRunId));
        rs.forEach(row -> {
            Partition part = new Partition(BigInteger.valueOf(row.getLong("token_min")),
                    BigInteger.valueOf(row.getLong("token_max")));
            pendingParts.add(part);
        });

        return pendingParts;
    }

    public long initCdmRun(Collection<SplitPartitions.Partition> parts, RUN_TYPE runType) {
        runId = System.currentTimeMillis();
        session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
                .setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId));
        parts.forEach(part -> initCdmRun(part));
        return runId;
    }

    private void initCdmRun(Partition partition) {
        session.execute(boundInitStatement.setString("table_name", tableName).setLong("run_id", runId)
                .setLong("token_min", partition.getMin().longValue())
                .setLong("token_max", partition.getMax().longValue())
                .setString("status", TrackRun.RUN_STATUS.NOT_STARTED.toString()));
    }

    public void updateCdmRunInfo(String runInfo) {
        session.execute(boundUpdateInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
                .setString("run_info", runInfo));
    }

    public void updateCdmRun(BigInteger min, TrackRun.RUN_STATUS status) {
        if (TrackRun.RUN_STATUS.STARTED.equals(status)) {
            session.execute(boundUpdateStartStatement.setString("table_name", tableName).setLong("run_id", runId)
                    .setLong("token_min", min.longValue()).setString("status", status.toString()));
        } else {
            session.execute(boundUpdateStatement.setString("table_name", tableName).setLong("run_id", runId)
                    .setLong("token_min", min.longValue()).setString("status", status.toString()));
        }
    }

    private BoundStatement bindStatement(String stmt) {
        if (null == session)
            throw new RuntimeException("Session is not set");
        return session.prepare(stmt).bind().setTimeout(Duration.ofSeconds(10));
    }
}
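For orientation, here is a hypothetical sketch of how a job could drive this class end-to-end, using only the methods defined above. The actual wiring lives in the `com.datastax.cdm.job` classes changed elsewhere in this commit; `RUN_TYPE.MIGRATE` and `RUN_STATUS.PASS` are assumed enum constants (only `NOT_STARTED`, `STARTED`, `FAIL`, and `DIFF` appear in this file), and the keyspace/table names are placeholders.

```java
import java.util.Collection;

import com.datastax.cdm.cql.statement.TargetUpsertRunDetailsStatement;
import com.datastax.cdm.feature.TrackRun;
import com.datastax.cdm.job.SplitPartitions;
import com.datastax.oss.driver.api.core.CqlSession;

public class TrackRunSketch {
    static void runWithTracking(CqlSession session,
            Collection<SplitPartitions.Partition> plannedParts, long prevRunId) {
        TargetUpsertRunDetailsStatement trackRun =
                new TargetUpsertRunDetailsStatement(session, "my_keyspace.my_table"); // placeholders

        // On a rerun, process only the ranges the previous run left pending or failed.
        Collection<SplitPartitions.Partition> parts = trackRun.getPendingPartitions(prevRunId);
        if (parts.isEmpty()) {
            parts = plannedParts; // fresh run: prevRunId == 0 yields an empty collection
        }

        long runId = trackRun.initCdmRun(parts, TrackRun.RUN_TYPE.MIGRATE); // assumed constant
        for (SplitPartitions.Partition part : parts) {
            trackRun.updateCdmRun(part.getMin(), TrackRun.RUN_STATUS.STARTED);
            // ... migrate or validate rows in [part.getMin(), part.getMax()] ...
            trackRun.updateCdmRun(part.getMin(), TrackRun.RUN_STATUS.PASS); // assumed constant
        }
        trackRun.updateCdmRunInfo("Processed " + parts.size() + " token-ranges in run " + runId);
    }
}
```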

src/main/java/com/datastax/cdm/data/PKFactory.java

Lines changed: 0 additions & 1 deletion
@@ -19,7 +19,6 @@
 import com.datastax.cdm.schema.CqlTable;
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.Row;
-import com.datastax.cdm.properties.KnownProperties;
 import com.datastax.cdm.properties.PropertyHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

src/main/java/com/datastax/cdm/data/Record.java

Lines changed: 0 additions & 1 deletion
@@ -32,7 +32,6 @@ public enum Diff {
     private Row originRow;
     private Row targetRow;
     private CompletionStage<AsyncResultSet> targetFutureRow;
-    private Diff diff = Diff.UNKNOWN;
 
     public Record(EnhancedPK pk, Row originRow, Row targetRow, CompletionStage<AsyncResultSet> targetFutureRow) {
         if (null == pk || (null == originRow && null == targetRow && null == targetFutureRow)) {
