
Commit 6a2a5c3

Merge pull request #105 from datastax/feature/field-size-check
Refactored CDM field-size guardrail check reporting
2 parents 2f6d160 + b1077b4 · commit 6a2a5c3

17 files changed (+256 −317 lines)

Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ ENV MAVEN_HOME /usr/share/maven
 ENV MAVEN_CONFIG "$USER_HOME_DIR/.m2"
 COPY ./src /assets/src
 COPY ./pom.xml /assets/pom.xml
-COPY ./src/resources/sparkConf.properties /assets/
+COPY src/resources/cdm.properties /assets/
 COPY ./src/resources/partitions.csv /assets/
 COPY ./src/resources/primary_key_rows.csv /assets/
 COPY ./src/resources/runCommands.txt /assets/

README.md

Lines changed: 16 additions & 6 deletions
@@ -24,13 +24,13 @@ tar -xvzf spark-3.3.1-bin-hadoop3.tgz
 
 # Steps for Data-Migration:
 
-1. `sparkConf.properties` file needs to be configured as applicable for the environment
-> A sample Spark conf file configuration can be [found here](./src/resources/sparkConf.properties)
+1. The `cdm.properties` file needs to be configured as applicable for the environment
+> A sample properties file can be [found here](./src/resources/cdm.properties)
 2. Place the properties file where it can be accessed while running the job via spark-submit.
 3. Run the job using the `spark-submit` command as shown below:
 
 ```
-./spark-submit --properties-file sparkConf.properties \
+./spark-submit --properties-file cdm.properties \
 --master "local[*]" \
 --class datastax.astra.migrate.Migrate cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
 ```
@@ -39,7 +39,7 @@ Note:
 - The above command generates a log file `logfile_name.txt` to avoid log output on the console.
 - Add the option `--driver-memory 25G --executor-memory 25G` as shown below if the table being migrated is large (over 100GB)
 ```
-./spark-submit --properties-file sparkConf.properties \
+./spark-submit --properties-file cdm.properties \
 --master "local[*]" --driver-memory 25G --executor-memory 25G \
 --class datastax.astra.migrate.Migrate cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
 ```
@@ -49,7 +49,7 @@ Note:
 - To run the job in Data validation mode, use class option `--class datastax.astra.migrate.DiffData` as shown below
 
 ```
-./spark-submit --properties-file sparkConf.properties \
+./spark-submit --properties-file cdm.properties \
 --master "local[*]" \
 --class datastax.astra.migrate.DiffData cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
 ```
@@ -79,7 +79,7 @@ Note:
 # Migrating specific partition ranges
 - You can also use the tool to migrate specific partition ranges using class option `--class datastax.astra.migrate.MigratePartitionsFromFile` as shown below
 ```
-./spark-submit --properties-file sparkConf.properties \
+./spark-submit --properties-file cdm.properties \
 --master "local[*]" \
 --class datastax.astra.migrate.MigratePartitionsFromFile cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
 ```
@@ -93,11 +93,21 @@ When running in above mode the tool assumes a `partitions.csv` file to be presen
 ```
 This mode is specifically useful to process a subset of partition-ranges that may have failed during a previous run.
 
+# Perform large-field Guardrail violation checks
+- The tool can be used to identify large fields in a table that may break your cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field) using class option `--class datastax.astra.migrate.Guardrail` as shown below
+```
+./spark-submit --properties-file cdmGuardrail.properties \
+--master "local[*]" \
+--class datastax.astra.migrate.Guardrail cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
+```
+> A sample Guardrail properties file can be [found here](./src/resources/cdmGuardrail.properties)
+
 # Features
 - Supports migration/validation of [Counter tables](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_using/useCountersConcept.html)
 - Preserve [writetimes](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__retrieving-the-datetime-a-write-occurred-p) and [TTLs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__ref-select-ttl-p)
 - Supports migration/validation of advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt))
 - Filter records from `Origin` using `writetimes` and/or CQL conditions and/or min/max token-range
+- Perform guardrail checks (identify large fields)
 - Supports adding `constants` as new columns on `Target`
 - Fully containerized (Docker and K8s friendly)
 - SSL Support (including custom cipher algorithms)
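
For orientation, here is a minimal sketch of what a `cdm.properties` file might look like after this rename. It is assembled only from property keys that appear in the Java changes in this commit; the keyspace/table, column names, and type codes below are hypothetical placeholders, and the shipped sample at `./src/resources/cdm.properties` remains the authoritative template.

```properties
# Hypothetical sketch only -- keys come from the Util.getSparkProp* calls
# in this commit; every value below is a placeholder.
spark.origin.keyspaceTable      my_ks.my_table
spark.target.keyspaceTable      my_ks.my_table

# Columns to read from Origin, and the partition key among them
spark.query.origin              key_col,val_col
spark.query.origin.partitionKey key_col
# CDM-specific type codes for the columns above (placeholder values)
spark.query.types               9,0
# Primary-key column(s) on Target
spark.query.target.id           key_col

# Throughput knobs, shown with the defaults used in the code
spark.batchSize                 5
spark.readRateLimit             20000
spark.writeRateLimit            40000
spark.printStatsAfter           100000
```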

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@
 
     <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <revision>3.2.3</revision>
+        <revision>3.3.1</revision>
        <scala.version>2.12.17</scala.version>
        <scala.main.version>2.12</scala.main.version>
        <spark.version>3.3.1</spark.version>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 5 additions & 22 deletions
@@ -19,6 +19,8 @@
 public class AbstractJobSession extends BaseJobSession {
 
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
+    protected CqlSession sourceSession;
+    protected CqlSession astraSession;
 
     protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
         this(sourceSession, astraSession, sc, false);
@@ -34,18 +36,11 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
         this.sourceSession = sourceSession;
         this.astraSession = astraSession;
 
-        batchSize = new Integer(Util.getSparkPropOr(sc, "spark.batchSize", "5"));
-        fetchSizeInRows = new Integer(Util.getSparkPropOr(sc, "spark.read.fetch.sizeInRows", "1000"));
-        printStatsAfter = new Integer(Util.getSparkPropOr(sc, "spark.printStatsAfter", "100000"));
-        if (printStatsAfter < 1) {
-            printStatsAfter = 100000;
-        }
+        batchSize = Integer.parseInt(Util.getSparkPropOr(sc, "spark.batchSize", "5"));
+        fetchSizeInRows = Integer.parseInt(Util.getSparkPropOr(sc, "spark.read.fetch.sizeInRows", "1000"));
 
-        readLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.readRateLimit", "20000")));
-        writeLimiter = RateLimiter.create(new Integer(Util.getSparkPropOr(sc, "spark.writeRateLimit", "40000")));
+        writeLimiter = RateLimiter.create(Integer.parseInt(Util.getSparkPropOr(sc, "spark.writeRateLimit", "40000")));
         maxRetries = Integer.parseInt(sc.get("spark.maxRetries", "0"));
-
-        sourceKeyspaceTable = Util.getSparkProp(sc, "spark.origin.keyspaceTable");
         astraKeyspaceTable = Util.getSparkProp(sc, "spark.target.keyspaceTable");
 
         String ttlColsStr = Util.getSparkPropOrEmpty(sc, "spark.query.ttl.cols");
@@ -105,24 +100,13 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
                     Instant.ofEpochMilli(maxWriteTimeStampFilter / 1000));
         }
 
-        String selectCols = Util.getSparkProp(sc, "spark.query.origin");
-        String partitionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
-        String sourceSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");
-        if (!sourceSelectCondition.isEmpty() && !sourceSelectCondition.trim().toUpperCase().startsWith("AND")) {
-            sourceSelectCondition = " AND " + sourceSelectCondition;
-        }
-
         final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
-        allCols = selectCols.split(",");
         ttlCols.forEach(col -> {
             selectTTLWriteTimeCols.append(",ttl(" + allCols[col] + ")");
         });
         writeTimeStampCols.forEach(col -> {
             selectTTLWriteTimeCols.append(",writetime(" + allCols[col] + ")");
         });
-        selectColTypes = getTypes(Util.getSparkProp(sc, "spark.query.types"));
-        String idCols = Util.getSparkPropOrEmpty(sc, "spark.query.target.id");
-        idColTypes = selectColTypes.subList(0, idCols.split(",").length);
 
         String insertCols = Util.getSparkPropOrEmpty(sc, "spark.query.target");
         if (null == insertCols || insertCols.trim().isEmpty()) {
@@ -152,7 +136,6 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
                 "select " + insertCols + " from " + astraKeyspaceTable
                         + " where " + insertBinds);
 
-        hasRandomPartitioner = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.origin.hasRandomPartitioner", "false"));
         isCounterTable = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.counterTable", "false"));
         if (isCounterTable) {
             String updateSelectMappingStr = Util.getSparkPropOr(sc, "spark.counterTable.cql.index", "0");
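
A side note on the constructor cleanup above: every `new Integer(...)` call is swapped for `Integer.parseInt(...)`. A minimal standalone illustration of why (the class name here is hypothetical):

```java
public class ParseVsBox {
    public static void main(String[] args) {
        // new Integer("5") is a deprecated boxing constructor (since Java 9):
        // it always allocates a fresh wrapper object, only to be unboxed again.
        // Integer.parseInt parses straight to a primitive int.
        int batchSize = Integer.parseInt("5");

        // If a boxed value is genuinely needed, Integer.valueOf uses the
        // small-value cache instead of allocating every time.
        Integer boxed = Integer.valueOf("5");

        System.out.println(batchSize + " " + boxed);
    }
}
```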

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 38 additions & 11 deletions
@@ -1,16 +1,18 @@
 package datastax.astra.migrate;
 
 import com.datastax.oss.driver.api.core.ConsistencyLevel;
-import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.spark.SparkConf;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class BaseJobSession {
 
@@ -29,9 +31,8 @@ public abstract class BaseJobSession {
     protected RateLimiter readLimiter;
     protected RateLimiter writeLimiter;
     protected Integer maxRetries = 10;
+    protected AtomicLong readCounter = new AtomicLong(0);
 
-    protected CqlSession sourceSession;
-    protected CqlSession astraSession;
     protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
     protected List<MigrateDataType> idColTypes = new ArrayList<MigrateDataType>();
     protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
@@ -47,7 +48,7 @@ public abstract class BaseJobSession {
 
     protected List<Integer> writeTimeStampCols = new ArrayList<Integer>();
     protected List<Integer> ttlCols = new ArrayList<Integer>();
-    protected Boolean isCounterTable;
+    protected Boolean isCounterTable = false;
 
     protected String sourceKeyspaceTable;
     protected String astraKeyspaceTable;
@@ -59,13 +60,35 @@ public abstract class BaseJobSession {
     protected Integer filterColIndex;
     protected String filterColValue;
 
+    protected String selectCols;
+    protected String partitionKey;
+    protected String sourceSelectCondition;
     protected String[] allCols;
+    protected String idCols;
     protected String tsReplaceValStr;
     protected long tsReplaceVal;
 
     protected BaseJobSession(SparkConf sc) {
         readConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.read"));
         writeConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.write"));
+        readLimiter = RateLimiter.create(Integer.parseInt(Util.getSparkPropOr(sc, "spark.readRateLimit", "20000")));
+        sourceKeyspaceTable = sc.get("spark.origin.keyspaceTable");
+        hasRandomPartitioner = Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.origin.hasRandomPartitioner", "false"));
+
+        selectCols = Util.getSparkProp(sc, "spark.query.origin");
+        allCols = selectCols.split(",");
+        partitionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
+        sourceSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");
+        if (!sourceSelectCondition.isEmpty() && !sourceSelectCondition.trim().toUpperCase().startsWith("AND")) {
+            sourceSelectCondition = " AND " + sourceSelectCondition;
+        }
+        selectColTypes = getTypes(Util.getSparkProp(sc, "spark.query.types"));
+        idCols = Util.getSparkPropOrEmpty(sc, "spark.query.target.id");
+        idColTypes = selectColTypes.subList(0, idCols.split(",").length);
+        printStatsAfter = Integer.parseInt(Util.getSparkPropOr(sc, "spark.printStatsAfter", "100000"));
+        if (printStatsAfter < 1) {
+            printStatsAfter = 100000;
+        }
     }
 
     public String getKey(Row sourceRow) {
@@ -91,20 +114,24 @@ public List<MigrateDataType> getTypes(String types) {
         return dataTypes;
     }
 
-    public Object getData(MigrateDataType dataType, int index, Row sourceRow) {
+    public Object getData(MigrateDataType dataType, int index, Row row) {
         if (dataType.typeClass == Map.class) {
-            return sourceRow.getMap(index, dataType.subTypes.get(0), dataType.subTypes.get(1));
+            return row.getMap(index, dataType.subTypes.get(0), dataType.subTypes.get(1));
         } else if (dataType.typeClass == List.class) {
-            return sourceRow.getList(index, dataType.subTypes.get(0));
+            return row.getList(index, dataType.subTypes.get(0));
         } else if (dataType.typeClass == Set.class) {
-            return sourceRow.getSet(index, dataType.subTypes.get(0));
+            return row.getSet(index, dataType.subTypes.get(0));
         } else if (isCounterTable && dataType.typeClass == Long.class) {
-            Object data = sourceRow.get(index, dataType.typeClass);
+            Object data = row.get(index, dataType.typeClass);
             if (data == null) {
-                return new Long(0);
+                return Long.valueOf(0);
             }
         }
 
-        return sourceRow.get(index, dataType.typeClass);
+        return row.get(index, dataType.typeClass);
+    }
+
+    public int getFieldSize(MigrateDataType dataType, int index, Row row) {
+        return SerializationUtils.serialize((Serializable) getData(dataType, index, row)).length;
     }
 }
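
To make the new helper concrete, here is a minimal sketch of how `getFieldSize` could drive a guardrail check, assuming the 10MB AstraDB single-field limit cited in the README; `GuardrailSketch`, `exceedsGuardrail`, and the threshold constant are hypothetical names, not part of this commit. Note that `SerializationUtils.serialize` measures the Java-serialized size of the already-deserialized value, which approximates but does not equal the CQL on-wire size.

```java
import java.io.Serializable;

import org.apache.commons.lang.SerializationUtils;

// Hypothetical sketch built around the same measurement getFieldSize uses.
public class GuardrailSketch {

    // The 10 MB single-field limit mentioned in the README for AstraDB.
    private static final int GUARDRAIL_BYTES = 10 * 1024 * 1024;

    // Serialize the field value and compare the byte count to the limit,
    // mirroring BaseJobSession.getFieldSize(...).length.
    static boolean exceedsGuardrail(Serializable fieldValue) {
        if (fieldValue == null) {
            return false; // nothing to measure
        }
        return SerializationUtils.serialize(fieldValue).length > GUARDRAIL_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(exceedsGuardrail("small value")); // false
    }
}
```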

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 0 additions & 3 deletions
@@ -16,7 +16,6 @@ public class CopyJobSession extends AbstractJobSession {
 
     private static CopyJobSession copyJobSession;
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
-    protected AtomicLong readCounter = new AtomicLong(0);
     protected AtomicLong skippedCounter = new AtomicLong(0);
     protected AtomicLong writeCounter = new AtomicLong(0);
     protected AtomicLong errorCounter = new AtomicLong(0);
@@ -50,7 +49,6 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
         long readCnt = 0;
         long writeCnt = 0;
         long skipCnt = 0;
-        long errCnt = 0;
         try {
             ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ?
                     min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact())
@@ -155,7 +153,6 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                 CompletionStage<AsyncResultSet> writeResultSet = astraSession.executeAsync(batchStatement);
                 writeResults.add(writeResultSet);
                 writeCnt += iterateAndClearWriteResults(writeResults, batchStatement.size());
-                batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
             }
         }
 

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 6 additions & 7 deletions
@@ -22,16 +22,15 @@
 public class DiffJobSession extends CopyJobSession {
 
     private static DiffJobSession diffJobSession;
+    private final AtomicLong mismatchCounter = new AtomicLong(0);
+    private final AtomicLong missingCounter = new AtomicLong(0);
+    private final AtomicLong correctedMissingCounter = new AtomicLong(0);
+    private final AtomicLong correctedMismatchCounter = new AtomicLong(0);
+    private final AtomicLong validCounter = new AtomicLong(0);
+    private final AtomicLong skippedCounter = new AtomicLong(0);
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
     protected Boolean autoCorrectMissing = false;
     protected Boolean autoCorrectMismatch = false;
-    private AtomicLong readCounter = new AtomicLong(0);
-    private AtomicLong mismatchCounter = new AtomicLong(0);
-    private AtomicLong missingCounter = new AtomicLong(0);
-    private AtomicLong correctedMissingCounter = new AtomicLong(0);
-    private AtomicLong correctedMismatchCounter = new AtomicLong(0);
-    private AtomicLong validCounter = new AtomicLong(0);
-    private AtomicLong skippedCounter = new AtomicLong(0);
 
     private DiffJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
         super(sourceSession, astraSession, sc);
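
One more note on the counter consolidation: the duplicated `readCounter` is gone (it now lives once in `BaseJobSession`), and the remaining counters are `final AtomicLong`s. A minimal illustration of the pattern, with hypothetical names, since multiple Spark task threads in one executor JVM may increment the same counter concurrently:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the counter pattern used by DiffJobSession.
public class CounterSketch {

    // A plain long field would lose updates under concurrent increments;
    // AtomicLong.incrementAndGet is an atomic, lock-free read-modify-write.
    private final AtomicLong mismatchCounter = new AtomicLong(0);

    void recordMismatch() {
        long soFar = mismatchCounter.incrementAndGet();
        if (soFar % 100_000 == 0) { // echoes the printStatsAfter cadence
            System.out.println("Mismatched records so far: " + soFar);
        }
    }
}
```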
