
Commit 525dc8a

Removed functionality related to retry (#272)
1 parent 3a3bde3 commit 525dc8a

8 files changed: +160 -251 lines

src/main/java/com/datastax/cdm/feature/Guardrail.java

Lines changed: 1 addition & 1 deletion
@@ -100,7 +100,7 @@ private Map<String,Integer> check(Map<String,Integer> currentChecks, int targetI
 int colSize = targetTable.byteCount(targetIndex, targetValue);
 if (logTrace) logger.trace("Column {} at targetIndex {} has size {} bytes", targetTable.getColumnNames(false).get(targetIndex), targetIndex, colSize);
 if (colSize > colSizeInKB * BASE_FACTOR) {
-if (null==currentChecks) currentChecks = new HashMap();
+if (null==currentChecks) currentChecks = new HashMap<String,Integer>();
 currentChecks.put(targetTable.getColumnNames(false).get(targetIndex), colSize);
 }
 return currentChecks;
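
The only change in this file replaces a raw HashMap with an explicitly parameterized one. A stand-alone sketch of why the parameterized form is preferable; this is not CDM code, and the class name and values are illustrative:

import java.util.HashMap;
import java.util.Map;

public class RawTypeSketch {
    public static void main(String[] args) {
        Map<String, Integer> raw = new HashMap();                    // raw type: compiles, but with an "unchecked" warning
        Map<String, Integer> typed = new HashMap<String, Integer>(); // explicit type arguments, as in this commit
        Map<String, Integer> diamond = new HashMap<>();              // equivalent diamond form on Java 7+
        typed.put("column_a", 2048);
        diamond.put("column_b", 4096);
        System.out.println(typed + " " + diamond + " raw entries: " + raw.size());
    }
}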

src/main/java/com/datastax/cdm/job/AbstractJobSession.java

Lines changed: 0 additions & 2 deletions
@@ -67,10 +67,8 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,

 rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
 rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));
-maxRetries = propertyHelper.getInteger(KnownProperties.MAX_RETRIES);
 trackRun = propertyHelper.getBoolean(KnownProperties.TRACK_RUN);

-logger.info("PARAM -- Max Retries: {}", maxRetries);
 logger.info("PARAM -- Partition file input: {}", partitionFileInput);
 logger.info("PARAM -- Partition file output: {}", partitionFileOutput);
 logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());

src/main/java/com/datastax/cdm/job/BaseJobSession.java

Lines changed: 0 additions & 1 deletion
@@ -46,7 +46,6 @@ public abstract class BaseJobSession {
 protected Map<Featureset, Feature> featureMap;
 protected RateLimiter rateLimiterOrigin;
 protected RateLimiter rateLimiterTarget;
-protected Integer maxRetries = 10;

 protected BaseJobSession(SparkConf sc) {
 propertyHelper.initializeSparkConf(sc);

src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 65 additions & 74 deletions
@@ -70,95 +70,86 @@ public synchronized void initCdmRun(Collection<SplitPartitions.Partition> parts,
 trackRunFeature.initCdmRun(parts, TrackRun.RUN_TYPE.MIGRATE);
 }

-public void getDataAndInsert(BigInteger min, BigInteger max) {
+private void getDataAndInsert(BigInteger min, BigInteger max) {
 ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
 logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
 if (trackRun)
 trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.STARTED);

 BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED);
-boolean done = false;
-int maxAttempts = maxRetries + 1;
 String guardrailCheck;
-for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) {
-jobCounter.threadReset();
-
-try {
-OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement = this.originSession
-.getOriginSelectByPartitionRangeStatement();
-targetUpsertStatement = this.targetSession.getTargetUpsertStatement();
-targetSelectByPKStatement = this.targetSession.getTargetSelectByPKStatement();
-ResultSet resultSet = originSelectByPartitionRangeStatement
-.execute(originSelectByPartitionRangeStatement.bind(min, max));
-Collection<CompletionStage<AsyncResultSet>> writeResults = new ArrayList<>();
-
-for (Row originRow : resultSet) {
-rateLimiterOrigin.acquire(1);
-jobCounter.threadIncrement(JobCounter.CounterType.READ);
-
-Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
-if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) {
-jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
-continue;
-}
-
-for (Record r : pkFactory.toValidRecordList(record)) {
-if (guardrailEnabled) {
-guardrailCheck = guardrailFeature.guardrailChecks(r);
-if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) {
-logger.error("Guardrails failed for PrimaryKey {}; {}", r.getPk(), guardrailCheck);
-jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
-continue;
-}
-}
+jobCounter.threadReset();
+
+try {
+OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement = this.originSession
+.getOriginSelectByPartitionRangeStatement();
+targetUpsertStatement = this.targetSession.getTargetUpsertStatement();
+targetSelectByPKStatement = this.targetSession.getTargetSelectByPKStatement();
+ResultSet resultSet = originSelectByPartitionRangeStatement
+.execute(originSelectByPartitionRangeStatement.bind(min, max));
+Collection<CompletionStage<AsyncResultSet>> writeResults = new ArrayList<>();
+
+for (Row originRow : resultSet) {
+rateLimiterOrigin.acquire(1);
+jobCounter.threadIncrement(JobCounter.CounterType.READ);
+
+Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
+if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) {
+jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
+continue;
+}

-BoundStatement boundUpsert = bind(r);
-if (null == boundUpsert) {
-jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); // TODO: this previously
-// skipped, why not errCnt?
+for (Record r : pkFactory.toValidRecordList(record)) {
+if (guardrailEnabled) {
+guardrailCheck = guardrailFeature.guardrailChecks(r);
+if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) {
+logger.error("Guardrails failed for PrimaryKey {}; {}", r.getPk(), guardrailCheck);
+jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
 continue;
 }
+}

-rateLimiterTarget.acquire(1);
-batch = writeAsync(batch, writeResults, boundUpsert);
-jobCounter.threadIncrement(JobCounter.CounterType.UNFLUSHED);
-
-if (jobCounter.getCount(JobCounter.CounterType.UNFLUSHED) > fetchSize) {
-flushAndClearWrites(batch, writeResults);
-jobCounter.threadIncrement(JobCounter.CounterType.WRITE,
-jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
-jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
-}
+BoundStatement boundUpsert = bind(r);
+if (null == boundUpsert) {
+jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); // TODO: this previously
+// skipped, why not errCnt?
+continue;
 }
-}

-flushAndClearWrites(batch, writeResults);
-jobCounter.threadIncrement(JobCounter.CounterType.WRITE,
-jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
-jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
-done = true;
-if (trackRun)
-trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.PASS);
-
-} catch (Exception e) {
-if (attempts == maxAttempts) {
-jobCounter.threadIncrement(JobCounter.CounterType.ERROR,
-jobCounter.getCount(JobCounter.CounterType.READ)
-- jobCounter.getCount(JobCounter.CounterType.WRITE)
-- jobCounter.getCount(JobCounter.CounterType.SKIPPED));
-if (trackRun)
-trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.FAIL);
-else
-logPartitionsInFile(partitionFileOutput, min, max);
+rateLimiterTarget.acquire(1);
+batch = writeAsync(batch, writeResults, boundUpsert);
+jobCounter.threadIncrement(JobCounter.CounterType.UNFLUSHED);
+
+if (jobCounter.getCount(JobCounter.CounterType.UNFLUSHED) > fetchSize) {
+flushAndClearWrites(batch, writeResults);
+jobCounter.threadIncrement(JobCounter.CounterType.WRITE,
+jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
+jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
+}
 }
-logger.error("Error occurred during Attempt#: {}", attempts, e);
-logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
-Thread.currentThread().getId(), min, max, attempts);
-logger.error("Error stats " + jobCounter.getThreadCounters(false));
-} finally {
-jobCounter.globalIncrement();
-printCounts(false);
 }
+
+flushAndClearWrites(batch, writeResults);
+jobCounter.threadIncrement(JobCounter.CounterType.WRITE,
+jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
+jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
+if (trackRun)
+trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.PASS);
+
+} catch (Exception e) {
+jobCounter.threadIncrement(JobCounter.CounterType.ERROR,
+jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.WRITE)
+- jobCounter.getCount(JobCounter.CounterType.SKIPPED));
+if (trackRun)
+trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.FAIL);
+else
+logPartitionsInFile(partitionFileOutput, min, max);
+logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}",
+Thread.currentThread().getId(), min, max);
+logger.error("Error stats " + jobCounter.getThreadCounters(false));
+} finally {
+jobCounter.globalIncrement();
+printCounts(false);
 }
 }
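
The structural change above is the removal of the for (attempts) retry loop: each partition range is now processed exactly once, and a failure is either recorded via trackRun or appended to the partition output file so a later run can pick it up. A minimal stand-alone sketch of that single-attempt pattern, with hypothetical names standing in for the CDM classes:

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class SingleAttemptSketch {

    // Stand-in for the per-partition work done by getDataAndInsert(min, max).
    static void processPartition(BigInteger min, BigInteger max) {
        if (min.signum() < 0) {                      // pretend that negative ranges fail
            throw new RuntimeException("simulated failure");
        }
        System.out.println("processed " + min + ".." + max);
    }

    public static void main(String[] args) {
        // Plays the role of trackRun / the partition output file: failed ranges are
        // recorded so a follow-up run can redo them, instead of retrying in-process.
        List<BigInteger[]> failed = new ArrayList<>();

        BigInteger[][] partitions = {
                { BigInteger.valueOf(-10), BigInteger.ZERO },
                { BigInteger.ONE, BigInteger.TEN }
        };

        for (BigInteger[] p : partitions) {
            try {
                processPartition(p[0], p[1]);        // single attempt, no retry loop
            } catch (Exception e) {
                failed.add(p);                       // record the range for a later re-run
                System.err.println("range " + p[0] + ".." + p[1] + " failed: " + e.getMessage());
            }
        }
        System.out.println("ranges to re-run: " + failed.size());
    }
}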

src/main/java/com/datastax/cdm/job/DiffJobSession.java

Lines changed: 68 additions & 75 deletions
@@ -125,91 +125,84 @@ public synchronized void initCdmRun(Collection<SplitPartitions.Partition> parts,
 trackRunFeature.initCdmRun(parts, TrackRun.RUN_TYPE.DIFF_DATA);
 }

-public void getDataAndDiff(BigInteger min, BigInteger max) {
+private void getDataAndDiff(BigInteger min, BigInteger max) {
 ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
 logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
 if (trackRun)
 trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.STARTED);

-boolean done = false;
 AtomicBoolean hasDiff = new AtomicBoolean(false);
-int maxAttempts = maxRetries + 1;
-for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) {
-try {
-jobCounter.threadReset();
-
-PKFactory pkFactory = originSession.getPKFactory();
-OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement = originSession
-.getOriginSelectByPartitionRangeStatement();
-ResultSet resultSet = originSelectByPartitionRangeStatement
-.execute(originSelectByPartitionRangeStatement.bind(min, max));
-TargetSelectByPKStatement targetSelectByPKStatement = targetSession.getTargetSelectByPKStatement();
-Integer fetchSizeInRows = originSession.getCqlTable().getFetchSizeInRows();
-
-List<Record> recordsToDiff = new ArrayList<>(fetchSizeInRows);
-StreamSupport.stream(resultSet.spliterator(), false).forEach(originRow -> {
-rateLimiterOrigin.acquire(1);
-Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
-jobCounter.threadIncrement(JobCounter.CounterType.READ);
-
-if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) {
-jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
-} else {
-for (Record r : pkFactory.toValidRecordList(record)) {
-
-if (guardrailEnabled) {
-String guardrailCheck = guardrailFeature.guardrailChecks(r);
-if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) {
-logger.error("Guardrails failed for PrimaryKey {}; {}", r.getPk(), guardrailCheck);
-jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
-continue;
+try {
+jobCounter.threadReset();
+
+PKFactory pkFactory = originSession.getPKFactory();
+OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement = originSession
+.getOriginSelectByPartitionRangeStatement();
+ResultSet resultSet = originSelectByPartitionRangeStatement
+.execute(originSelectByPartitionRangeStatement.bind(min, max));
+TargetSelectByPKStatement targetSelectByPKStatement = targetSession.getTargetSelectByPKStatement();
+Integer fetchSizeInRows = originSession.getCqlTable().getFetchSizeInRows();
+
+List<Record> recordsToDiff = new ArrayList<>(fetchSizeInRows);
+StreamSupport.stream(resultSet.spliterator(), false).forEach(originRow -> {
+rateLimiterOrigin.acquire(1);
+Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
+jobCounter.threadIncrement(JobCounter.CounterType.READ);
+
+if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) {
+jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
+} else {
+for (Record r : pkFactory.toValidRecordList(record)) {
+
+if (guardrailEnabled) {
+String guardrailCheck = guardrailFeature.guardrailChecks(r);
+if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) {
+logger.error("Guardrails failed for PrimaryKey {}; {}", r.getPk(), guardrailCheck);
+jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
+continue;
+}
+}
+
+rateLimiterTarget.acquire(1);
+CompletionStage<AsyncResultSet> targetResult = targetSelectByPKStatement
+.getAsyncResult(r.getPk());
+
+if (null == targetResult) {
+jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
+} else {
+r.setAsyncTargetRow(targetResult);
+recordsToDiff.add(r);
+if (recordsToDiff.size() > fetchSizeInRows) {
+if (diffAndClear(recordsToDiff)) {
+hasDiff.set(true);
 }
 }
+} // targetRecord!=null
+} // recordSet iterator
+} // shouldFilterRecord
+});
+if (diffAndClear(recordsToDiff)) {
+hasDiff.set(true);
+}

-rateLimiterTarget.acquire(1);
-CompletionStage<AsyncResultSet> targetResult = targetSelectByPKStatement
-.getAsyncResult(r.getPk());
-
-if (null == targetResult) {
-jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
-} else {
-r.setAsyncTargetRow(targetResult);
-recordsToDiff.add(r);
-if (recordsToDiff.size() > fetchSizeInRows) {
-if (diffAndClear(recordsToDiff)) {
-hasDiff.set(true);
-}
-}
-} // targetRecord!=null
-} // recordSet iterator
-} // shouldFilterRecord
-});
-if (diffAndClear(recordsToDiff)) {
-hasDiff.set(true);
-}
-done = true;
-
-if (hasDiff.get()) {
-if (trackRun)
-trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.DIFF);
-else if (appendPartitionOnDiff)
-logPartitionsInFile(partitionFileOutput, min, max);
-} else if (trackRun) {
-trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.PASS);
-}
-} catch (Exception e) {
-logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
-Thread.currentThread().getId(), min, max, attempts, e);
-if (attempts == maxAttempts) {
-if (trackRun)
-trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.FAIL);
-else
-logPartitionsInFile(partitionFileOutput, min, max);
-}
-} finally {
-jobCounter.globalIncrement();
-printCounts(false);
+if (hasDiff.get()) {
+if (trackRun)
+trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.DIFF);
+else if (appendPartitionOnDiff)
+logPartitionsInFile(partitionFileOutput, min, max);
+} else if (trackRun) {
+trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.PASS);
 }
+} catch (Exception e) {
+logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}",
+Thread.currentThread().getId(), min, max, e);
+if (trackRun)
+trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.FAIL);
+else
+logPartitionsInFile(partitionFileOutput, min, max);
+} finally {
+jobCounter.globalIncrement();
+printCounts(false);
 }
 }
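
After this commit, getDataAndDiff follows the same single-attempt structure as the copy job. Independent of the retry removal, the loop batches target lookups and compares them via diffAndClear once the list grows past fetchSizeInRows, with one final flush after the stream completes. A stand-alone sketch of that flush-when-full pattern, using illustrative names rather than the CDM API:

import java.util.ArrayList;
import java.util.List;

public class BatchFlushSketch {

    // Stand-in for diffAndClear: "compares" the batch, empties it, and reports whether a diff was found.
    static boolean diffAndClear(List<String> batch) {
        System.out.println("diffing " + batch.size() + " records");
        batch.clear();
        return false; // pretend no differences were found
    }

    public static void main(String[] args) {
        int fetchSizeInRows = 3;
        List<String> recordsToDiff = new ArrayList<>(fetchSizeInRows);

        for (int i = 0; i < 10; i++) {
            recordsToDiff.add("record-" + i);
            if (recordsToDiff.size() > fetchSizeInRows) {
                diffAndClear(recordsToDiff);   // flush once the batch exceeds the threshold
            }
        }
        diffAndClear(recordsToDiff);           // final flush for whatever remains
    }
}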

src/main/java/com/datastax/cdm/properties/KnownProperties.java

Lines changed: 0 additions & 3 deletions
@@ -119,7 +119,6 @@ public enum PropertyType {
 public static final String READ_CL = "spark.cdm.perfops.consistency.read";
 public static final String WRITE_CL = "spark.cdm.perfops.consistency.write";
 public static final String PERF_FETCH_SIZE = "spark.cdm.perfops.fetchSizeInRows";
-public static final String MAX_RETRIES = "spark.cdm.perfops.errorLimit";
 public static final String PRINT_STATS_AFTER = "spark.cdm.perfops.printStatsAfter";
 public static final String PRINT_STATS_PER_PART = "spark.cdm.perfops.printStatsPerPart";

@@ -154,8 +153,6 @@ public enum PropertyType {
 defaults.put(PRINT_STATS_PER_PART, "false");
 types.put(PERF_FETCH_SIZE, PropertyType.NUMBER);
 defaults.put(PERF_FETCH_SIZE, "1000");
-types.put(MAX_RETRIES, PropertyType.NUMBER);
-defaults.put(MAX_RETRIES, "0");
 }

 // ==========================================================================
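
Both hunks remove the spark.cdm.perfops.errorLimit key along with its type and default registration. A stand-alone sketch, with hypothetical names rather than the CDM implementation, of the registry pattern these paired types.put / defaults.put calls follow:

import java.util.HashMap;
import java.util.Map;

public class PropertyRegistrySketch {

    enum PropertyType { STRING, NUMBER, BOOLEAN }

    static final Map<String, PropertyType> types = new HashMap<>();
    static final Map<String, String> defaults = new HashMap<>();

    // Mirrors the paired types.put(...) / defaults.put(...) calls in the hunk above.
    static void register(String key, PropertyType type, String defaultValue) {
        types.put(key, type);
        defaults.put(key, defaultValue);
    }

    public static void main(String[] args) {
        register("spark.cdm.perfops.fetchSizeInRows", PropertyType.NUMBER, "1000");
        register("spark.cdm.perfops.printStatsPerPart", PropertyType.BOOLEAN, "false");

        String key = "spark.cdm.perfops.fetchSizeInRows";
        System.out.println(key + " -> " + types.get(key) + ", default " + defaults.get(key));
    }
}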

src/resources/cdm-detailed.properties

Lines changed: 3 additions & 7 deletions
@@ -193,7 +193,7 @@ spark.cdm.trackRun.previousRunId 0
 #spark.cdm.tokenrange.partitionFile.appendOnDiff false

 #===========================================================================================================
-# Performance and Operations Parameters affecting throughput, error handling, and similar concerns.
+# Performance and Operations Parameters affecting throughput and similar concerns.
 #
 # Recommended Parameters:
 # spark.cdm.perfops
@@ -228,11 +228,8 @@ spark.cdm.trackRun.previousRunId 0
 # entry will be made.
 # .printStatsPerPart : Default is false. Print statistics for each part after it is processed.
 # .fetchSizeInRows : Default is 1000. This affects the frequency of reads from Origin, and also the
-# frequency of flushes to Target.
-# .errorLimit : Default is 0. Controls how many errors a thread may encounter during Migrate
-# and DiffData operations before failing. It is recommended to set this to a non-
-# zero value only when not doing a mutation-type operation, e.g. when running
-# DiffData without .autocorrect.
+# frequency of flushes to Target. A larger value will reduce the number of reads
+# and writes, but will increase the memory requirements.
 #-----------------------------------------------------------------------------------------------------------
 spark.cdm.perfops.numParts 5000
 spark.cdm.perfops.batchSize 5
@@ -429,4 +426,3 @@ spark.cdm.perfops.ratelimit.target 20000
 #spark.cdm.connect.target.tls.keyStore.path
 #spark.cdm.connect.target.tls.keyStore.password
 #spark.cdm.connect.target.tls.enabledAlgorithms TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA
-
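
With the errorLimit description gone, the fetchSizeInRows note above is the remaining knob documented here for trading read/flush frequency against memory. As an illustration only (the value is arbitrary, not a recommendation), overriding it would be a single line in this properties file, in the same key/value format the file already uses:

spark.cdm.perfops.fetchSizeInRows 5000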
