Commit 4f8c416

CDM-67 adding target read rate limiter
1 parent c7759d2

File tree

6 files changed, +15 −2 lines changed

src/main/java/com/datastax/cdm/job/AbstractJobSession.java

Lines changed: 4 additions & 0 deletions
@@ -44,11 +44,15 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
 
         readLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_LIMIT_READ));
         writeLimiter = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_LIMIT_WRITE));
+        Integer readLimitTarget = propertyHelper.getInteger(KnownProperties.PERF_LIMIT_READ_TARGET);
+        if (readLimitTarget == null || readLimitTarget < 0) { readLimitTarget = propertyHelper.getInteger(KnownProperties.PERF_LIMIT_READ); }
+        readLimiterTarget = RateLimiter.create(readLimitTarget);
         maxRetries = propertyHelper.getInteger(KnownProperties.MAX_RETRIES);
 
         logger.info("PARAM -- Max Retries: {}", maxRetries);
         logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());
         logger.info("PARAM -- WriteRateLimit: {}", writeLimiter.getRate());
+        logger.info("PARAM -- TargetReadRateLimit: {}", readLimiterTarget.getRate());
 
         this.originSession = new EnhancedSession(propertyHelper, originSession, true);
         this.targetSession = new EnhancedSession(propertyHelper, targetSession, false);
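
Taken together, the three added lines resolve the Target read limit with a fallback: if spark.cdm.perfops.readRateLimit.target is unset (null) or negative, the Target limiter is created at the Origin readRateLimit instead. A minimal sketch of that logic, assuming Guava's RateLimiter (the class used above) and a hypothetical resolveTargetReadLimiter helper standing in for the propertyHelper lookups:

import com.google.common.util.concurrent.RateLimiter;

public class TargetReadLimitDemo {
    // Hypothetical helper mirroring the fallback above: a missing (null) or
    // negative target limit falls back to the origin read limit.
    static RateLimiter resolveTargetReadLimiter(Integer readLimitTarget, int readLimitOrigin) {
        if (readLimitTarget == null || readLimitTarget < 0) {
            readLimitTarget = readLimitOrigin;
        }
        return RateLimiter.create(readLimitTarget); // interpreted as permits per second
    }

    public static void main(String[] args) {
        System.out.println(resolveTargetReadLimiter(null, 20000).getRate());  // 20000.0 (fallback)
        System.out.println(resolveTargetReadLimiter(5000, 20000).getRate());  // 5000.0 (explicit)
    }
}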

src/main/java/com/datastax/cdm/job/BaseJobSession.java

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ public abstract class BaseJobSession {
     // (hence spark Executor)...
     // Rate = Total Throughput (write/read per sec) / Total Executors
     protected RateLimiter readLimiter;
+    protected RateLimiter readLimiterTarget;
     protected RateLimiter writeLimiter;
     protected Integer maxRetries = 10;
 
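
The comment above encodes the sizing rule: each executor JVM builds its own limiters, so a cluster-wide cap is approximated by dividing the desired total throughput by the executor count. A worked example with hypothetical numbers:

public class PerExecutorRateDemo {
    public static void main(String[] args) {
        // Hypothetical sizing per the comment above: the rate configured for
        // each per-executor RateLimiter is the total divided by executor count.
        int totalReadsPerSec = 20000; // desired total read throughput
        int numExecutors = 4;         // Spark executors running the job
        System.out.println(totalReadsPerSec / numExecutors); // 5000 permits/sec per limiter
    }
}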

src/main/java/com/datastax/cdm/job/CopyJobSession.java

Lines changed: 1 addition & 0 deletions
@@ -169,6 +169,7 @@ private void flushAndClearWrites(Collection<CompletionStage<AsyncResultSet>> wri
 
     private BoundStatement bind(Record r) {
         if (isCounterTable) {
+            readLimiterTarget.acquire(1);
             Record targetRecord = targetSelectByPKStatement.getRecord(r.getPk());
             if (null != targetRecord) {
                 r.setTargetRow(targetRecord.getTargetRow());
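
In the counter-table path, bind() performs a synchronous read of the Target row before building the write, and the new acquire(1) gates that read. Guava's acquire blocks until a permit is available, which is what enforces the configured rate. A small self-contained sketch of that blocking behavior (illustrative values, not CDM code):

import com.google.common.util.concurrent.RateLimiter;

public class AcquireDemo {
    public static void main(String[] args) {
        RateLimiter limiter = RateLimiter.create(2.0); // 2 permits per second

        long start = System.nanoTime();
        for (int i = 0; i < 5; i++) {
            limiter.acquire(1); // blocks until a permit is available
            System.out.printf("read %d at %.1fs%n", i, (System.nanoTime() - start) / 1e9);
        }
        // After the first (immediate) permit, reads are spaced roughly 0.5s apart.
    }
}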

src/main/java/com/datastax/cdm/job/DiffJobSession.java

Lines changed: 1 addition & 0 deletions
@@ -132,6 +132,7 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
             }
         }
 
+        readLimiterTarget.acquire(1);
         CompletionStage<AsyncResultSet> targetResult = targetSelectByPKStatement.getAsyncResult(r.getPk());
 
         if (null==targetResult) {
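
Here the permit is taken before an asynchronous query is issued, so the limiter caps how fast Target reads are started; completions still arrive whenever the driver delivers them. A sketch of that issue-side throttling, with CompletableFuture as a hypothetical stand-in for the driver's async result:

import com.google.common.util.concurrent.RateLimiter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncThrottleDemo {
    public static void main(String[] args) {
        RateLimiter limiter = RateLimiter.create(1000.0); // start at most 1000 requests/sec
        List<CompletableFuture<String>> inFlight = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            limiter.acquire(1); // limits how fast requests are issued, not when they finish
            final int id = i;
            // Stand-in for targetSelectByPKStatement.getAsyncResult(pk):
            inFlight.add(CompletableFuture.supplyAsync(() -> "row-" + id));
        }
        inFlight.forEach(f -> System.out.println("completed " + f.join()));
    }
}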

src/main/java/com/datastax/cdm/properties/KnownProperties.java

Lines changed: 2 additions & 0 deletions
@@ -100,6 +100,7 @@ public enum PropertyType {
     public static final String PERF_NUM_PARTS = "spark.cdm.perfops.numParts"; // 10000, was spark.splitSize
     public static final String PERF_BATCH_SIZE = "spark.cdm.perfops.batchSize"; // 5
     public static final String PERF_LIMIT_READ = "spark.cdm.perfops.readRateLimit"; // 20000
+    public static final String PERF_LIMIT_READ_TARGET = "spark.cdm.perfops.readRateLimit.target"; // readRateLimit
     public static final String PERF_LIMIT_WRITE = "spark.cdm.perfops.writeRateLimit"; // 40000
 
     public static final String READ_CL = "spark.cdm.perfops.consistency.read";
@@ -122,6 +123,7 @@ public enum PropertyType {
         defaults.put(PERF_BATCH_SIZE, "5");
         types.put(PERF_LIMIT_READ, PropertyType.NUMBER);
         defaults.put(PERF_LIMIT_READ, "20000");
+        types.put(PERF_LIMIT_READ_TARGET, PropertyType.NUMBER);
         types.put(PERF_LIMIT_WRITE, PropertyType.NUMBER);
         defaults.put(PERF_LIMIT_WRITE, "40000");
 
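
PERF_LIMIT_READ_TARGET is registered with a NUMBER type but, unlike PERF_LIMIT_READ, no default, so a lookup returns null when the property is unset; that is exactly the case the null check in AbstractJobSession handles. A simplified stand-in for that registry pattern (hypothetical getInteger, not the real PropertyHelper):

import java.util.HashMap;
import java.util.Map;

public class PropertyDefaultsDemo {
    static final Map<String, String> defaults = new HashMap<>();
    static {
        defaults.put("spark.cdm.perfops.readRateLimit", "20000");
        // No default registered for readRateLimit.target, so an unset value
        // resolves to null and the caller must fall back explicitly.
    }

    static Integer getInteger(Map<String, String> configured, String key) {
        String v = configured.getOrDefault(key, defaults.get(key));
        return (v == null) ? null : Integer.valueOf(v);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>(); // nothing set by the user
        System.out.println(getInteger(conf, "spark.cdm.perfops.readRateLimit"));        // 20000
        System.out.println(getInteger(conf, "spark.cdm.perfops.readRateLimit.target")); // null
    }
}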

src/resources/sparkConf.properties

Lines changed: 6 additions & 2 deletions
@@ -148,8 +148,11 @@ spark.cdm.autocorrect.mismatch false
 # If .batchSize would mean that more than 1 partition is often contained in a batch,
 # the figure should be reduced. Ideally < 1% of batches have more than 1 partition.
 # .readRateLimit  : Defaults to 20000. Concurrent number of records that may be read across
-#                   all parallel threads. This may be adjusted up (or down), depending on the amount
-#                   of data and the processing capacity of the Origin cluster.
+#                   all parallel threads from Origin. This may be adjusted up (or down), depending on
+#                   the amount of data and the processing capacity of the Origin cluster.
+# .readRateLimit.target : Defaults to readRateLimit. Concurrent number of records that may be read across
+#                   all parallel threads from Target. This may be adjusted up (or down), depending on
+#                   the amount of data and the processing capacity of the Target cluster.
 # .writeRateLimit : Defaults to 40000. Concurrent number of records that may be written across
 #                   all parallel threads. This may be adjusted up (or down), depending on the amount
 #                   of data and the processing capacity of the Target cluster.
@@ -173,6 +176,7 @@ spark.cdm.autocorrect.mismatch false
 spark.cdm.perfops.numParts 10000
 spark.cdm.perfops.batchSize 5
 spark.cdm.perfops.readRateLimit 20000
+#spark.cdm.perfops.readRateLimit.target 20000
 spark.cdm.perfops.writeRateLimit 40000
 #spark.cdm.perfops.consistency.read LOCAL_QUORUM
 #spark.cdm.perfops.consistency.write LOCAL_QUORUM
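
With these defaults, the new knob lets a job read the Target at a different rate than the Origin, for example throttling Target reads during a validation run (hypothetical values):

spark.cdm.perfops.readRateLimit         20000
spark.cdm.perfops.readRateLimit.target  5000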
