Skip to content

Commit 316845b

Browse files
committed
CDM-67 renaming variable for increased clarity per review comment
1 parent 1ccb70a commit 316845b

File tree

6 files changed

+16
-16
lines changed

6 files changed

+16
-16
lines changed

src/main/java/com/datastax/cdm/job/AbstractJobSession.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
4242
printStatsAfter = propertyHelper.getInteger(KnownProperties.PRINT_STATS_AFTER);
4343
}
4444

45-
originLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
46-
targetLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));
45+
rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
46+
rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));
4747
maxRetries = propertyHelper.getInteger(KnownProperties.MAX_RETRIES);
4848

4949
logger.info("PARAM -- Max Retries: {}", maxRetries);
50-
logger.info("PARAM -- Origin Rate Limit: {}", originLimiter.getRate());
51-
logger.info("PARAM -- Target Rate Limit: {}", targetLimiter.getRate());
50+
logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
51+
logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());
5252

5353
this.originSession = new EnhancedSession(propertyHelper, originSession, true);
5454
this.targetSession = new EnhancedSession(propertyHelper, targetSession, false);

src/main/java/com/datastax/cdm/job/BaseJobSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ public abstract class BaseJobSession {
2424
// then do the following to set the values as they are only applicable per JVM
2525
// (hence spark Executor)...
2626
// Rate = Total Throughput (write/read per sec) / Total Executors
27-
protected RateLimiter originLimiter;
28-
protected RateLimiter targetLimiter;
27+
protected RateLimiter rateLimiterOrigin;
28+
protected RateLimiter rateLimiterTarget;
2929
protected Integer maxRetries = 10;
3030

3131
protected Integer printStatsAfter = 100000;

src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
7373
Collection<CompletionStage<AsyncResultSet>> writeResults = new ArrayList<>();
7474

7575
for (Row originRow : resultSet) {
76-
originLimiter.acquire(1);
76+
rateLimiterOrigin.acquire(1);
7777
readCnt++;
7878
if (readCnt % printStatsAfter == 0) {
7979
printCounts(false);
@@ -101,7 +101,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
101101
continue;
102102
}
103103

104-
targetLimiter.acquire(1);
104+
rateLimiterTarget.acquire(1);
105105
batch = writeAsync(batch, writeResults, boundUpsert);
106106
unflushedWrites++;
107107

@@ -165,7 +165,7 @@ private void flushAndClearWrites(BatchStatement batch, Collection<CompletionStag
165165

166166
private BoundStatement bind(Record r) {
167167
if (isCounterTable) {
168-
targetLimiter.acquire(1);
168+
rateLimiterTarget.acquire(1);
169169
Record targetRecord = targetSelectByPKStatement.getRecord(r.getPk());
170170
if (null != targetRecord) {
171171
r.setTargetRow(targetRecord.getTargetRow());

src/main/java/com/datastax/cdm/job/CopyPKJobSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void getRowAndInsert(SplitPartitions.PKRows rowsList) {
5656
return;
5757
}
5858

59-
originLimiter.acquire(1);
59+
rateLimiterOrigin.acquire(1);
6060
Record recordFromOrigin = originSelectByPKStatement.getRecord(pk);
6161
if (null == recordFromOrigin) {
6262
missingCounter.incrementAndGet();
@@ -80,7 +80,7 @@ public void getRowAndInsert(SplitPartitions.PKRows rowsList) {
8080
}
8181
}
8282

83-
targetLimiter.acquire(1);
83+
rateLimiterTarget.acquire(1);
8484
targetSession.getTargetUpsertStatement().putRecord(record);
8585
writeCounter.incrementAndGet();
8686

src/main/java/com/datastax/cdm/job/DiffJobSession.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
112112

113113
List<Record> recordsToDiff = new ArrayList<>(fetchSizeInRows);
114114
StreamSupport.stream(resultSet.spliterator(), false).forEach(originRow -> {
115-
originLimiter.acquire(1);
115+
rateLimiterOrigin.acquire(1);
116116
Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
117117

118118
if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) {
@@ -132,7 +132,7 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
132132
}
133133
}
134134

135-
targetLimiter.acquire(1);
135+
rateLimiterTarget.acquire(1);
136136
CompletionStage<AsyncResultSet> targetResult = targetSelectByPKStatement.getAsyncResult(r.getPk());
137137

138138
if (null==targetResult) {
@@ -203,7 +203,7 @@ private void diff(Record record) {
203203

204204
//correct data
205205
if (autoCorrectMissing) {
206-
targetLimiter.acquire(1);
206+
rateLimiterTarget.acquire(1);
207207
targetSession.getTargetUpsertStatement().putRecord(record);
208208
correctedMissingCounter.incrementAndGet();
209209
logger.error("Inserted missing row in target: {}", record.getPk());
@@ -217,7 +217,7 @@ private void diff(Record record) {
217217
logger.error("Mismatch row found for key: {} Mismatch: {}", record.getPk(), diffData);
218218

219219
if (autoCorrectMismatch) {
220-
targetLimiter.acquire(1);
220+
rateLimiterTarget.acquire(1);
221221
targetSession.getTargetUpsertStatement().putRecord(record);
222222
correctedMismatchCounter.incrementAndGet();
223223
logger.error("Corrected mismatch row in target: {}", record.getPk());

src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void guardrailCheck(BigInteger min, BigInteger max) {
5050
ResultSet resultSet = originSelectByPartitionRangeStatement.execute(originSelectByPartitionRangeStatement.bind(min, max));
5151
String checkString;
5252
for (Row originRow : resultSet) {
53-
originLimiter.acquire(1);
53+
rateLimiterOrigin.acquire(1);
5454
readCounter.addAndGet(1);
5555

5656
if (readCounter.get() % printStatsAfter == 0) {

0 commit comments

Comments
 (0)