
Commit 3b2f683

CDM-87 Implemented partition-range exception file generation for Migration job & refactored the same for validation.
1 parent 06cc507 commit 3b2f683

File tree

12 files changed (+129 / -165 lines changed)


README.md

Lines changed: 6 additions & 6 deletions

@@ -12,7 +12,7 @@ Migrate and Validate Tables between Origin and Target Cassandra Clusters.
 - All migration tools (`cassandra-data-migrator` + `dsbulk` + `cqlsh`) would be available in the `/assets/` folder of the container

 ## Install as a JAR file
-- Download the latest jar file from the GitHub [packages area here](https://github.com/orgs/datastax/packages?repo_name=cassandra-data-migrator)
+- Download the latest jar file from the GitHub [packages area here](https://github.com/datastax/cassandra-data-migrator/packages/1832128)

 ### Prerequisite
 - Install Java8 as spark binaries are compiled with it.

@@ -101,27 +101,27 @@ When running in above mode the tool assumes a `partitions.csv` file to be presen
 This mode is specifically useful to processes a subset of partition-ranges that may have failed during a previous run.

 > **Note:**
-> Here is a quick tip to prepare `partitions.csv` from the log file,
+> A file ending with `*_partitions.csv` will be auto created by the Migration & Validation job in the above format containing any failed partition ranges. Just rename it as below & run the above job.

 ```
-grep "ERROR CopyJobSession: Error with PartitionRange" /path/to/logfile_name.txt | awk '{print $13","$15}' > partitions.csv
+mv <keyspace>.<table>_partitions.csv partitions.csv
 ```
 # Data validation for specific partition ranges
 - You can also use the tool to validate data for a specific partition ranges using class option `--class com.datastax.cdm.job.DiffPartitionsFromFile` as shown below,
 ```
 ./spark-submit --properties-file cdm.properties /
---conf spark.origin.keyspaceTable="<keyspace-name>.<table-name>" /
+--conf spark.cdm.schema.origin.keyspaceTable="<keyspace-name>.<table-name>" /
 --master "local[*]" /
 --class com.datastax.cdm.job.DiffPartitionsFromFile cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
 ```

 When running in above mode the tool assumes a `partitions.csv` file to be present in the current folder.

 # Perform large-field Guardrail violation checks
-- The tool can be used to identify large fields from a table that may break you cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field) `--class datastax.astra.migrate.Guardrail` as shown below
+- The tool can be used to identify large fields from a table that may break you cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field) `--class com.datastax.cdm.job.GuardrailCheck` as shown below
 ```
 ./spark-submit --properties-file cdm.properties /
---conf spark.origin.keyspaceTable="<keyspace-name>.<table-name>" /
+--conf spark.cdm.schema.origin.keyspaceTable="<keyspace-name>.<table-name>" /
 --conf spark.cdm.feature.guardrail.colSizeInKB=10000 /
 --master "local[*]" /
 --class com.datastax.cdm.job.GuardrailCheck cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
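
For reference, the exception file introduced by this commit (see `BaseJobSession.logFailedPartitionsInFile` below) writes one failed token range per line as `min,max`. A hypothetical example of what a generated `<keyspace>.<table>_partitions.csv` (or the renamed `partitions.csv`) might contain; the token values are placeholders, not real output:

```
-9223372036854775808,-6148914691236517206
-6148914691236517205,-3074457345618258603
-3074457345618258602,-1
```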

RELEASE.md

Lines changed: 4 additions & 0 deletions

@@ -1,4 +1,8 @@
 # Release Notes
+## [4.0.2] - 2023-06-16
+- Capture failed partitions in a file for easier reruns
+- Optimized mvn to reduce jar size
+- Fixed bugs in docs

 ## [4.0.1] - 2023-06-08
 - Fixes broken maven link in docker build process

src/main/java/com/datastax/cdm/job/AbstractJobSession.java

Lines changed: 9 additions & 13 deletions

@@ -1,33 +1,28 @@
 package com.datastax.cdm.job;

 import com.datastax.cdm.cql.EnhancedSession;
+import com.datastax.cdm.data.PKFactory;
+import com.datastax.cdm.feature.Feature;
 import com.datastax.cdm.feature.Featureset;
 import com.datastax.cdm.feature.Guardrail;
+import com.datastax.cdm.properties.KnownProperties;
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
-import com.datastax.cdm.data.PKFactory;
-import com.datastax.cdm.feature.Feature;
-import com.datastax.cdm.properties.KnownProperties;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 public abstract class AbstractJobSession<T> extends BaseJobSession {

-    public abstract void processSlice(T slice);
-    public abstract void printCounts(boolean isFinal);
-
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
-
     protected EnhancedSession originSession;
     protected EnhancedSession targetSession;
     protected Guardrail guardrailFeature;
     protected boolean guardrailEnabled;
-
+    protected String tokenRangeExceptionDir;
     protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
         this(originSession, targetSession, sc, false);
     }
-
     protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
         super(sc);

@@ -37,7 +32,7 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,

         printStatsAfter = propertyHelper.getInteger(KnownProperties.PRINT_STATS_AFTER);
         if (!propertyHelper.meetsMinimum(KnownProperties.PRINT_STATS_AFTER, printStatsAfter, 1)) {
-            logger.warn(KnownProperties.PRINT_STATS_AFTER +" must be greater than 0. Setting to default value of " + KnownProperties.getDefaultAsString(KnownProperties.PRINT_STATS_AFTER));
+            logger.warn(KnownProperties.PRINT_STATS_AFTER + " must be greater than 0. Setting to default value of " + KnownProperties.getDefaultAsString(KnownProperties.PRINT_STATS_AFTER));
             propertyHelper.setProperty(KnownProperties.PRINT_STATS_AFTER, KnownProperties.getDefault(KnownProperties.PRINT_STATS_AFTER));
             printStatsAfter = propertyHelper.getInteger(KnownProperties.PRINT_STATS_AFTER);
         }

@@ -47,11 +42,9 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         maxRetries = propertyHelper.getInteger(KnownProperties.MAX_RETRIES);

         tokenRangeExceptionDir = propertyHelper.getString(KnownProperties.TOKEN_RANGE_EXCEPTION_DIR);
-        exceptionFileName = propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE);

         logger.info("PARAM -- Max Retries: {}", maxRetries);
         logger.info("PARAM -- Token range exception dir: {}", tokenRangeExceptionDir);
-        logger.info("PARAM -- Token range exception file name: {}", exceptionFileName);
         logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
         logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());

@@ -80,6 +73,9 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         // Guardrail is referenced by many jobs, and is evaluated against the target table
         this.guardrailFeature = (Guardrail) this.targetSession.getCqlTable().getFeature(Featureset.GUARDRAIL_CHECK);
         this.guardrailEnabled = this.guardrailFeature.isEnabled();
-
     }
+
+    public abstract void processSlice(T slice);
+
+    public abstract void printCounts(boolean isFinal);
 }

src/main/java/com/datastax/cdm/job/BaseJobSession.java

Lines changed: 37 additions & 11 deletions

@@ -1,37 +1,38 @@
 package com.datastax.cdm.job;

-import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
 import com.datastax.cdm.feature.Feature;
 import com.datastax.cdm.feature.FeatureFactory;
 import com.datastax.cdm.feature.Featureset;
 import com.datastax.cdm.properties.PropertyHelper;
+import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.ThreadContext;
 import org.apache.spark.SparkConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

+import java.io.IOException;
 import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
 import java.util.HashMap;
 import java.util.Map;

 public abstract class BaseJobSession {

     public static final String THREAD_CONTEXT_LABEL = "ThreadLabel";
+    protected static final String NEW_LINE = System.lineSeparator();
     protected PropertyHelper propertyHelper = PropertyHelper.getInstance();
     protected Map<Featureset, Feature> featureMap;

-    // Read/Write Rate limiter
-    // Determine the total throughput for the entire cluster in terms of wries/sec,
-    // reads/sec
-    // then do the following to set the values as they are only applicable per JVM
-    // (hence spark Executor)...
-    // Rate = Total Throughput (write/read per sec) / Total Executors
     protected RateLimiter rateLimiterOrigin;
     protected RateLimiter rateLimiterTarget;
     protected Integer maxRetries = 10;
-
     protected Integer printStatsAfter = 100000;
-
-    protected String tokenRangeExceptionDir;
-    protected String exceptionFileName;
+    private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());

     protected BaseJobSession(SparkConf sc) {
         propertyHelper.initializeSparkConf(sc);

@@ -67,4 +68,29 @@ protected String getThreadLabel(BigInteger min, BigInteger max) {
         return formattedMin + ":" + formattedMax;
     }

+    private void appendToFile(Path path, String content)
+            throws IOException {
+        // if file not exists, create and write, else append
+        Files.write(path, content.getBytes(StandardCharsets.UTF_8),
+                StandardOpenOption.CREATE,
+                StandardOpenOption.APPEND);
+    }
+
+    private void FileAppend(String dir, String fileName, String content) throws IOException {
+        if (StringUtils.isAllBlank(dir)) {
+            dir = "./"; // use current folder by default
+        }
+        Files.createDirectories(Paths.get(dir));
+        Path path = Paths.get(dir + "/" + fileName + "_partitions.csv");
+        appendToFile(path, content + NEW_LINE);
+    }
+
+    protected void logFailedPartitionsInFile(String dir, String fileName, BigInteger min, BigInteger max) {
+        try {
+            FileAppend(dir, fileName, min + "," + max);
+        } catch (Exception ee) {
+            logger.error("Error occurred while writing to token range file min: {} max: {}", min, max, ee);
+        }
+    }
+
 }
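
To make the new helpers concrete, here is a small standalone sketch (not part of the commit) that mirrors the append semantics of `FileAppend`/`appendToFile` above: default to the current folder when no exception directory is configured, name the file `<fileName>_partitions.csv`, and append one `min,max` line per failure, creating the file on first use. The class name, file name, and token values are illustrative placeholders.

```java
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Standalone illustration of the exception-file append semantics above.
// Names and token values are placeholders, not taken from the commit.
public class FailedRangeFileDemo {
    public static void main(String[] args) throws IOException {
        String dir = "./";                            // default when no exception dir is configured
        String fileName = "demo_keyspace.demo_table"; // stands in for <keyspace>.<table>
        Files.createDirectories(Paths.get(dir));
        Path path = Paths.get(dir + "/" + fileName + "_partitions.csv");

        BigInteger min = BigInteger.valueOf(Long.MIN_VALUE);
        BigInteger max = BigInteger.valueOf(-4611686018427387906L);

        // CREATE + APPEND: creates the file on the first failed range, appends on later ones
        Files.write(path, (min + "," + max + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}
```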

src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 16 additions & 14 deletions

@@ -3,36 +3,37 @@
 import com.datastax.cdm.cql.statement.OriginSelectByPartitionRangeStatement;
 import com.datastax.cdm.cql.statement.TargetSelectByPKStatement;
 import com.datastax.cdm.cql.statement.TargetUpsertStatement;
+import com.datastax.cdm.data.PKFactory;
+import com.datastax.cdm.data.Record;
 import com.datastax.cdm.feature.Guardrail;
+import com.datastax.cdm.properties.KnownProperties;
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.*;
-import com.datastax.cdm.data.PKFactory;
-import com.datastax.cdm.data.Record;
 import org.apache.logging.log4j.ThreadContext;
 import org.apache.spark.SparkConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.math.BigInteger;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicLong;

 public class CopyJobSession extends AbstractJobSession<SplitPartitions.Partition> {

     private static CopyJobSession copyJobSession;
+    private final PKFactory pkFactory;
+    private final boolean isCounterTable;
+    private final Integer fetchSize;
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
     protected AtomicLong readCounter = new AtomicLong(0);
     protected AtomicLong skippedCounter = new AtomicLong(0);
     protected AtomicLong writeCounter = new AtomicLong(0);
     protected AtomicLong errorCounter = new AtomicLong(0);
-
     private TargetUpsertStatement targetUpsertStatement;
     private TargetSelectByPKStatement targetSelectByPKStatement;
-    private final PKFactory pkFactory;
-    private final boolean isCounterTable;
-    private Integer batchSize;
-    private final Integer fetchSize;
+    private final Integer batchSize;

     protected CopyJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
         super(originSession, targetSession, sc);

@@ -42,9 +43,9 @@ protected CopyJobSession(CqlSession originSession, CqlSession targetSession, Spa
         fetchSize = this.originSession.getCqlTable().getFetchSizeInRows();
         batchSize = this.originSession.getCqlTable().getBatchSize();

-        logger.info("CQL -- origin select: {}",this.originSession.getOriginSelectByPartitionRangeStatement().getCQL());
-        logger.info("CQL -- target select: {}",this.targetSession.getTargetSelectByPKStatement().getCQL());
-        logger.info("CQL -- target upsert: {}",this.targetSession.getTargetUpsertStatement().getCQL());
+        logger.info("CQL -- origin select: {}", this.originSession.getOriginSelectByPartitionRangeStatement().getCQL());
+        logger.info("CQL -- target select: {}", this.targetSession.getTargetSelectByPKStatement().getCQL());
+        logger.info("CQL -- target upsert: {}", this.targetSession.getTargetUpsertStatement().getCQL());
     }

     @Override
@@ -53,7 +54,7 @@ public void processSlice(SplitPartitions.Partition slice) {
     }

     public void getDataAndInsert(BigInteger min, BigInteger max) {
-        ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min,max));
+        ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
         BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED);
         boolean done = false;

@@ -127,6 +128,8 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                 writeCounter.addAndGet(flushedWriteCnt);
                 skippedCounter.addAndGet(skipCnt);
                 errorCounter.addAndGet(readCnt - flushedWriteCnt - skipCnt);
+                logFailedPartitionsInFile(tokenRangeExceptionDir,
+                        propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE), min, max);
             }
             logger.error("Error occurred during Attempt#: {}", attempts, e);
             logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",

@@ -182,8 +185,7 @@ private BatchStatement writeAsync(BatchStatement batch, Collection<CompletionSta
                 return BatchStatement.newInstance(BatchType.UNLOGGED);
             }
             return batch;
-        }
-        else {
+        } else {
             writeResults.add(targetUpsertStatement.executeAsync(boundUpsert));
             return batch;
         }
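
Taken together with the README change above, the error path now records each failed token range in `<keyspace>.<table>_partitions.csv`, which can be renamed and fed back into a from-file job. A hedged sketch of the rerun flow for validation, using placeholder keyspace/table names and the properties file and jar name from the README examples (backslash line continuations used here):

```
mv my_keyspace.my_table_partitions.csv partitions.csv
./spark-submit --properties-file cdm.properties \
  --conf spark.cdm.schema.origin.keyspaceTable="my_keyspace.my_table" \
  --master "local[*]" \
  --class com.datastax.cdm.job.DiffPartitionsFromFile cassandra-data-migrator-4.x.x.jar
```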
