Commit f6747ff

faizalrub-datastax authored and msmygit committed

in-progress

1 parent 199c00b commit f6747ff

8 files changed: +8 -251 lines
src/main/java/datastax/cdm/job/AbstractJobSession.java

Lines changed: 0 additions & 2 deletions
@@ -36,10 +36,8 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
         readLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_LIMIT_READ));
         writeLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_LIMIT_WRITE));
         maxRetries = propertyHelper.getInteger(KnownProperties.MAX_RETRIES);
-        maxRetriesRowFailure = propertyHelper.getInteger(KnownProperties.MAX_RETRIES_ROW_FAILURE);

         tokenRangeExceptionDir = propertyHelper.getString(KnownProperties.TOKEN_RANGE_EXCEPTION_DIR);
-        rowExceptionDir = propertyHelper.getString(KnownProperties.ROW_EXCEPTION_DIR);
         exceptionFileName = ColumnsKeysTypes.getOriginKeyspaceTable(propertyHelper);

         logger.info("PARAM -- Max Retries: {}", maxRetries);

src/main/java/datastax/cdm/job/BaseJobSession.java

Lines changed: 0 additions & 2 deletions
@@ -19,12 +19,10 @@ public abstract class BaseJobSession {
     protected RateLimiter readLimiter;
     protected RateLimiter writeLimiter;
     protected Integer maxRetries = 10;
-    protected Integer maxRetriesRowFailure = 2;

     protected Integer printStatsAfter = 100000;

     protected String tokenRangeExceptionDir;
-    protected String rowExceptionDir;
     protected String exceptionFileName;

     protected BaseJobSession(SparkConf sc) {
Lines changed: 0 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
 package datastax.cdm.job;

 import com.datastax.oss.driver.api.core.CqlSession;
-import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.Row;
-import com.datastax.oss.driver.api.core.data.UdtValue;
 import datastax.cdm.data.EnhancedPK;
 import datastax.cdm.data.PKFactory;
 import datastax.cdm.data.Record;
@@ -17,7 +15,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.IntStream;

 public class CopyPKJobSession extends AbstractJobSession {

@@ -28,14 +25,6 @@ public class CopyPKJobSession extends AbstractJobSession {
     protected AtomicLong skipCounter = new AtomicLong(0);
     protected AtomicLong writeCounter = new AtomicLong(0);

-    private AtomicLong correctedMissingCounter = new AtomicLong(0);
-    private AtomicLong correctedMismatchCounter = new AtomicLong(0);
-    private AtomicLong validCounter = new AtomicLong(0);
-    private AtomicLong mismatchCounter = new AtomicLong(0);
-    private AtomicLong skippedCounter = new AtomicLong(0);
-    private AtomicLong failedRowCounter = new AtomicLong(0);
-
-
     private final PKFactory pkFactory;
     private final List<MigrateDataType> originPKTypes;
     private final boolean isCounterTable;
@@ -123,136 +112,5 @@ private EnhancedPK toEnhancedPK(String rowString) {
         }
         return pkFactory.toEnhancedPK(values, pkFactory.getPKTypes(PKFactory.Side.ORIGIN));
     }
-    /*
-    // FR: THIS ENTIRE THING NEEDS TO BE MOVED FROM HERE TO DIFFJOBSESSION CLASS
-    @SuppressWarnings("unchecked")
-    public void getRowAndDiff(List<SplitPartitions.PKRows> rowsList) {
-        for (SplitPartitions.PKRows rows : rowsList) {
-            rows.pkRows.parallelStream().forEach(row -> {
-                readCounter.incrementAndGet();
-                EnhancedPK pk = toEnhancedPK(row);
-                if (null == pk || pk.isError()) {
-                    missingCounter.incrementAndGet();
-                    logger.error("Could not build PK object with value <{}>; error is: {}", row, (null == pk ? "null" : pk.getMessages()));
-                    return;
-                }
-                int maxAttempts = maxRetriesRowFailure;
-                Row sourceRow = null;
-                int diffAttempt = 0;
-                for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
-                    try {
-                        sourceRow = originSelectByPKStatement.getRecord(pk).getOriginRow();
-                        if (sourceRow != null) {
-                            Row astraRow = cqlHelper.getTargetSelectByPKStatement().getRecord(pk).getTargetRow();
-                            diffAttempt++;
-                            diff(sourceRow, astraRow, diffAttempt);
-                        } else {
-                            logger.error("Could not find row with primary-key: {} on source", row);
-                        }
-                        retryCount = maxAttempts;
-                    } catch (Exception e) {
-                        logger.error("Could not find row with primary-key: {} retry# {}", row, retryCount, e);
-                        if (retryCount == maxAttempts) {
-                            logFailedRecordInFile(sourceRow);
-                        }
-                    }
-                }
-            });
-        }
-        printValidationCounts(true);
-    }
-
-    private void diff(Row sourceRow, Row astraRow, int diffAttempt) {
-        if (astraRow == null) {
-            if (diffAttempt == 1) {
-                missingCounter.incrementAndGet();
-                logger.info("Missing target row found for key: {}", getKey(sourceRow));
-            }
-            targetSession.execute(bindInsert(targetInsertStatement, sourceRow, null));
-            correctedMissingCounter.incrementAndGet();
-            logger.info("Inserted missing row in target: {}", getKey(sourceRow));
-        } else {
-            String diffData = isDifferent(sourceRow, astraRow);
-            if (!diffData.isEmpty()) {
-                if (diffAttempt == 1) {
-                    mismatchCounter.incrementAndGet();
-                    logger.info("Mismatch row found for key: {} Mismatch: {}", getKey(sourceRow), diffData);
-                }
-
-                Record record = new Record(pkFactory.getTargetPK(sourceRow), astraRow, null);
-                if (isCounterTable) cqlHelper.getTargetUpdateStatement().putRecord(record);
-                else cqlHelper.getTargetInsertStatement().putRecord(record);
-                correctedMismatchCounter.incrementAndGet();
-                logger.info("Updated mismatch row in target: {}", getKey(sourceRow));
-            } else {
-                validCounter.incrementAndGet();
-            }
-        }
-    }
-
-    private String isDifferent(Row sourceRow, Row astraRow) {
-        StringBuffer diffData = new StringBuffer();
-        IntStream.range(0, selectColTypes.size()).parallel().forEach(index -> {
-            MigrateDataType dataType = selectColTypes.get(index);
-            Object source = getData(dataType, index, sourceRow);
-            Object astra = getData(dataType, index, astraRow);
-
-            boolean isDiff = dataType.diff(source, astra);
-            if (isDiff) {
-                if (dataType.typeClass.equals(UdtValue.class)) {
-                    String sourceUdtContent = ((UdtValue) source).getFormattedContents();
-                    String astraUdtContent = ((UdtValue) astra).getFormattedContents();
-                    if (!sourceUdtContent.equals(astraUdtContent)) {
-                        diffData.append("(Index: " + index + " Origin: " + sourceUdtContent + " Target: "
-                                + astraUdtContent + ") ");
-                    }
-                } else {
-                    diffData.append("(Index: " + index + " Origin: " + source + " Target: " + astra + ") ");
-                }
-            }
-        });
-
-        return diffData.toString();
-    }

-    private void logFailedRecordInFile(Row sourceRow) {
-        try {
-            failedRowCounter.getAndIncrement();
-            Util.FileAppend(rowExceptionDir, exceptionFileName, getKey(sourceRow));
-            logger.error("Failed to validate row: {} after {} retry.", getKey(sourceRow));
-        } catch (Exception exp) {
-            logger.error("Error occurred while writing to key {} to file ", getKey(sourceRow), exp);
-        }
-    }
-    */
-    public void printValidationCounts(boolean isFinal) {
-        String msg = "ThreadID: " + Thread.currentThread().getId();
-        if (isFinal) {
-            logger.info(
-                    "################################################################################################");
-
-            logger.info("ThreadID: {} Read Record Count: {}", Thread.currentThread().getId(), readCounter.get());
-            logger.info("{} Mismatch Record Count: {}", msg, mismatchCounter.get());
-            logger.info("{} Corrected Mismatch Record Count: {}", msg, correctedMismatchCounter.get());
-            logger.info("ThreadID: {} Missing Record Count: {}", Thread.currentThread().getId(), missingCounter.get());
-            logger.info("{} Corrected Missing Record Count: {}", msg, correctedMissingCounter.get());
-            logger.info("{} Skipped Record Count: {}", msg, skippedCounter.get());
-            logger.info("{} Failed row Count: {}", msg, failedRowCounter.get());
-            logger.info("{} Valid Record Count: {}", msg, validCounter.get());
-        }
-
-        logger.debug("ThreadID: {} Read Record Count: {}", Thread.currentThread().getId(), readCounter.get());
-        logger.debug("{} Mismatch Record Count: {}", msg, mismatchCounter.get());
-        logger.debug("{} Corrected Mismatch Record Count: {}", msg, correctedMismatchCounter.get());
-        logger.debug("ThreadID: {} Missing Record Count: {}", Thread.currentThread().getId(), missingCounter.get());
-        logger.debug("{} Corrected Missing Record Count: {}", msg, correctedMissingCounter.get());
-        logger.debug("{} Skipped Record Count: {}", msg, skippedCounter.get());
-        logger.debug("{} Failed row Count: {}", msg, failedRowCounter.get());
-        logger.info("{} Valid Record Count: {}", msg, validCounter.get());
-
-        if (isFinal) {
-            logger.info(
-                    "################################################################################################");
-        }
-    }
 }
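The commented-out isDifferent above compares origin and target column-by-column and builds a "(Index: i Origin: x Target: y) " report. A simplified standalone sketch of that comparison pattern, with plain lists standing in for the driver's Row and Objects.equals standing in for MigrateDataType.diff:

import java.util.List;
import java.util.Objects;

public class RowDiffSketch {
    // Builds the same "(Index: i Origin: x Target: y) " report the removed
    // isDifferent produced, but sequentially with a StringBuilder instead of
    // a parallel IntStream writing into a synchronized StringBuffer.
    static String diffReport(List<Object> originRow, List<Object> targetRow) {
        StringBuilder diffData = new StringBuilder();
        for (int index = 0; index < originRow.size(); index++) {
            Object origin = originRow.get(index);
            Object target = targetRow.get(index);
            if (!Objects.equals(origin, target)) { // stand-in for MigrateDataType.diff()
                diffData.append("(Index: ").append(index)
                        .append(" Origin: ").append(origin)
                        .append(" Target: ").append(target).append(") ");
            }
        }
        return diffData.toString();
    }

    public static void main(String[] args) {
        System.out.println(diffReport(List.of(1, "a"), List.of(1, "b")));
        // -> (Index: 1 Origin: a Target: b)
    }
}

The sequential form also sidesteps the nondeterministic append ordering the original parallel stream allowed.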

src/main/java/datastax/cdm/job/DiffJobSession.java

Lines changed: 8 additions & 29 deletions
@@ -38,7 +38,6 @@ public class DiffJobSession extends CopyJobSession {
     private final AtomicLong correctedMismatchCounter = new AtomicLong(0);
     private final AtomicLong validCounter = new AtomicLong(0);
     private final AtomicLong skippedCounter = new AtomicLong(0);
-    private AtomicLong failedRowCounter = new AtomicLong(0);

     private final boolean isCounterTable;
     private final boolean forceCounterWhenMissing;
@@ -161,29 +160,12 @@ private void logFailedPartitionsInFile(BigInteger min, BigInteger max) {
         }
     }

-    private void logFailedRecordInFile(Record record) {
-        try {
-            failedRowCounter.getAndIncrement();
-            ExceptionHandler.FileAppend(rowExceptionDir, exceptionFileName, record.getPk().toString());
-            logger.error("Failed to validate row: {} after {} retry.", record.getPk());
-        } catch (Exception exp) {
-            logger.error("Error occurred while writing to key {} to file ", record.getPk(), exp);
-        }
-    }
-
     private void diffAndClear(List<Record> recordsToDiff) {
         for (Record record : recordsToDiff) {
-            int maxAttempts = maxRetriesRowFailure; // Is this correct?
-            for (int attempts = 1; attempts <= maxAttempts; attempts++) {
-                try {
-                    diff(record, attempts);
-                    attempts = maxAttempts;
-                } catch (Exception e) {
-                    logger.error("Could not perform diff for Key: {} -- Retry# {}", record.getPk(), attempts, e);
-                    if (null != rowExceptionDir && rowExceptionDir.trim().length() > 0 && attempts == maxAttempts) {
-                        logFailedRecordInFile(record);
-                    }
-                }
-            }
+            try {
+                diff(record);
+            } catch (Exception e) {
+                logger.error("Could not perform diff for key {}: {}", record.getPk(), e);
             }
         }
         recordsToDiff.clear();
@@ -202,20 +184,18 @@ public synchronized void printCounts(boolean isFinal) {
         logger.info("{} Corrected Missing Record Count: {}", msg, correctedMissingCounter.get());
         logger.info("{} Valid Record Count: {}", msg, validCounter.get());
         logger.info("{} Skipped Record Count: {}", msg, skippedCounter.get());
-        logger.info("{} Failed row Count: {}", msg, failedRowCounter.get());
         if (isFinal) {
             logger.info("################################################################################################");
         }
     }

-    private void diff(Record record, int attempts) {
+    private void diff(Record record) {
         EnhancedPK originPK = record.getPk();
         Row originRow = record.getOriginRow();
         Row targetRow = record.getTargetRow();

         if (targetRow == null) {
             missingCounter.incrementAndGet();
-            // FR WHY THE RETRY == 1 IS MISSING HERE
             logger.error("Missing target row found for key: {}", record.getPk());
             if (autoCorrectMissing && isCounterTable && !forceCounterWhenMissing) {
                 logger.error("{} is true, but not Inserting as {} is not enabled; key : {}", KnownProperties.AUTOCORRECT_MISSING, KnownProperties.AUTOCORRECT_MISSING_COUNTER, record.getPk());
@@ -235,10 +215,9 @@ private void diff(Record record, int attempts) {

         String diffData = isDifferent(originPK, originRow, targetRow);
         if (!diffData.isEmpty()) {
-            if (attempts == 1) {
-                mismatchCounter.incrementAndGet();
-                logger.error("Mismatch row found for key: {} Mismatch: {}", record.getPk(), diffData);
-            }
+            mismatchCounter.incrementAndGet();
+            logger.error("Mismatch row found for key: {} Mismatch: {}", record.getPk(), diffData);
+
             if (autoCorrectMismatch) {
                 writeLimiter.acquire(1);
                 if (isCounterTable) cqlHelper.getTargetUpdateStatement().putRecord(record);
src/main/java/datastax/cdm/job/SplitPartitions.java

Lines changed: 0 additions & 39 deletions
@@ -203,43 +203,4 @@ public static List<Partition> getFailedSubPartitionsFromFile(int splitSize, Stri
         return partitions;
     }

-    public static List<PKRows> getFailedRowPartsFromFile(int splitSize, long rowFailureFileSizeLimit, String failedRowsFile) throws IOException {
-        logger.info("ThreadID: {} Splitting rows in file: {} using a split-size of {}"
-                , Thread.currentThread().getId(), failedRowsFile, splitSize);
-
-        long bytesSize = Files.size(Paths.get(failedRowsFile));
-
-        if(bytesSize > rowFailureFileSizeLimit) {
-            throw new RuntimeException("Row failure file size exceeds permissible limit of " + rowFailureFileSizeLimit + " bytes. Actual file size is " + bytesSize);
-        }
-
-        String renameFile = failedRowsFile+"_bkp";
-        File file = new File(failedRowsFile);
-        File rename = new File(renameFile);
-        if(rename.exists()) {
-            rename.delete();
-        }
-        boolean flag = file.renameTo(rename);
-        if (flag == true) {
-            logger.info("File Successfully Renamed to : "+renameFile);
-        }
-        else {
-            logger.info("Operation Failed to rename file : "+failedRowsFile);
-        }
-
-        List<String> pkRows = new ArrayList<String>();
-        BufferedReader reader = getfileReader(renameFile);
-        String pkRow = null;
-        while ((pkRow = reader.readLine()) != null) {
-            if (pkRow.startsWith("#")) {
-                continue;
-            }
-            pkRows.add(pkRow);
-        }
-        int partSize = pkRows.size() / splitSize;
-        if (partSize == 0) {
-            partSize = pkRows.size();
-        }
-        return batches(pkRows, partSize).map(l -> (new PKRows(l))).collect(Collectors.toList());
-    }
 }
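The removed getFailedRowPartsFromFile split the failed-row keys into consecutive batches via a batches() helper. A minimal sketch of that partitioning arithmetic (batches() here is a hypothetical reimplementation for illustration, not the class's own helper):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class BatchSketch {
    // Splits source into consecutive sublists of at most `length` elements.
    static <T> Stream<List<T>> batches(List<T> source, int length) {
        int chunks = (source.size() + length - 1) / length; // ceiling division
        return IntStream.range(0, chunks)
                .mapToObj(n -> source.subList(n * length, Math.min((n + 1) * length, source.size())));
    }

    public static void main(String[] args) {
        List<String> pkRows = List.of("k1", "k2", "k3", "k4", "k5");
        int splitSize = 2;
        int partSize = pkRows.size() / splitSize;    // 5 / 2 = 2 keys per batch
        if (partSize == 0) partSize = pkRows.size(); // guard for files smaller than splitSize
        System.out.println(batches(pkRows, partSize).collect(Collectors.toList()));
        // -> [[k1, k2], [k3, k4], [k5]]
    }
}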

src/main/java/datastax/cdm/properties/KnownProperties.java

Lines changed: 0 additions & 9 deletions
@@ -173,19 +173,10 @@ public enum PropertyType {
     // Error handling
     //==========================================================================
     public static final String TOKEN_RANGE_EXCEPTION_DIR = "spark.tokenRange.exceptionDir"; // file:///aaa/bbb/
-    public static final String ROW_EXCEPTION_DIR = "spark.row.exceptionDir"; // file:///aaa/bbb/
     public static final String TOKEN_RANGE_EXCEPTION_FILE = "spark.input.partitionFile"; // file:///aaa/bbb/partitions.csv
-    public static final String ROW_EXCEPTION_FILE = "spark.input.failedRowsFile"; // file:///aaa/bbb/failedRows.csv
-    public static final String ROW_FAILURE_FILE_SIZE_LIMIT = "spark.rowfailure.filesize.limit"; // 200000000
-    public static final String MAX_RETRIES_ROW_FAILURE = "spark.cdm.perfops.rowerror.limit";
     static {
         types.put(TOKEN_RANGE_EXCEPTION_DIR, PropertyType.STRING);
-        types.put(ROW_EXCEPTION_DIR, PropertyType.STRING);
         types.put(TOKEN_RANGE_EXCEPTION_FILE, PropertyType.STRING);
-        types.put(ROW_EXCEPTION_FILE, PropertyType.STRING);
-        types.put(ROW_FAILURE_FILE_SIZE_LIMIT, PropertyType.NUMBER);
-        types.put(MAX_RETRIES_ROW_FAILURE, PropertyType.NUMBER);
-        defaults.put(ROW_FAILURE_FILE_SIZE_LIMIT, "200000000");
     }
     //==========================================================================
     // Guardrails and Transformations
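After this change only the token-range error-handling keys remain. A sketch of supplying them through a SparkConf (the paths reuse the placeholder values from the code comments above, not real defaults):

import org.apache.spark.SparkConf;

public class ErrorHandlingConfSketch {
    public static void main(String[] args) {
        // In a real run these arrive as --conf options to spark-submit;
        // setting them programmatically here just illustrates the keys.
        SparkConf conf = new SparkConf()
                .set("spark.tokenRange.exceptionDir", "file:///aaa/bbb/")
                .set("spark.input.partitionFile", "file:///aaa/bbb/partitions.csv");
        System.out.println(conf.get("spark.tokenRange.exceptionDir"));
    }
}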

src/main/scala/datastax/cdm/job/BaseJob.scala

Lines changed: 0 additions & 2 deletions
@@ -54,8 +54,6 @@ class BaseJob extends App {
   val numSplits = propertyHelper.getInteger(KnownProperties.PERF_NUM_PARTS)

   val tokenRangeFile = propertyHelper.getString(KnownProperties.TOKEN_RANGE_EXCEPTION_FILE)
-  val failedRowsFile = propertyHelper.getString(KnownProperties.ROW_EXCEPTION_FILE)
-  val rowFailureFileSizeLimit = propertyHelper.getLong(KnownProperties.ROW_FAILURE_FILE_SIZE_LIMIT)

   protected def exitSpark() = {
     spark.stop()

src/main/scala/datastax/cdm/job/DiffDataFailedRowsFromFile.scala

Lines changed: 0 additions & 26 deletions
This file was deleted.
