Skip to content

Commit f1f063a

Browse files
authored
Merge pull request #21 from Ankitp1342/feature/autocorrect-rows
Auto-correct (enable/disable) missing or mismatched data
2 parents 953f639 + b9560d3 commit f1f063a

File tree

4 files changed

+39
-19
lines changed

4 files changed

+39
-19
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33

44
<groupId>com.datastax.spark.example</groupId>
55
<artifactId>migrate</artifactId>
6-
<version>0.13</version>
6+
<version>0.15</version>
77
<packaging>jar</packaging>
88

99
<properties>

src/main/java/datastax/astra/migrate/AbstractJobSession.java

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -94,10 +94,16 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
9494
}
9595
}
9696

97-
minWriteTimeStampFilter = new Long(
98-
sparkConf.get("spark.migrate.source.minWriteTimeStampFilter", "0"));
99-
maxWriteTimeStampFilter = new Long(
100-
sparkConf.get("spark.migrate.source.maxWriteTimeStampFilter", "" + Long.MAX_VALUE));
97+
String minWriteTimeStampFilterStr =
98+
sparkConf.get("spark.migrate.source.minWriteTimeStampFilter", "0");
99+
if (null != minWriteTimeStampFilterStr && minWriteTimeStampFilterStr.trim().length() > 1) {
100+
minWriteTimeStampFilter = Long.parseLong(minWriteTimeStampFilterStr);
101+
}
102+
String maxWriteTimeStampFilterStr =
103+
sparkConf.get("spark.migrate.source.maxWriteTimeStampFilter", "0");
104+
if (null != maxWriteTimeStampFilterStr && maxWriteTimeStampFilterStr.trim().length() > 1) {
105+
maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
106+
}
101107

102108
logger.info(" DEFAULT -- Write Batch Size: " + batchSize);
103109
logger.info(" DEFAULT -- Source Keyspace Table: " + sourceKeyspaceTable);

src/main/java/datastax/astra/migrate/DiffJobSession.java

Lines changed: 26 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -29,15 +29,16 @@ public class DiffJobSession extends CopyJobSession {
2929
private static DiffJobSession diffJobSession;
3030

3131
private AtomicLong readCounter = new AtomicLong(0);
32-
private AtomicLong diffCounter = new AtomicLong(0);
32+
private AtomicLong mismatchCounter = new AtomicLong(0);
3333
private AtomicLong missingCounter = new AtomicLong(0);
3434
private AtomicLong correctedMissingCounter = new AtomicLong(0);
35-
private AtomicLong validDiffCounter = new AtomicLong(0);
35+
private AtomicLong correctedMismatchCounter = new AtomicLong(0);
36+
private AtomicLong validCounter = new AtomicLong(0);
3637
private AtomicLong skippedCounter = new AtomicLong(0);
3738

3839
protected List<MigrateDataType> selectColTypes = new ArrayList<MigrateDataType>();
39-
protected Boolean isDiffOnly = false;
40-
40+
protected Boolean autoCorrectMissing = false;
41+
protected Boolean autoCorrectMismatch = false;
4142

4243
public static DiffJobSession getInstance(CqlSession sourceSession, CqlSession astraSession, SparkConf sparkConf) {
4344
if (diffJobSession == null) {
@@ -55,7 +56,8 @@ private DiffJobSession(CqlSession sourceSession, CqlSession astraSession, SparkC
5556
super(sourceSession, astraSession, sparkConf);
5657

5758
selectColTypes = getTypes(sparkConf.get("spark.migrate.diff.select.types"));
58-
isDiffOnly = Boolean.parseBoolean(sparkConf.get("spark.migrate.isDiffOnly", "false"));
59+
autoCorrectMissing = Boolean.parseBoolean(sparkConf.get("spark.migrate.destination.autocorrect.missing", "false"));
60+
autoCorrectMismatch = Boolean.parseBoolean(sparkConf.get("spark.migrate.destination.autocorrect.mismatch", "false"));
5961
}
6062

6163
public void getDataAndDiff(BigInteger min, BigInteger max) {
@@ -105,14 +107,16 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
105107
public void printCounts(String finalStr) {
106108
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Record Count: "
107109
+ readCounter.get());
108-
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Differences Count: "
109-
+ diffCounter.get());
110-
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Missing Count: "
110+
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Mismatch Count: "
111+
+ mismatchCounter.get());
112+
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Corrected Mismatch Count: "
113+
+ correctedMismatchCounter.get());
114+
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Missing Count: "
111115
+ missingCounter.get());
112-
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Corrected Missing Count: "
116+
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Corrected Missing Count: "
113117
+ correctedMissingCounter.get());
114118
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Valid Count: "
115-
+ validDiffCounter.get());
119+
+ validCounter.get());
116120
logger.info("TreadID: " + Thread.currentThread().getId() + " " + finalStr + " Read Skipped Count: "
117121
+ skippedCounter.get());
118122
}
@@ -123,22 +127,30 @@ private void diff(Row sourceRow, Row astraRow) {
123127
logger.error("Data is missing in Astra: " + getKey(sourceRow));
124128
//correct data
125129

126-
if (!isDiffOnly) {
130+
if (autoCorrectMissing) {
127131
astraSession.execute(bindInsert(astraInsertStatement, sourceRow));
128132
correctedMissingCounter.incrementAndGet();
129133
logger.error("Corrected missing data in Astra: " + getKey(sourceRow));
130134
}
135+
131136
return;
132137
}
133138

134139
String diffData = isDifferent(sourceRow, astraRow);
135140
if (!diffData.isEmpty()) {
136-
diffCounter.incrementAndGet();
137-
logger.error("Data difference found - Key: " + getKey(sourceRow) + " Data: " + diffData);
141+
mismatchCounter.incrementAndGet();
142+
logger.error("Data mismatch found - Key: " + getKey(sourceRow) + " Data: " + diffData);
143+
144+
if (autoCorrectMismatch) {
145+
astraSession.execute(bindInsert(astraInsertStatement, sourceRow));
146+
correctedMismatchCounter.incrementAndGet();
147+
logger.error("Corrected mismatch data in Astra: " + getKey(sourceRow));
148+
}
149+
138150
return;
139151
}
140152

141-
validDiffCounter.incrementAndGet();
153+
validCounter.incrementAndGet();
142154
}
143155

144156
private String isDifferent(Row sourceRow, Row astraRow) {

src/resources/sparkConf.properties

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,8 @@ spark.migrate.destination.username client-id
1111
spark.migrate.destination.password client-secret
1212
spark.migrate.destination.read.consistency.level LOCAL_QUORUM
1313
spark.migrate.destination.keyspaceTable test.a2
14+
spark.migrate.destination.autocorrect.missing false
15+
spark.migrate.destination.autocorrect.mismatch false
1416

1517
spark.migrate.maxRetries 10
1618
spark.migrate.readRateLimit 40000

0 commit comments

Comments
 (0)