Commit 869b8fc

Merge pull request #36 from datastax/feature/filter-exclustion-copyjob

Added filtering code to CopyJob.

2 parents 7e8f61b + 6f551d9

7 files changed (+57 −27 lines changed)

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 
     <groupId>datastax.astra.migrate</groupId>
     <artifactId>cassandra-data-migrator</artifactId>
-    <version>2.6</version>
+    <version>2.7</version>
     <packaging>jar</packaging>
 
     <properties>

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 18 additions & 0 deletions
@@ -48,7 +48,25 @@ public abstract class BaseJobSession {
     protected String astraKeyspaceTable;
 
     protected Boolean hasRandomPartitioner;
+    protected Boolean filterData;
+    protected String filterColName;
+    protected String filterColType;
+    protected Integer filterColIndex;
+    protected String filterColValue;
 
+    public String getKey(Row sourceRow) {
+        StringBuffer key = new StringBuffer();
+        for (int index = 0; index < idColTypes.size(); index++) {
+            MigrateDataType dataType = idColTypes.get(index);
+            if (index == 0) {
+                key.append(getData(dataType, index, sourceRow));
+            } else {
+                key.append(" %% " + getData(dataType, index, sourceRow));
+            }
+        }
+
+        return key.toString();
+    }
     public List<MigrateDataType> getTypes(String types) {
         List<MigrateDataType> dataTypes = new ArrayList<MigrateDataType>();
         for (String type : types.split(",")) {
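The getKey helper added here builds a human-readable identifier for a row by joining its primary-key column values with a " %% " separator; the new filtering code in CopyJobSession uses it to log which rows were skipped. A minimal standalone sketch of the joining logic, with the driver Row and the getData lookup replaced by a plain list of hypothetical values:

    import java.util.Arrays;
    import java.util.List;

    public class KeySketch {
        // Same separator scheme as getKey: first value bare, every
        // subsequent value prefixed with " %% ".
        static String buildKey(List<Object> idColValues) {
            StringBuilder key = new StringBuilder();
            for (int index = 0; index < idColValues.size(); index++) {
                if (index > 0) {
                    key.append(" %% ");
                }
                key.append(idColValues.get(index));
            }
            return key.toString();
        }

        public static void main(String[] args) {
            // A two-column primary key (text, int) renders as "abc %% 42".
            System.out.println(buildKey(Arrays.asList("abc", 42)));
        }
    }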

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 23 additions & 0 deletions
@@ -22,6 +22,11 @@ public class CopyJobSession extends AbstractJobSession {
 
     protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
         super(sourceSession, astraSession, sc);
+        filterData = Boolean.parseBoolean(sc.get("spark.origin.FilterData", "false"));
+        filterColName = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumn");
+        filterColType = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumnType");
+        filterColIndex = Integer.parseInt(sc.get("spark.origin.FilterColumnIndex", "0"));
+        filterColValue = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumnValue");
     }
 
     public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {

@@ -52,6 +57,14 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
         for (Row sourceRow : resultSet) {
             readLimiter.acquire(1);
 
+            if (filterData) {
+                String col = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
+                if (col.trim().equalsIgnoreCase(filterColValue)) {
+                    logger.warn("Skipping row and filtering out: " + getKey(sourceRow));
+                    skippedCounter.incrementAndGet();
+                    continue;
+                }
+            }
             if (writeTimeStampFilter) {
                 // only process rows greater than writeTimeStampFilter
                 Long sourceWriteTimeStamp = getLargestWriteTimeStamp(sourceRow);

@@ -92,6 +105,16 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
                 if (readCounter.incrementAndGet() % printStatsAfter == 0) {
                     printCounts(false);
                 }
+
+                if (filterData) {
+                    String colValue = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
+                    if (colValue.trim().equalsIgnoreCase(filterColValue)) {
+                        logger.warn("Skipping row and filtering out: " + getKey(sourceRow));
+                        skippedCounter.incrementAndGet();
+                        continue;
+                    }
+                }
+
                 batchStatement = batchStatement.add(bindInsert(astraInsertStatement, sourceRow, null));
 
                 // if batch threshold is met, send the writes and clear the batch
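Both new hunks in getDataAndInsert apply the same exclusion rule at the two points where a source row is about to be bound for writing: when spark.origin.FilterData is true, the configured column is read as a String and compared, trimmed and case-insensitive, against spark.origin.FilterColumnValue; matches are logged via getKey, counted in skippedCounter, and skipped with continue. A hedged sketch of that predicate in isolation (in the real code colValue comes from getData(new MigrateDataType(filterColType), filterColIndex, sourceRow) and is not null-guarded):

    // Returns true when the row should be excluded from the copy.
    static boolean shouldSkip(boolean filterData, String colValue, String filterColValue) {
        return filterData
                && colValue != null   // guard added in this sketch only
                && colValue.trim().equalsIgnoreCase(filterColValue);
    }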

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 0 additions & 14 deletions
@@ -186,18 +186,4 @@ private String isDifferent(Row sourceRow, Row astraRow) {
         return diffData.toString();
     }
 
-    private String getKey(Row sourceRow) {
-        StringBuffer key = new StringBuffer();
-        for (int index = 0; index < idColTypes.size(); index++) {
-            MigrateDataType dataType = idColTypes.get(index);
-            if (index == 0) {
-                key.append(getData(dataType, index, sourceRow));
-            } else {
-                key.append(" %% " + getData(dataType, index, sourceRow));
-            }
-        }
-
-        return key.toString();
-    }
-
 }

src/main/java/datastax/astra/migrate/OriginCountJobSession.java

Lines changed: 2 additions & 5 deletions
@@ -23,9 +23,6 @@ public class OriginCountJobSession extends BaseJobSession {
     protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
     protected Boolean checkTableforColSize;
     protected String checkTableforselectCols;
-    protected String filterColName;
-    protected String filterColType;
-    protected Integer filterColIndex;
     protected Integer fieldGuardraillimitMB;
     protected List<MigrateDataType> checkTableforColSizeTypes = new ArrayList<MigrateDataType>();
 

@@ -46,8 +43,8 @@ protected OriginCountJobSession(CqlSession sourceSession, SparkConf sparkConf) {
         checkTableforColSize = Boolean.parseBoolean(sparkConf.get("spark.origin.checkTableforColSize", "false"));
         checkTableforselectCols = sparkConf.get("spark.origin.checkTableforColSize.cols");
         checkTableforColSizeTypes = getTypes(sparkConf.get("spark.origin.checkTableforColSize.cols.types"));
-        filterColName = sparkConf.get("spark.origin.FilterColumn");
-        filterColType = sparkConf.get("spark.origin.FilterColumnType");
+        filterColName = Util.getSparkPropOrEmpty(sparkConf, "spark.origin.FilterColumn");
+        filterColType = Util.getSparkPropOrEmpty(sparkConf, "spark.origin.FilterColumnType");
         filterColIndex = Integer.parseInt(sparkConf.get("spark.origin.FilterColumnIndex", "0"));
         fieldGuardraillimitMB = Integer.parseInt(sparkConf.get("spark.fieldGuardraillimitMB", "0"));

src/main/scala/datastax/astra/migrate/BaseJob.scala

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ class BaseJob extends App {
   val maxPartition = new BigInteger(Util.getSparkPropOr(sc, "spark.origin.maxPartition", "9223372036854775807"))
   val coveragePercent = Util.getSparkPropOr(sc, "spark.coveragePercent", "100")
   val splitSize = Integer.parseInt(Util.getSparkPropOr(sc, "spark.splitSize", "10000"))
-
+
   protected def exitSpark() = {
     spark.stop()
     abstractLogger.info("################################################################################################")

src/resources/sparkConf.properties

Lines changed: 12 additions & 6 deletions
@@ -4,12 +4,6 @@ spark.origin.username some-username
 spark.origin.password some-secret-password
 spark.origin.read.consistency.level LOCAL_QUORUM
 spark.origin.keyspaceTable test.a1
-spark.origin.checkTableforColSize false
-spark.origin.checkTableforColSize.cols partition-key,clustering-key
-spark.origin.checkTableforColSize.cols.types 9,1
-spark.origin.FilterColumn test
-spark.origin.FilterColumnIndex 2
-spark.origin.FilterColumnType 6%16
 
 spark.target.isAstra true
 spark.target.scb file:///aaa/bbb/secure-connect-enterprise.zip

@@ -46,6 +40,18 @@ spark.origin.writeTimeStampFilter false
 spark.origin.minWriteTimeStampFilter 0
 spark.origin.maxWriteTimeStampFilter 9223372036854775807
 
+################### ONLY USE if needing to get record count of recs greater than 10MB from Origin ######################
+#spark.origin.checkTableforColSize false
+#spark.origin.checkTableforColSize.cols partition-key,clustering-key
+#spark.origin.checkTableforColSize.cols.types 9,1
+
+########################## ONLY USE if needing to filter data from Origin ###############################
+#spark.origin.FilterData false
+#spark.origin.FilterColumn test
+#spark.origin.FilterColumnIndex 2
+#spark.origin.FilterColumnType 6%16
+#spark.origin.FilterColumnValue test
+
 ########################## ONLY USE if SSL clientAuth is enabled on origin Cassandra/DSE ###############################
 #spark.origin.trustStore.path
 #spark.origin.trustStore.password
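To enable the new exclusion feature, the filter block above would be uncommented and spark.origin.FilterData flipped to true (CopyJobSession defaults it to false). With the sample values shown, any row whose column at index 2 reads as a string that trims and case-insensitively matches "test" is skipped:

    spark.origin.FilterData true
    spark.origin.FilterColumn test
    spark.origin.FilterColumnIndex 2
    spark.origin.FilterColumnType 6%16
    spark.origin.FilterColumnValue test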
