
Commit ecba107

CDM-67 simplifying rate limit config
1 parent 0a6b78a commit ecba107

File tree

8 files changed (+31, -43 lines)

src/main/java/com/datastax/cdm/job/AbstractJobSession.java

Lines changed: 4 additions & 8 deletions
@@ -42,17 +42,13 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
             printStatsAfter = propertyHelper.getInteger(KnownProperties.PRINT_STATS_AFTER);
         }

-        readLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_LIMIT_READ));
-        writeLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_LIMIT_WRITE));
-        Integer readLimitTarget = propertyHelper.getInteger(KnownProperties.PERF_LIMIT_READ_TARGET);
-        if (readLimitTarget == null || readLimitTarget < 0) { readLimitTarget = propertyHelper.getInteger(KnownProperties.PERF_LIMIT_READ);}
-        readLimiterTarget = RateLimiter.create(readLimitTarget);
+        originLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
+        targetLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));
         maxRetries = propertyHelper.getInteger(KnownProperties.MAX_RETRIES);

         logger.info("PARAM -- Max Retries: {}", maxRetries);
-        logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());
-        logger.info("PARAM -- WriteRateLimit: {}", writeLimiter.getRate());
-        logger.info("PARAM -- TargetReadRateLimit: {}", readLimiterTarget.getRate());
+        logger.info("PARAM -- Origin Rate Limit: {}", originLimiter.getRate());
+        logger.info("PARAM -- Target Rate Limit: {}", targetLimiter.getRate());

         this.originSession = new EnhancedSession(propertyHelper, originSession, true);
         this.targetSession = new EnhancedSession(propertyHelper, targetSession, false);

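For readers unfamiliar with the limiter API, here is a minimal, self-contained sketch of the pattern the new code follows, assuming RateLimiter is Guava's com.google.common.util.concurrent.RateLimiter and using the commit's default rates (20000 origin, 40000 target) in place of the propertyHelper lookups; apart from the property names and defaults taken from the diff, everything here is illustrative.

    import com.google.common.util.concurrent.RateLimiter;

    public class RateLimitSketch {
        public static void main(String[] args) {
            // Defaults from this commit: spark.cdm.perfops.ratelimit.origin = 20000,
            // spark.cdm.perfops.ratelimit.target = 40000 (permits per second, per executor JVM).
            RateLimiter originLimiter = RateLimiter.create(20000);
            RateLimiter targetLimiter = RateLimiter.create(40000);

            // One permit per row read from Origin (blocks until a permit is free)...
            originLimiter.acquire(1);
            // ...and one permit per operation against Target (reads for counters/diffs, writes for upserts).
            targetLimiter.acquire(1);

            System.out.println("origin rate: " + originLimiter.getRate()
                    + ", target rate: " + targetLimiter.getRate());
        }
    }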
src/main/java/com/datastax/cdm/job/BaseJobSession.java

Lines changed: 2 additions & 3 deletions
@@ -24,9 +24,8 @@ public abstract class BaseJobSession {
     // then do the following to set the values as they are only applicable per JVM
     // (hence spark Executor)...
     // Rate = Total Throughput (write/read per sec) / Total Executors
-    protected RateLimiter readLimiter;
-    protected RateLimiter readLimiterTarget;
-    protected RateLimiter writeLimiter;
+    protected RateLimiter originLimiter;
+    protected RateLimiter targetLimiter;
     protected Integer maxRetries = 10;

     protected Integer printStatsAfter = 100000;

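Per the "Rate = Total Throughput / Total Executors" comment retained above, the configured rates apply per JVM (per Spark executor). A quick worked example: to hold total target-side throughput to 40000 ops/sec across 4 executors, set spark.cdm.perfops.ratelimit.target to 40000 / 4 = 10000 on each executor; likewise a 20000 ops/sec total budget against Origin with 4 executors means ratelimit.origin = 5000 per executor. The executor count here is illustrative.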
src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 3 additions & 4 deletions
@@ -76,7 +76,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
         Collection<CompletionStage<AsyncResultSet>> writeResults = new ArrayList<>();

         for (Row originRow : resultSet) {
-            originLimiter.acquire(1);
+            originLimiter.acquire(1);
             readCnt++;
             if (readCnt % printStatsAfter == 0) {
                 printCounts(false);
@@ -98,14 +98,13 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
             }
         }

-        writeLimiter.acquire(1);
-
         BoundStatement boundUpsert = bind(r);
         if (null == boundUpsert) {
             skipCnt++; // TODO: this previously skipped, why not errCnt?
             continue;
         }

+        targetLimiter.acquire(1);
         writeAsync(writeResults, boundUpsert);
         unflushedWrites++;

@@ -169,7 +168,7 @@ private void flushAndClearWrites(Collection<CompletionStage<AsyncResultSet>> wri

     private BoundStatement bind(Record r) {
         if (isCounterTable) {
-            readLimiterTarget.acquire(1);
+            targetLimiter.acquire(1);
             Record targetRecord = targetSelectByPKStatement.getRecord(r.getPk());
             if (null != targetRecord) {
                 r.setTargetRow(targetRecord.getTargetRow());

src/main/java/com/datastax/cdm/job/CopyPKJobSession.java

Lines changed: 2 additions & 1 deletion
@@ -56,6 +56,7 @@ public void getRowAndInsert(SplitPartitions.PKRows rowsList) {
                 return;
             }

+            originLimiter.acquire(1);
             Record recordFromOrigin = originSelectByPKStatement.getRecord(pk);
             if (null == recordFromOrigin) {
                 missingCounter.incrementAndGet();
@@ -79,7 +80,7 @@ public void getRowAndInsert(SplitPartitions.PKRows rowsList) {
             }
         }

-        writeLimiter.acquire(1);
+        targetLimiter.acquire(1);
         targetSession.getTargetUpsertStatement().putRecord(record);
         writeCounter.incrementAndGet();

src/main/java/com/datastax/cdm/job/DiffJobSession.java

Lines changed: 4 additions & 4 deletions
@@ -112,7 +112,7 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {

         List<Record> recordsToDiff = new ArrayList<>(fetchSizeInRows);
         StreamSupport.stream(resultSet.spliterator(), false).forEach(originRow -> {
-            readLimiter.acquire(1);
+            originLimiter.acquire(1);
             Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);

             if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) {
@@ -132,7 +132,7 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
                 }
             }

-            readLimiterTarget.acquire(1);
+            targetLimiter.acquire(1);
             CompletionStage<AsyncResultSet> targetResult = targetSelectByPKStatement.getAsyncResult(r.getPk());

             if (null==targetResult) {
@@ -203,7 +203,7 @@ private void diff(Record record) {

         //correct data
         if (autoCorrectMissing) {
-            writeLimiter.acquire(1);
+            targetLimiter.acquire(1);
             targetSession.getTargetUpsertStatement().putRecord(record);
             correctedMissingCounter.incrementAndGet();
             logger.error("Inserted missing row in target: {}", record.getPk());
@@ -217,7 +217,7 @@ private void diff(Record record) {
             logger.error("Mismatch row found for key: {} Mismatch: {}", record.getPk(), diffData);

             if (autoCorrectMismatch) {
-                writeLimiter.acquire(1);
+                targetLimiter.acquire(1);
                 targetSession.getTargetUpsertStatement().putRecord(record);
                 correctedMismatchCounter.incrementAndGet();
                 logger.error("Corrected mismatch row in target: {}", record.getPk());

src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java

Lines changed: 1 addition & 3 deletions
@@ -3,8 +3,6 @@
 import com.datastax.cdm.cql.statement.OriginSelectByPartitionRangeStatement;
 import com.datastax.cdm.data.PKFactory;
 import com.datastax.cdm.data.Record;
-import com.datastax.cdm.feature.Featureset;
-import com.datastax.cdm.feature.Guardrail;
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.*;
 import org.apache.logging.log4j.ThreadContext;
@@ -52,7 +50,7 @@ public void guardrailCheck(BigInteger min, BigInteger max) {
         ResultSet resultSet = originSelectByPartitionRangeStatement.execute(originSelectByPartitionRangeStatement.bind(min, max));
         String checkString;
         for (Row originRow : resultSet) {
-            readLimiter.acquire(1);
+            originLimiter.acquire(1);
             readCounter.addAndGet(1);

             if (readCounter.get() % printStatsAfter == 0) {

src/main/java/com/datastax/cdm/properties/KnownProperties.java

Lines changed: 6 additions & 8 deletions
@@ -99,9 +99,8 @@ public enum PropertyType {

     public static final String PERF_NUM_PARTS = "spark.cdm.perfops.numParts"; // 10000, was spark.splitSize
     public static final String PERF_BATCH_SIZE = "spark.cdm.perfops.batchSize"; // 5
-    public static final String PERF_LIMIT_READ = "spark.cdm.perfops.readRateLimit"; // 20000
-    public static final String PERF_LIMIT_READ_TARGET = "spark.cdm.perfops.readRateLimitTarget"; // readRateLimit
-    public static final String PERF_LIMIT_WRITE = "spark.cdm.perfops.writeRateLimit"; // 40000
+    public static final String PERF_RATELIMIT_ORIGIN = "spark.cdm.perfops.ratelimit.origin"; // 20000
+    public static final String PERF_RATELIMIT_TARGET = "spark.cdm.perfops.ratelimit.target"; // 40000

     public static final String READ_CL = "spark.cdm.perfops.consistency.read";
     public static final String WRITE_CL = "spark.cdm.perfops.consistency.write";
@@ -121,11 +120,10 @@ public enum PropertyType {
         defaults.put(PERF_NUM_PARTS, "10000");
         types.put(PERF_BATCH_SIZE, PropertyType.NUMBER);
         defaults.put(PERF_BATCH_SIZE, "5");
-        types.put(PERF_LIMIT_READ, PropertyType.NUMBER);
-        defaults.put(PERF_LIMIT_READ, "20000");
-        types.put(PERF_LIMIT_READ_TARGET, PropertyType.NUMBER);
-        types.put(PERF_LIMIT_WRITE, PropertyType.NUMBER);
-        defaults.put(PERF_LIMIT_WRITE, "40000");
+        types.put(PERF_RATELIMIT_ORIGIN, PropertyType.NUMBER);
+        defaults.put(PERF_RATELIMIT_ORIGIN, "20000");
+        types.put(PERF_RATELIMIT_TARGET, PropertyType.NUMBER);
+        defaults.put(PERF_RATELIMIT_TARGET, "40000");

         types.put(READ_CL, PropertyType.STRING);
         defaults.put(READ_CL, "LOCAL_QUORUM");

src/resources/sparkConf.properties

Lines changed: 9 additions & 12 deletions
@@ -147,15 +147,13 @@ spark.cdm.autocorrect.mismatch false
 #                      at a time so if your partition sizes are larger, this number may be increased.
 #                      If .batchSize would mean that more than 1 partition is often contained in a batch,
 #                      the figure should be reduced. Ideally < 1% of batches have more than 1 partition.
-#  .readRateLimit     : Defaults to 20000. Concurrent number of records that may will be read across
-#                      all parallel threads from Origin. This may be adjusted up (or down), depending on
-#                      the amount of data and the processing capacity of the Origin cluster.
-#  .readRateLimitTarget : Defaults to readRateLimit. Concurrent number of records that may will be read across
-#                      all parallel threads from Target. This may be adjusted up (or down), depending on
-#                      the amount of data and the processing capacity of the Target cluster.
-#  .writeRateLimit    : Defaults to 40000. Concurrent number of records that may will be written across
-#                      all parallel threads. This may be adjusted up (or down), depending on the amount
-#                      of data and the processing capacity of the Target cluster.
+#  .ratelimit
+#     .origin         : Defaults to 20000. Concurrent number of operations across all parallel threads
+#                      from Origin. This may be adjusted up (or down), depending on the amount of data
+#                      and the processing capacity of the Origin cluster.
+#     .target         : Defaults to 40000. Concurrent number of operations across all parallel threads
+#                      from Target. This may be adjusted up (or down), depending on the amount of data
+#                      and the processing capacity of the Target cluster.
 #
 # Other Parameters:
 # spark.cdm.perfops
@@ -175,9 +173,8 @@ spark.cdm.autocorrect.mismatch false
 #-----------------------------------------------------------------------------------------------------------
 spark.cdm.perfops.numParts 10000
 spark.cdm.perfops.batchSize 5
-spark.cdm.perfops.readRateLimit 20000
-#spark.cdm.perfops.readRateLimitTarget 20000
-spark.cdm.perfops.writeRateLimit 40000
+spark.cdm.perfops.ratelimit.origin 20000
+spark.cdm.perfops.ratelimit.target 40000
 #spark.cdm.perfops.consistency.read LOCAL_QUORUM
 #spark.cdm.perfops.consistency.write LOCAL_QUORUM
 #spark.cdm.perfops.printStatsAfter 100000

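To migrate an existing sparkConf.properties, the three old keys (removed from KnownProperties in this commit) collapse into the two new ones, with the single target limiter now covering both reads from and writes to the Target cluster; the values below are just the shipped defaults:

    # Before this commit
    # spark.cdm.perfops.readRateLimit        20000
    # spark.cdm.perfops.readRateLimitTarget  20000
    # spark.cdm.perfops.writeRateLimit       40000

    # After this commit
    spark.cdm.perfops.ratelimit.origin 20000
    spark.cdm.perfops.ratelimit.target 40000

The same keys can also be supplied at submit time (e.g. spark-submit --conf spark.cdm.perfops.ratelimit.origin=20000), assuming the job is launched through spark-submit as is typical for Spark jobs.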