
Commit 4003cfb

Moved filtering properties so they are no longer required in the origin config
1 parent d848572 · commit 4003cfb

4 files changed: +30, -15 lines

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 7 additions & 5 deletions
@@ -23,10 +23,10 @@ public class CopyJobSession extends AbstractJobSession {
     protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
         super(sourceSession, astraSession, sc);
         filterData = Boolean.parseBoolean(sc.get("spark.origin.FilterData", "false"));
-        filterColName = sc.get("spark.origin.FilterColumn");
-        filterColType = sc.get("spark.origin.FilterColumnType");
+        filterColName = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumn");
+        filterColType = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumnType");
         filterColIndex = Integer.parseInt(sc.get("spark.origin.FilterColumnIndex", "0"));
-        filterColValue = sc.get("spark.origin.FilterColumnValue");
+        filterColValue = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumnValue");
     }

     public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {

@@ -60,7 +60,8 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
     if (filterData) {
         String col = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
         if (col.trim().equalsIgnoreCase(filterColValue)) {
-            logger.warn("Row larger than 10 MB found filtering out: " + getKey(sourceRow));
+            logger.warn("Skipping row and filtering out: " + getKey(sourceRow));
+            skippedCounter.incrementAndGet();
             continue;
         }
     }

@@ -108,7 +109,8 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
     if (filterData) {
         String colValue = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
         if (colValue.trim().equalsIgnoreCase(filterColValue)) {
-            logger.warn("Row larger than 10 MB found filtering out: " + getKey(sourceRow));
+            logger.warn("Skipping row and filtering out: " + getKey(sourceRow));
+            skippedCounter.incrementAndGet();
             continue;
         }
     }
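
This diff swaps direct sc.get(key) lookups for Util.getSparkPropOrEmpty(sc, key). The helper's implementation is not part of this commit; a minimal sketch, assuming it simply falls back to a default (empty string for the "OrEmpty" variant), could look like this:

    import org.apache.spark.SparkConf;

    // Hypothetical reconstruction of the Util helpers this diff relies on;
    // the project's actual implementation is not shown in this commit.
    public final class Util {
        // SparkConf.get(key) throws java.util.NoSuchElementException when the
        // key is missing; supplying a default makes the property optional.
        public static String getSparkPropOr(SparkConf sc, String key, String defaultVal) {
            return sc.get(key, defaultVal);
        }

        public static String getSparkPropOrEmpty(SparkConf sc, String key) {
            return getSparkPropOr(sc, key, "");
        }
    }

With that fallback in place, the constructor no longer throws when the filter properties are absent, which is what makes them optional in the origin config.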

src/main/java/datastax/astra/migrate/OriginCountJobSession.java

Lines changed: 2 additions & 2 deletions
@@ -43,8 +43,8 @@ protected OriginCountJobSession(CqlSession sourceSession, SparkConf sparkConf) {
     checkTableforColSize = Boolean.parseBoolean(sparkConf.get("spark.origin.checkTableforColSize", "false"));
     checkTableforselectCols = sparkConf.get("spark.origin.checkTableforColSize.cols");
     checkTableforColSizeTypes = getTypes(sparkConf.get("spark.origin.checkTableforColSize.cols.types"));
-    filterColName = sparkConf.get("spark.origin.FilterColumn");
-    filterColType = sparkConf.get("spark.origin.FilterColumnType");
+    filterColName = Util.getSparkPropOrEmpty(sparkConf, "spark.origin.FilterColumn");
+    filterColType = Util.getSparkPropOrEmpty(sparkConf, "spark.origin.FilterColumnType");
     filterColIndex = Integer.parseInt(sparkConf.get("spark.origin.FilterColumnIndex", "0"));
     fieldGuardraillimitMB = Integer.parseInt(sparkConf.get("spark.fieldGuardraillimitMB", "0"));
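
The same substitution applies in this session. The behavioral difference that motivates it: SparkConf.get(key) with no default throws when the key is unset, so this constructor previously failed at startup whenever the filter properties were omitted, even if filtering was never used. A small illustration (the class name below is ours, not part of the project):

    import org.apache.spark.SparkConf;

    public class OptionalPropDemo {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            // Optional lookup: returns the fallback "" when the key is unset.
            System.out.println(conf.get("spark.origin.FilterColumn", ""));
            // Required lookup: throws java.util.NoSuchElementException when unset.
            System.out.println(conf.get("spark.origin.FilterColumn"));
        }
    }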

src/main/scala/datastax/astra/migrate/BaseJob.scala

Lines changed: 9 additions & 0 deletions
@@ -49,6 +49,15 @@ class BaseJob extends App {
     val coveragePercent = Util.getSparkPropOr(sc, "spark.coveragePercent", "100")
     val splitSize = Integer.parseInt(Util.getSparkPropOr(sc, "spark.splitSize", "10000"))

+    val checkTableforColSize = Util.getSparkPropOr(sc, "spark.origin.checkTableforColSize", "false")
+    val checkTableforselectCols = Util.getSparkPropOrEmpty(sc, "spark.origin.checkTableforColSize.cols")
+    // val checkTableforColSizeTypes = getTypes(Util.getSparkPropOr(sc, "spark.origin.checkTableforColSize.cols.types"))
+    val filterColName = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumn")
+    val filterColType = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumnType")
+    val filterColIndex = Util.getSparkPropOr(sc, "spark.origin.FilterColumnIndex", "0")
+    val fieldGuardraillimitMB = Util.getSparkPropOr(sc, "spark.fieldGuardraillimitMB", "0")
+
+
     protected def exitSpark() = {
         spark.stop()
         abstractLogger.info("################################################################################################")

src/resources/sparkConf.properties

Lines changed: 12 additions & 8 deletions
@@ -4,14 +4,6 @@ spark.origin.username some-username
 spark.origin.password some-secret-password
 spark.origin.read.consistency.level LOCAL_QUORUM
 spark.origin.keyspaceTable test.a1
-spark.origin.checkTableforColSize false
-spark.origin.checkTableforColSize.cols partition-key,clustering-key
-spark.origin.checkTableforColSize.cols.types 9,1
-spark.origin.FilterData true
-spark.origin.FilterColumn test
-spark.origin.FilterColumnIndex 2
-spark.origin.FilterColumnType 6%16
-spark.origin.FilterColumnValue test

 spark.target.isAstra true
 spark.target.scb file:///aaa/bbb/secure-connect-enterprise.zip

@@ -48,6 +40,18 @@ spark.origin.writeTimeStampFilter false
 spark.origin.minWriteTimeStampFilter 0
 spark.origin.maxWriteTimeStampFilter 9223372036854775807

+################### ONLY USE if needing to get record count of recs greater than 10MB from Origin ######################
+#spark.origin.checkTableforColSize false
+#spark.origin.checkTableforColSize.cols partition-key,clustering-key
+#spark.origin.checkTableforColSize.cols.types 9,1
+
+########################## ONLY USE if needing to filter data from Origin ###############################
+#spark.origin.FilterData false
+#spark.origin.FilterColumn test
+#spark.origin.FilterColumnIndex 2
+#spark.origin.FilterColumnType 6%16
+#spark.origin.FilterColumnValue test
+
 ########################## ONLY USE if SSL clientAuth is enabled on origin Cassandra/DSE ###############################
 #spark.origin.trustStore.path
 #spark.origin.trustStore.password
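
With this commit both optional blocks ship commented out, so a default run needs none of these properties. A job that does need origin-side filtering can opt in by uncommenting and setting the block, for example (sample values taken from the block above; spark.origin.FilterData must be true for the filter checks in CopyJobSession to run):

    spark.origin.FilterData true
    spark.origin.FilterColumn test
    spark.origin.FilterColumnIndex 2
    spark.origin.FilterColumnType 6%16
    spark.origin.FilterColumnValue test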
