
Commit 18f04e6

CDM-87: Refactored exception handling and loading of token-range filters to use the same Migrate & DiffData jobs instead of separate jobs to reduce code & maintenance overhead
1 parent 6c8cbd9 commit 18f04e6

16 files changed: +92 -135 lines changed

README.md

Lines changed: 12 additions & 25 deletions
@@ -82,40 +82,27 @@ spark.cdm.autocorrect.mismatch false|true
 Note:
 - The validation job will never delete records from target i.e. it only adds or updates data on target
 
-# Migrating specific partition ranges
-- You can also use the tool to migrate specific partition ranges using class option `--class com.datastax.cdm.job.MigratePartitionsFromFile` as shown below
-```
-./spark-submit --properties-file cdm.properties /
---conf spark.cdm.schema.origin.keyspaceTable="<keyspace-name>.<table-name>" /
---master "local[*]" /
---class com.datastax.cdm.job.MigratePartitionsFromFile cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
-```
-
-When running in above mode the tool assumes a `partitions.csv` file to be present in the current folder in the below format, where each line (`min,max`) represents a partition-range
+# Migrating or Validating specific partition ranges
+- You can also use the tool to migrate or validate specific partition ranges by placing a partition file named `./<keyspace>.<tablename>_partitions.csv` (in the below format) in the current folder as input
 ```
 -507900353496146534,-107285462027022883
 -506781526266485690,1506166634797362039
 2637884402540451982,4638499294009575633
 798869613692279889,8699484505161403540
 ```
-This mode is specifically useful to processes a subset of partition-ranges that may have failed during a previous run.
-
-> **Note:**
-> A file ending with `*_partitions.csv` will be auto created by the Migration & Validation job in the above format containing any failed partition ranges. Just rename it as below & run the above job.
+Each line above represents a partition-range (`min,max`). Alternatively, you can pass the partition file via a command-line param as shown below
 
 ```
-mv <keyspace>.<table>_partitions.csv partitions.csv
-```
-# Data validation for specific partition ranges
-- You can also use the tool to validate data for a specific partition ranges using class option `--class com.datastax.cdm.job.DiffPartitionsFromFile` as shown below,
-```
-./spark-submit --properties-file cdm.properties /
---conf spark.cdm.schema.origin.keyspaceTable="<keyspace-name>.<table-name>" /
---master "local[*]" /
---class com.datastax.cdm.job.DiffPartitionsFromFile cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
+spark-submit --properties-file cdm.properties /
+--conf spark.cdm.schema.origin.keyspaceTable="test_ks.cat_promo" /
+--conf spark.tokenrange.partitionFile="/<path-to-file>.<csv-input-filename>" /
+--master "local[*]" /
+--class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
 ```
+This mode is specifically useful to process a subset of partition-ranges that may have failed during a previous run.
 
-When running in above mode the tool assumes a `partitions.csv` file to be present in the current folder.
+> **Note:**
+> A file named `./<keyspace>.<tablename>_partitions.csv` will be auto-created by the Migration & Validation job in the above format, containing any failed partition ranges. You can use this file as an input to process any failed partitions in a subsequent run.
 
 # Perform large-field Guardrail violation checks
 - The tool can be used to identify large fields from a table that may break your cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field) `--class com.datastax.cdm.job.GuardrailCheck` as shown below
@@ -132,7 +119,7 @@ When running in above mode the tool assumes a `partitions.csv` file to be presen
 - Including counter table [Counter tables](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_using/useCountersConcept.html)
 - Preserve [writetimes](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__retrieving-the-datetime-a-write-occurred-p) and [TTLs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__ref-select-ttl-p)
 - Supports migration/validation of advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt))
-- Filter records from `Origin` using `writetimes` and/or CQL conditions and/or min/max token-range
+- Filter records from `Origin` using `writetimes` and/or CQL conditions and/or a list of token-ranges
 - Perform guardrail checks (identify large fields)
 - Supports adding `constants` as new columns on `Target`
 - Supports expanding `Map` columns on `Origin` into multiple records on `Target`
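
For clarity, here is a minimal standalone sketch (hypothetical class name, not part of this commit) showing how a `min,max` line in the partition-file format above maps to a token range:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.math.BigInteger;

public class PartitionLineParsingDemo {
    public static void main(String[] args) throws IOException {
        // Sample lines in the same min,max format shown in the README above.
        BufferedReader reader = new BufferedReader(new StringReader(
                "-507900353496146534,-107285462027022883\n2637884402540451982,4638499294009575633"));
        String line;
        while ((line = reader.readLine()) != null) {
            String[] minMax = line.split(",");
            BigInteger min = new BigInteger(minMax[0].trim()); // lower token bound
            BigInteger max = new BigInteger(minMax[1].trim()); // upper token bound
            System.out.println("token range " + min + " to " + max);
        }
    }
}
```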

RELEASE.md

Lines changed: 3 additions & 0 deletions
@@ -1,4 +1,7 @@
 # Release Notes
+## [4.0.2] - 2023-06-20
+- Refactored exception handling and loading of token-range filters to use the same Migrate & DiffData jobs instead of separate jobs to reduce code & maintenance overhead
+
 ## [4.0.2] - 2023-06-16
 - Capture failed partitions in a file for easier reruns
 - Optimized mvn to reduce jar size

src/main/java/com/datastax/cdm/job/AbstractJobSession.java

Lines changed: 4 additions & 4 deletions
@@ -19,10 +19,12 @@ public abstract class AbstractJobSession<T> extends BaseJobSession {
     protected EnhancedSession targetSession;
     protected Guardrail guardrailFeature;
     protected boolean guardrailEnabled;
-    protected String tokenRangeExceptionDir;
+    protected String partitionFile = SplitPartitions.getPartitionFile(propertyHelper);
+
     protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
         this(originSession, targetSession, sc, false);
     }
+
     protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
         super(sc);
 
@@ -41,10 +43,8 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));
         maxRetries = propertyHelper.getInteger(KnownProperties.MAX_RETRIES);
 
-        tokenRangeExceptionDir = propertyHelper.getString(KnownProperties.TOKEN_RANGE_EXCEPTION_DIR);
-
         logger.info("PARAM -- Max Retries: {}", maxRetries);
-        logger.info("PARAM -- Token range exception dir: {}", tokenRangeExceptionDir);
+        logger.info("PARAM -- Partition file: {}", partitionFile);
         logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
         logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());

src/main/java/com/datastax/cdm/job/BaseJobSession.java

Lines changed: 17 additions & 17 deletions
@@ -3,6 +3,7 @@
 import com.datastax.cdm.feature.Feature;
 import com.datastax.cdm.feature.FeatureFactory;
 import com.datastax.cdm.feature.Featureset;
+import com.datastax.cdm.properties.KnownProperties;
 import com.datastax.cdm.properties.PropertyHelper;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
 import org.apache.commons.lang3.StringUtils;
@@ -25,14 +26,13 @@ public abstract class BaseJobSession {
 
     public static final String THREAD_CONTEXT_LABEL = "ThreadLabel";
     protected static final String NEW_LINE = System.lineSeparator();
+    private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
     protected PropertyHelper propertyHelper = PropertyHelper.getInstance();
     protected Map<Featureset, Feature> featureMap;
-
     protected RateLimiter rateLimiterOrigin;
     protected RateLimiter rateLimiterTarget;
     protected Integer maxRetries = 10;
     protected Integer printStatsAfter = 100000;
-    private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
 
     protected BaseJobSession(SparkConf sc) {
         propertyHelper.initializeSparkConf(sc);
@@ -68,26 +68,26 @@ protected String getThreadLabel(BigInteger min, BigInteger max) {
         return formattedMin + ":" + formattedMax;
     }
 
-    private void appendToFile(Path path, String content)
-            throws IOException {
-        // if file not exists, create and write, else append
-        Files.write(path, content.getBytes(StandardCharsets.UTF_8),
-                StandardOpenOption.CREATE,
-                StandardOpenOption.APPEND);
-    }
+    private void appendToFile(String filePath, String content) throws IOException {
+        if (StringUtils.isAllBlank(filePath)) {
+            filePath = "./" + propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE) + "_partitions.csv";
 
-    private void FileAppend(String dir, String fileName, String content) throws IOException {
-        if (StringUtils.isAllBlank(dir)) {
-            dir = "./"; // use current folder by default
         }
-        Files.createDirectories(Paths.get(dir));
-        Path path = Paths.get(dir + "/" + fileName + "_partitions.csv");
-        appendToFile(path, content + NEW_LINE);
+        Path path = Paths.get(filePath);
+        if (StringUtils.isNotBlank(path.getParent().toString())) {
+            Files.createDirectories(path.getParent());
+        } else {
+            path = Paths.get("./" + filePath);
+        }
+
+        Files.write(path, (content + NEW_LINE).getBytes(StandardCharsets.UTF_8),
+                StandardOpenOption.CREATE,
+                StandardOpenOption.APPEND);
     }
 
-    protected void logFailedPartitionsInFile(String dir, String fileName, BigInteger min, BigInteger max) {
+    protected void logFailedPartitionsInFile(String partitionFile, BigInteger min, BigInteger max) {
         try {
-            FileAppend(dir, fileName, min + "," + max);
+            appendToFile(partitionFile, min + "," + max);
         } catch (Exception ee) {
             logger.error("Error occurred while writing to token range file min: {} max: {}", min, max, ee);
         }
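
As a side note, here is a minimal standalone sketch (hypothetical class and file name, not part of this commit) of the `CREATE` + `APPEND` semantics the refactored `appendToFile` relies on: the file is created when absent and appended to otherwise, so each failed range becomes one `min,max` line.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class AppendSemanticsDemo {
    public static void main(String[] args) throws IOException {
        // Hypothetical file name; the job itself resolves this from
        // spark.tokenrange.partitionFile or defaults to
        // ./<keyspace>.<table>_partitions.csv in the current folder.
        Path path = Paths.get("./demo_partitions.csv");
        String line = "-507900353496146534,-107285462027022883" + System.lineSeparator();
        // CREATE + APPEND: create the file if it does not exist, append if it does.
        Files.write(path, line.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}
```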

src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 2 additions & 4 deletions
@@ -6,7 +6,6 @@
 import com.datastax.cdm.data.PKFactory;
 import com.datastax.cdm.data.Record;
 import com.datastax.cdm.feature.Guardrail;
-import com.datastax.cdm.properties.KnownProperties;
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.*;
 import org.apache.logging.log4j.ThreadContext;
@@ -26,14 +25,14 @@ public class CopyJobSession extends AbstractJobSession<SplitPartitions.Partition
     private final PKFactory pkFactory;
     private final boolean isCounterTable;
     private final Integer fetchSize;
+    private final Integer batchSize;
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
     protected AtomicLong readCounter = new AtomicLong(0);
     protected AtomicLong skippedCounter = new AtomicLong(0);
     protected AtomicLong writeCounter = new AtomicLong(0);
     protected AtomicLong errorCounter = new AtomicLong(0);
     private TargetUpsertStatement targetUpsertStatement;
     private TargetSelectByPKStatement targetSelectByPKStatement;
-    private final Integer batchSize;
 
     protected CopyJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
         super(originSession, targetSession, sc);
@@ -128,8 +127,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                 writeCounter.addAndGet(flushedWriteCnt);
                 skippedCounter.addAndGet(skipCnt);
                 errorCounter.addAndGet(readCnt - flushedWriteCnt - skipCnt);
-                logFailedPartitionsInFile(tokenRangeExceptionDir,
-                        propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE), min, max);
+                logFailedPartitionsInFile(partitionFile, min, max);
             }
             logger.error("Error occurred during Attempt#: {}", attempts, e);
             logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",

src/main/java/com/datastax/cdm/job/CopyPKJobSession.java

Lines changed: 10 additions & 11 deletions
@@ -1,12 +1,12 @@
 package com.datastax.cdm.job;
 
-import com.datastax.cdm.feature.Guardrail;
-import com.datastax.oss.driver.api.core.CqlSession;
-import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.cdm.cql.statement.OriginSelectByPKStatement;
 import com.datastax.cdm.data.EnhancedPK;
 import com.datastax.cdm.data.PKFactory;
 import com.datastax.cdm.data.Record;
-import com.datastax.cdm.cql.statement.OriginSelectByPKStatement;
+import com.datastax.cdm.feature.Guardrail;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.Row;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -20,15 +20,14 @@
 public class CopyPKJobSession extends AbstractJobSession<SplitPartitions.PKRows> {
 
     private static CopyPKJobSession copyJobSession;
+    private final PKFactory pkFactory;
+    private final List<Class> originPKClasses;
+    private final boolean isCounterTable;
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
     protected AtomicLong readCounter = new AtomicLong(0);
     protected AtomicLong missingCounter = new AtomicLong(0);
     protected AtomicLong skipCounter = new AtomicLong(0);
     protected AtomicLong writeCounter = new AtomicLong(0);
-
-    private final PKFactory pkFactory;
-    private final List<Class> originPKClasses;
-    private final boolean isCounterTable;
     private OriginSelectByPKStatement originSelectByPKStatement;
 
     protected CopyPKJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
@@ -37,7 +36,7 @@ protected CopyPKJobSession(CqlSession originSession, CqlSession targetSession, S
         isCounterTable = this.originSession.getCqlTable().isCounterTable();
         originPKClasses = this.originSession.getCqlTable().getPKClasses();
 
-        logger.info("CQL -- origin select: {}",this.originSession.getOriginSelectByPKStatement().getCQL());
+        logger.info("CQL -- origin select: {}", this.originSession.getOriginSelectByPKStatement().getCQL());
     }
 
     @Override
@@ -47,7 +46,7 @@ public void processSlice(SplitPartitions.PKRows slice) {
 
     public void getRowAndInsert(SplitPartitions.PKRows rowsList) {
         originSelectByPKStatement = originSession.getOriginSelectByPKStatement();
-        for (String row : rowsList.pkRows) {
+        for (String row : rowsList.getPkRows()) {
             readCounter.incrementAndGet();
             EnhancedPK pk = toEnhancedPK(row);
             if (null == pk || pk.isError()) {
@@ -110,7 +109,7 @@ private EnhancedPK toEnhancedPK(String rowString) {
         String[] pkFields = rowString.split(" %% ");
         List<Object> values = new ArrayList<>(originPKClasses.size());
         if (logger.isDebugEnabled()) logger.debug("rowString={}, pkFields={}", rowString, pkFields);
-        for (int i=0; i<pkFields.length; i++) {
+        for (int i = 0; i < pkFields.length; i++) {
             PropertyEditor editor = PropertyEditorManager.findEditor(originPKClasses.get(i));
             editor.setAsText(pkFields[i]);
             values.add(editor.getValue());
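
For context, a minimal standalone sketch (hypothetical class name and sample values) of the `PropertyEditor` conversion `toEnhancedPK` performs per PK field. Primitive types such as `int.class` and `boolean.class` are guaranteed default editors by `java.beans`; the real code looks up editors for whatever classes `getPKClasses()` reports.

```java
import java.beans.PropertyEditor;
import java.beans.PropertyEditorManager;

public class PkFieldParsingDemo {
    public static void main(String[] args) {
        // Rows are split on " %% "; each field is converted from text
        // to the Java type of the corresponding PK column.
        String[] pkFields = "42 %% true".split(" %% ");

        PropertyEditor intEditor = PropertyEditorManager.findEditor(int.class);
        intEditor.setAsText(pkFields[0]);
        System.out.println(intEditor.getValue()); // 42, boxed as Integer

        PropertyEditor boolEditor = PropertyEditorManager.findEditor(boolean.class);
        boolEditor.setAsText(pkFields[1]);
        System.out.println(boolEditor.getValue()); // true, boxed as Boolean
    }
}
```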

src/main/java/com/datastax/cdm/job/DiffJobSession.java

Lines changed: 1 addition & 2 deletions
@@ -155,8 +155,7 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
             logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
                     Thread.currentThread().getId(), min, max, attempts);
             if (attempts == maxAttempts) {
-                logFailedPartitionsInFile(tokenRangeExceptionDir,
-                        propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE), min, max);
+                logFailedPartitionsInFile(partitionFile, min, max);
             }
         }
     }

src/main/java/com/datastax/cdm/job/SplitPartitions.java

Lines changed: 26 additions & 14 deletions
@@ -1,12 +1,13 @@
 package com.datastax.cdm.job;
 
+import com.datastax.cdm.properties.KnownProperties;
+import com.datastax.cdm.properties.PropertyHelper;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
 import java.math.BigInteger;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -116,10 +117,30 @@ private static List<Partition> getSubPartitions(int numSplits, BigInteger min, B
         return partitions;
     }
 
+    private static BufferedReader getfileReader(String fileName) {
+        try {
+            return new BufferedReader(new FileReader(fileName));
+        } catch (FileNotFoundException fnfe) {
+            throw new RuntimeException("No '" + fileName + "' file found!! Add this file in the current folder & rerun!");
+        }
+    }
+
+    public static String getPartitionFile(PropertyHelper propertyHelper) {
+        String filePath = propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE);
+        if (StringUtils.isAllBlank(filePath)) {
+            filePath = "./" + propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE) + "_partitions.csv";
+        }
+
+        return filePath;
+    }
+
     public static class PKRows implements Serializable {
         private static final long serialVersionUID = 1L;
+        private List<String> pkRows;
 
-        List<String> pkRows;
+        public List<String> getPkRows() {
+            return pkRows;
+        }
 
         public PKRows(List<String> rows) {
             pkRows = new ArrayList<>(rows);
@@ -129,8 +150,8 @@ public PKRows(List<String> rows) {
     public static class Partition implements Serializable {
         private static final long serialVersionUID = 1L;
 
-        private BigInteger min;
-        private BigInteger max;
+        private final BigInteger min;
+        private final BigInteger max;
 
         public Partition(BigInteger min, BigInteger max) {
             this.min = min;
@@ -149,13 +170,4 @@ public String toString() {
             return "Processing partition for token range " + min + " to " + max;
         }
     }
-
-    private static BufferedReader getfileReader(String fileName) {
-        try {
-            return new BufferedReader(new FileReader(fileName));
-        } catch (FileNotFoundException fnfe) {
-            throw new RuntimeException("No '" + fileName + "' file found!! Add this file in the current folder & rerun!");
-        }
-    }
-
 }
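
A minimal standalone mirror (hypothetical class name, plain JDK checks instead of commons-lang `StringUtils`) of the fallback logic in `getPartitionFile`: use the configured `spark.tokenrange.partitionFile` when present, else default to `./<keyspace>.<table>_partitions.csv` in the current folder.

```java
public class PartitionFileResolutionDemo {
    static String resolve(String configuredPath, String keyspaceTable) {
        // Blank or missing property -> default file in the current folder.
        if (configuredPath == null || configuredPath.trim().isEmpty()) {
            return "./" + keyspaceTable + "_partitions.csv";
        }
        return configuredPath;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, "test_ks.cat_promo"));              // ./test_ks.cat_promo_partitions.csv
        System.out.println(resolve("/tmp/ranges.csv", "test_ks.cat_promo")); // /tmp/ranges.csv
    }
}
```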

src/main/java/com/datastax/cdm/properties/KnownProperties.java

Lines changed: 2 additions & 5 deletions
@@ -26,7 +26,6 @@ public enum PropertyType {
     public static final String CONNECT_ORIGIN_USERNAME = "spark.cdm.connect.origin.username";
     public static final String CONNECT_ORIGIN_PASSWORD = "spark.cdm.connect.origin.password";
 
-
     public static final String CONNECT_TARGET_HOST = "spark.cdm.connect.target.host";
     public static final String CONNECT_TARGET_PORT = "spark.cdm.connect.target.port";
     public static final String CONNECT_TARGET_SCB = "spark.cdm.connect.target.scb";
@@ -53,7 +52,6 @@ public enum PropertyType {
         defaults.put(CONNECT_TARGET_USERNAME, "cassandra");
         types.put(CONNECT_TARGET_PASSWORD, PropertyType.STRING);
         defaults.put(CONNECT_TARGET_PASSWORD, "cassandra");
-
     }
 
     //==========================================================================
@@ -76,7 +74,6 @@ public enum PropertyType {
         types.put(ORIGIN_WRITETIME_NAMES, PropertyType.STRING_LIST);
         types.put(ORIGIN_WRITETIME_AUTO, PropertyType.BOOLEAN);
         defaults.put(ORIGIN_WRITETIME_AUTO, "true");
-
         types.put(ORIGIN_COLUMN_NAMES_TO_TARGET, PropertyType.STRING_LIST);
     }
 
@@ -139,9 +136,9 @@ public enum PropertyType {
     //==========================================================================
     // Error handling
     //==========================================================================
-    public static final String TOKEN_RANGE_EXCEPTION_DIR = "spark.tokenRange.exceptionDir";
+    public static final String TOKEN_RANGE_PARTITION_FILE = "spark.tokenrange.partitionFile";
     static {
-        types.put(TOKEN_RANGE_EXCEPTION_DIR, PropertyType.STRING);
+        types.put(TOKEN_RANGE_PARTITION_FILE, PropertyType.STRING);
     }
     //==========================================================================
     // Guardrails and Transformations
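
A minimal standalone sketch (hypothetical class name) of the declare-then-register pattern `KnownProperties` uses, applied to the renamed property:

```java
import java.util.HashMap;
import java.util.Map;

public class PropertyRegistryDemo {
    enum PropertyType { STRING }

    static final Map<String, PropertyType> types = new HashMap<>();

    // Declare the key, then register its type in a static block so
    // lookups can validate configured values against the expected type.
    public static final String TOKEN_RANGE_PARTITION_FILE = "spark.tokenrange.partitionFile";
    static {
        types.put(TOKEN_RANGE_PARTITION_FILE, PropertyType.STRING);
    }

    public static void main(String[] args) {
        System.out.println(types.get(TOKEN_RANGE_PARTITION_FILE)); // STRING
    }
}
```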
