Skip to content

Commit d848572

Browse files
committed
Added filtering code to CopyJob. Takes a flag, then reads the filter column and value from the config file and filters data during migration from origin to target.
1 parent 7e8f61b commit d848572

File tree

6 files changed

+42
-18
lines changed

6 files changed

+42
-18
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
<groupId>datastax.astra.migrate</groupId>
55
<artifactId>cassandra-data-migrator</artifactId>
6-
<version>2.6</version>
6+
<version>2.7</version>
77
<packaging>jar</packaging>
88

99
<properties>

src/main/java/datastax/astra/migrate/BaseJobSession.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,25 @@ public abstract class BaseJobSession {
4848
protected String astraKeyspaceTable;
4949

5050
protected Boolean hasRandomPartitioner;
51+
protected Boolean filterData;
52+
protected String filterColName;
53+
protected String filterColType;
54+
protected Integer filterColIndex;
55+
protected String filterColValue;
5156

57+
public String getKey(Row sourceRow) {
58+
StringBuffer key = new StringBuffer();
59+
for (int index = 0; index < idColTypes.size(); index++) {
60+
MigrateDataType dataType = idColTypes.get(index);
61+
if (index == 0) {
62+
key.append(getData(dataType, index, sourceRow));
63+
} else {
64+
key.append(" %% " + getData(dataType, index, sourceRow));
65+
}
66+
}
67+
68+
return key.toString();
69+
}
5270
public List<MigrateDataType> getTypes(String types) {
5371
List<MigrateDataType> dataTypes = new ArrayList<MigrateDataType>();
5472
for (String type : types.split(",")) {

src/main/java/datastax/astra/migrate/CopyJobSession.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ public class CopyJobSession extends AbstractJobSession {
2222

2323
protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
2424
super(sourceSession, astraSession, sc);
25+
filterData = Boolean.parseBoolean(sc.get("spark.origin.FilterData", "false"));
26+
filterColName = sc.get("spark.origin.FilterColumn");
27+
filterColType = sc.get("spark.origin.FilterColumnType");
28+
filterColIndex = Integer.parseInt(sc.get("spark.origin.FilterColumnIndex", "0"));
29+
filterColValue = sc.get("spark.origin.FilterColumnValue");
2530
}
2631

2732
public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
@@ -52,6 +57,13 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
5257
for (Row sourceRow : resultSet) {
5358
readLimiter.acquire(1);
5459

60+
if (filterData) {
61+
String col = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
62+
if (col.trim().equalsIgnoreCase(filterColValue)) {
63+
logger.warn("Row larger than 10 MB found filtering out: " + getKey(sourceRow));
64+
continue;
65+
}
66+
}
5567
if (writeTimeStampFilter) {
5668
// only process rows greater than writeTimeStampFilter
5769
Long sourceWriteTimeStamp = getLargestWriteTimeStamp(sourceRow);
@@ -92,6 +104,15 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
92104
if (readCounter.incrementAndGet() % printStatsAfter == 0) {
93105
printCounts(false);
94106
}
107+
108+
if (filterData) {
109+
String colValue = (String) getData(new MigrateDataType(filterColType), filterColIndex, sourceRow);
110+
if (colValue.trim().equalsIgnoreCase(filterColValue)) {
111+
logger.warn("Row larger than 10 MB found filtering out: " + getKey(sourceRow));
112+
continue;
113+
}
114+
}
115+
95116
batchStatement = batchStatement.add(bindInsert(astraInsertStatement, sourceRow, null));
96117

97118
// if batch threshold is met, send the writes and clear the batch

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -186,18 +186,4 @@ private String isDifferent(Row sourceRow, Row astraRow) {
186186
return diffData.toString();
187187
}
188188

189-
private String getKey(Row sourceRow) {
190-
StringBuffer key = new StringBuffer();
191-
for (int index = 0; index < idColTypes.size(); index++) {
192-
MigrateDataType dataType = idColTypes.get(index);
193-
if (index == 0) {
194-
key.append(getData(dataType, index, sourceRow));
195-
} else {
196-
key.append(" %% " + getData(dataType, index, sourceRow));
197-
}
198-
}
199-
200-
return key.toString();
201-
}
202-
203189
}

src/main/java/datastax/astra/migrate/OriginCountJobSession.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@ public class OriginCountJobSession extends BaseJobSession {
2323
protected List<Integer> updateSelectMapping = new ArrayList<Integer>();
2424
protected Boolean checkTableforColSize;
2525
protected String checkTableforselectCols;
26-
protected String filterColName;
27-
protected String filterColType;
28-
protected Integer filterColIndex;
2926
protected Integer fieldGuardraillimitMB;
3027
protected List<MigrateDataType> checkTableforColSizeTypes = new ArrayList<MigrateDataType>();
3128

src/resources/sparkConf.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ spark.origin.keyspaceTable test.a1
77
spark.origin.checkTableforColSize false
88
spark.origin.checkTableforColSize.cols partition-key,clustering-key
99
spark.origin.checkTableforColSize.cols.types 9,1
10+
spark.origin.FilterData true
1011
spark.origin.FilterColumn test
1112
spark.origin.FilterColumnIndex 2
1213
spark.origin.FilterColumnType 6%16
14+
spark.origin.FilterColumnValue test
1315

1416
spark.target.isAstra true
1517
spark.target.scb file:///aaa/bbb/secure-connect-enterprise.zip

0 commit comments

Comments (0)