Skip to content

Commit 199c00b

Browse files
faizalrub-datastaxmsmygit
authored and committed
Exception handler - in-progress
1 parent 8463f45 commit 199c00b

File tree

10 files changed

+383
-8
lines changed

10 files changed

+383
-8
lines changed

src/main/java/datastax/cdm/job/AbstractJobSession.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.datastax.oss.driver.api.core.CqlSession;
44
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
5+
import datastax.cdm.properties.ColumnsKeysTypes;
56
import datastax.cdm.properties.KnownProperties;
67
import org.apache.spark.SparkConf;
78
import org.slf4j.Logger;
@@ -35,6 +36,11 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
3536
readLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_LIMIT_READ));
3637
writeLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_LIMIT_WRITE));
3738
maxRetries = propertyHelper.getInteger(KnownProperties.MAX_RETRIES);
39+
maxRetriesRowFailure = propertyHelper.getInteger(KnownProperties.MAX_RETRIES_ROW_FAILURE);
40+
41+
tokenRangeExceptionDir = propertyHelper.getString(KnownProperties.TOKEN_RANGE_EXCEPTION_DIR);
42+
rowExceptionDir = propertyHelper.getString(KnownProperties.ROW_EXCEPTION_DIR);
43+
exceptionFileName = ColumnsKeysTypes.getOriginKeyspaceTable(propertyHelper);
3844

3945
logger.info("PARAM -- Max Retries: {}", maxRetries);
4046
logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());

src/main/java/datastax/cdm/job/BaseJobSession.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,14 @@ public abstract class BaseJobSession {
1919
protected RateLimiter readLimiter;
2020
protected RateLimiter writeLimiter;
2121
protected Integer maxRetries = 10;
22+
protected Integer maxRetriesRowFailure = 2;
2223

2324
protected Integer printStatsAfter = 100000;
2425

26+
protected String tokenRangeExceptionDir;
27+
protected String rowExceptionDir;
28+
protected String exceptionFileName;
29+
2530
protected BaseJobSession(SparkConf sc) {
2631
propertyHelper.initializeSparkConf(sc);
2732
this.cqlHelper = new CqlHelper();

src/main/java/datastax/cdm/job/CopyPKJobSession.java

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package datastax.cdm.job;
22

33
import com.datastax.oss.driver.api.core.CqlSession;
4+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
45
import com.datastax.oss.driver.api.core.cql.Row;
6+
import com.datastax.oss.driver.api.core.data.UdtValue;
57
import datastax.cdm.data.EnhancedPK;
68
import datastax.cdm.data.PKFactory;
79
import datastax.cdm.data.Record;
@@ -15,6 +17,7 @@
1517
import java.util.ArrayList;
1618
import java.util.List;
1719
import java.util.concurrent.atomic.AtomicLong;
20+
import java.util.stream.IntStream;
1821

1922
public class CopyPKJobSession extends AbstractJobSession {
2023

@@ -25,6 +28,14 @@ public class CopyPKJobSession extends AbstractJobSession {
2528
protected AtomicLong skipCounter = new AtomicLong(0);
2629
protected AtomicLong writeCounter = new AtomicLong(0);
2730

31+
private AtomicLong correctedMissingCounter = new AtomicLong(0);
32+
private AtomicLong correctedMismatchCounter = new AtomicLong(0);
33+
private AtomicLong validCounter = new AtomicLong(0);
34+
private AtomicLong mismatchCounter = new AtomicLong(0);
35+
private AtomicLong skippedCounter = new AtomicLong(0);
36+
private AtomicLong failedRowCounter = new AtomicLong(0);
37+
38+
2839
private final PKFactory pkFactory;
2940
private final List<MigrateDataType> originPKTypes;
3041
private final boolean isCounterTable;
@@ -112,5 +123,136 @@ private EnhancedPK toEnhancedPK(String rowString) {
112123
}
113124
return pkFactory.toEnhancedPK(values, pkFactory.getPKTypes(PKFactory.Side.ORIGIN));
114125
}
126+
/*
127+
// FR: THIS ENTIRE THING NEEDS TO BE MOVED FROM HERE TO DIFFJOBSESSION CLASS
128+
@SuppressWarnings("unchecked")
129+
public void getRowAndDiff(List<SplitPartitions.PKRows> rowsList) {
130+
for (SplitPartitions.PKRows rows : rowsList) {
131+
rows.pkRows.parallelStream().forEach(row -> {
132+
readCounter.incrementAndGet();
133+
EnhancedPK pk = toEnhancedPK(row);
134+
if (null == pk || pk.isError()) {
135+
missingCounter.incrementAndGet();
136+
logger.error("Could not build PK object with value <{}>; error is: {}", row, (null == pk ? "null" : pk.getMessages()));
137+
return;
138+
}
139+
int maxAttempts = maxRetriesRowFailure;
140+
Row sourceRow = null;
141+
int diffAttempt = 0;
142+
for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
143+
try {
144+
sourceRow = originSelectByPKStatement.getRecord(pk).getOriginRow();
145+
if (sourceRow != null) {
146+
Row astraRow = cqlHelper.getTargetSelectByPKStatement().getRecord(pk).getTargetRow();
147+
diffAttempt++;
148+
diff(sourceRow, astraRow, diffAttempt);
149+
} else {
150+
logger.error("Could not find row with primary-key: {} on source", row);
151+
}
152+
retryCount = maxAttempts;
153+
} catch (Exception e) {
154+
logger.error("Could not find row with primary-key: {} retry# {}", row, retryCount, e);
155+
if (retryCount == maxAttempts) {
156+
logFailedRecordInFile(sourceRow);
157+
}
158+
}
159+
}
160+
});
161+
}
162+
printValidationCounts(true);
163+
}
164+
165+
private void diff(Row sourceRow, Row astraRow, int diffAttempt) {
166+
if (astraRow == null) {
167+
if (diffAttempt == 1) {
168+
missingCounter.incrementAndGet();
169+
logger.info("Missing target row found for key: {}", getKey(sourceRow));
170+
}
171+
targetSession.execute(bindInsert(targetInsertStatement, sourceRow, null));
172+
correctedMissingCounter.incrementAndGet();
173+
logger.info("Inserted missing row in target: {}", getKey(sourceRow));
174+
} else {
175+
String diffData = isDifferent(sourceRow, astraRow);
176+
if (!diffData.isEmpty()) {
177+
if (diffAttempt == 1) {
178+
mismatchCounter.incrementAndGet();
179+
logger.info("Mismatch row found for key: {} Mismatch: {}", getKey(sourceRow), diffData);
180+
}
181+
182+
Record record = new Record(pkFactory.getTargetPK(sourceRow), astraRow, null);
183+
if (isCounterTable) cqlHelper.getTargetUpdateStatement().putRecord(record);
184+
else cqlHelper.getTargetInsertStatement().putRecord(record);
185+
correctedMismatchCounter.incrementAndGet();
186+
logger.info("Updated mismatch row in target: {}", getKey(sourceRow));
187+
} else {
188+
validCounter.incrementAndGet();
189+
}
190+
}
191+
}
192+
193+
private String isDifferent(Row sourceRow, Row astraRow) {
194+
StringBuffer diffData = new StringBuffer();
195+
IntStream.range(0, selectColTypes.size()).parallel().forEach(index -> {
196+
MigrateDataType dataType = selectColTypes.get(index);
197+
Object source = getData(dataType, index, sourceRow);
198+
Object astra = getData(dataType, index, astraRow);
199+
200+
boolean isDiff = dataType.diff(source, astra);
201+
if (isDiff) {
202+
if (dataType.typeClass.equals(UdtValue.class)) {
203+
String sourceUdtContent = ((UdtValue) source).getFormattedContents();
204+
String astraUdtContent = ((UdtValue) astra).getFormattedContents();
205+
if (!sourceUdtContent.equals(astraUdtContent)) {
206+
diffData.append("(Index: " + index + " Origin: " + sourceUdtContent + " Target: "
207+
+ astraUdtContent + ") ");
208+
}
209+
} else {
210+
diffData.append("(Index: " + index + " Origin: " + source + " Target: " + astra + ") ");
211+
}
212+
}
213+
});
214+
215+
return diffData.toString();
216+
}
115217
218+
private void logFailedRecordInFile(Row sourceRow) {
219+
try {
220+
failedRowCounter.getAndIncrement();
221+
Util.FileAppend(rowExceptionDir, exceptionFileName, getKey(sourceRow));
222+
logger.error("Failed to validate row: {} after {} retry.", getKey(sourceRow));
223+
} catch (Exception exp) {
224+
logger.error("Error occurred while writing to key {} to file ", getKey(sourceRow), exp);
225+
}
226+
}
227+
*/
228+
public void printValidationCounts(boolean isFinal) {
229+
String msg = "ThreadID: " + Thread.currentThread().getId();
230+
if (isFinal) {
231+
logger.info(
232+
"################################################################################################");
233+
234+
logger.info("ThreadID: {} Read Record Count: {}", Thread.currentThread().getId(), readCounter.get());
235+
logger.info("{} Mismatch Record Count: {}", msg, mismatchCounter.get());
236+
logger.info("{} Corrected Mismatch Record Count: {}", msg, correctedMismatchCounter.get());
237+
logger.info("ThreadID: {} Missing Record Count: {}", Thread.currentThread().getId(), missingCounter.get());
238+
logger.info("{} Corrected Missing Record Count: {}", msg, correctedMissingCounter.get());
239+
logger.info("{} Skipped Record Count: {}", msg, skippedCounter.get());
240+
logger.info("{} Failed row Count: {}", msg, failedRowCounter.get());
241+
logger.info("{} Valid Record Count: {}", msg, validCounter.get());
242+
}
243+
244+
logger.debug("ThreadID: {} Read Record Count: {}", Thread.currentThread().getId(), readCounter.get());
245+
logger.debug("{} Mismatch Record Count: {}", msg, mismatchCounter.get());
246+
logger.debug("{} Corrected Mismatch Record Count: {}", msg, correctedMismatchCounter.get());
247+
logger.debug("ThreadID: {} Missing Record Count: {}", Thread.currentThread().getId(), missingCounter.get());
248+
logger.debug("{} Corrected Missing Record Count: {}", msg, correctedMissingCounter.get());
249+
logger.debug("{} Skipped Record Count: {}", msg, skippedCounter.get());
250+
logger.debug("{} Failed row Count: {}", msg, failedRowCounter.get());
251+
logger.info("{} Valid Record Count: {}", msg, validCounter.get());
252+
253+
if (isFinal) {
254+
logger.info(
255+
"################################################################################################");
256+
}
257+
}
116258
}

src/main/java/datastax/cdm/job/DiffJobSession.java

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class DiffJobSession extends CopyJobSession {
3838
private final AtomicLong correctedMismatchCounter = new AtomicLong(0);
3939
private final AtomicLong validCounter = new AtomicLong(0);
4040
private final AtomicLong skippedCounter = new AtomicLong(0);
41+
private AtomicLong failedRowCounter = new AtomicLong(0);
4142

4243
private final boolean isCounterTable;
4344
private final boolean forceCounterWhenMissing;
@@ -145,16 +146,44 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
145146
logger.error("Error occurred during Attempt#: {}", attempts, e);
146147
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
147148
Thread.currentThread().getId(), min, max, attempts);
149+
if (null != tokenRangeExceptionDir && tokenRangeExceptionDir.trim().length() > 0 && attempts == maxAttempts) {
150+
logFailedPartitionsInFile(min, max);
151+
}
148152
}
149153
}
150154
}
151155

156+
private void logFailedPartitionsInFile(BigInteger min, BigInteger max) {
157+
try {
158+
ExceptionHandler.FileAppend(tokenRangeExceptionDir, exceptionFileName, min + "," + max);
159+
} catch (Exception ee) {
160+
logger.error("Error occurred while writing to token range file min: {} max: {}", min, max, ee);
161+
}
162+
}
163+
164+
private void logFailedRecordInFile(Record record) {
165+
try {
166+
failedRowCounter.getAndIncrement();
167+
ExceptionHandler.FileAppend(rowExceptionDir, exceptionFileName, record.getPk().toString());
168+
logger.error("Failed to validate row: {} after {} retry.", record.getPk());
169+
} catch (Exception exp) {
170+
logger.error("Error occurred while writing to key {} to file ", record.getPk(), exp);
171+
}
172+
}
173+
152174
private void diffAndClear(List<Record> recordsToDiff) {
153175
for (Record record : recordsToDiff) {
154-
try {
155-
diff(record);
156-
} catch (Exception e) {
157-
logger.error("Could not perform diff for key {}: {}", record.getPk(), e);
176+
int maxAttempts = maxRetriesRowFailure; // Is this correct?
177+
for (int attempts = 1; attempts <= maxAttempts; attempts++) {
178+
try {
179+
diff(record, attempts);
180+
attempts = maxAttempts;
181+
} catch (Exception e) {
182+
logger.error("Could not perform diff for Key: {} -- Retry# {}", record.getPk(), attempts, e);
183+
if (null != rowExceptionDir && rowExceptionDir.trim().length() > 0 && attempts == maxAttempts) {
184+
logFailedRecordInFile(record);
185+
}
186+
}
158187
}
159188
}
160189
recordsToDiff.clear();
@@ -173,18 +202,20 @@ public synchronized void printCounts(boolean isFinal) {
173202
logger.info("{} Corrected Missing Record Count: {}", msg, correctedMissingCounter.get());
174203
logger.info("{} Valid Record Count: {}", msg, validCounter.get());
175204
logger.info("{} Skipped Record Count: {}", msg, skippedCounter.get());
205+
logger.info("{} Failed row Count: {}", msg, failedRowCounter.get());
176206
if (isFinal) {
177207
logger.info("################################################################################################");
178208
}
179209
}
180210

181-
private void diff(Record record) {
211+
private void diff(Record record, int attempts) {
182212
EnhancedPK originPK = record.getPk();
183213
Row originRow = record.getOriginRow();
184214
Row targetRow = record.getTargetRow();
185215

186216
if (targetRow == null) {
187217
missingCounter.incrementAndGet();
218+
// FR WHY THE RETRY == 1 IS MISSING HERE
188219
logger.error("Missing target row found for key: {}", record.getPk());
189220
if (autoCorrectMissing && isCounterTable && !forceCounterWhenMissing) {
190221
logger.error("{} is true, but not Inserting as {} is not enabled; key : {}", KnownProperties.AUTOCORRECT_MISSING, KnownProperties.AUTOCORRECT_MISSING_COUNTER, record.getPk());
@@ -204,9 +235,10 @@ private void diff(Record record) {
204235

205236
String diffData = isDifferent(originPK, originRow, targetRow);
206237
if (!diffData.isEmpty()) {
207-
mismatchCounter.incrementAndGet();
208-
logger.error("Mismatch row found for key: {} Mismatch: {}", record.getPk(), diffData);
209-
238+
if (attempts == 1) {
239+
mismatchCounter.incrementAndGet();
240+
logger.error("Mismatch row found for key: {} Mismatch: {}", record.getPk(), diffData);
241+
}
210242
if (autoCorrectMismatch) {
211243
writeLimiter.acquire(1);
212244
if (isCounterTable) cqlHelper.getTargetUpdateStatement().putRecord(record);
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package datastax.cdm.job;
2+
3+
import java.io.IOException;
4+
import java.nio.charset.StandardCharsets;
5+
import java.nio.file.Files;
6+
import java.nio.file.Path;
7+
import java.nio.file.Paths;
8+
import java.nio.file.StandardOpenOption;
9+
10+
/**
 * Small utility for appending exception bookkeeping lines (failed token
 * ranges, failed row keys) to files under a configurable directory.
 * Files and directories are created on first use; subsequent writes append.
 */
public class ExceptionHandler {

    private static final String NEW_LINE = System.lineSeparator();

    // Utility class: not meant to be instantiated.
    private ExceptionHandler() {
    }

    /**
     * Appends {@code content} to the file at {@code path}, creating the file
     * first if it does not already exist.
     *
     * @throws IOException if the file cannot be created or written
     */
    private static void appendToFile(Path path, String content) throws IOException {
        Files.write(path, content.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE,
                StandardOpenOption.APPEND);
    }

    /**
     * Appends {@code content} plus a platform line separator to the file
     * {@code fileName} inside {@code dir}, creating the directory and file on
     * first use.
     * <p>
     * NOTE: method name kept as {@code FileAppend} (not lowerCamelCase) for
     * compatibility with existing callers.
     *
     * @param dir      directory that holds the exception file; created if absent
     * @param fileName name of the file within {@code dir}
     * @param content  line of text to append
     * @throws IOException if the directory or file cannot be created or written
     */
    public static void FileAppend(String dir, String fileName, String content) throws IOException {
        Files.createDirectories(Paths.get(dir));
        // Path.resolve instead of string concatenation: portable and avoids
        // double-separator artifacts.
        Path path = Paths.get(dir).resolve(fileName);
        appendToFile(path, content + NEW_LINE);
    }

}

0 commit comments

Comments
 (0)