Skip to content

Commit 414f64d

Browse files
authored
Merge pull request #211 from kilink/vegas-limit-int-unary-operator
Use IntUnaryOperator / DoubleUnaryOperator in VegasLimit
2 parents 74bb308 + a127eb4 commit 414f64d

File tree

5 files changed

+215
-50
lines changed

5 files changed

+215
-50
lines changed

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java

Lines changed: 95 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919
import com.netflix.concurrency.limits.MetricRegistry;
2020
import com.netflix.concurrency.limits.MetricRegistry.SampleListener;
2121
import com.netflix.concurrency.limits.internal.EmptyMetricRegistry;
22-
import com.netflix.concurrency.limits.internal.Preconditions;
23-
import com.netflix.concurrency.limits.limit.functions.Log10RootFunction;
22+
import com.netflix.concurrency.limits.limit.functions.Log10RootIntFunction;
2423
import org.slf4j.Logger;
2524
import org.slf4j.LoggerFactory;
2625

2726
import java.util.concurrent.ThreadLocalRandom;
2827
import java.util.concurrent.TimeUnit;
28+
import java.util.function.DoubleUnaryOperator;
2929
import java.util.function.Function;
30+
import java.util.function.IntUnaryOperator;
3031

3132
/**
3233
* Limiter based on TCP Vegas where the limit increases by alpha if the queue_use is small ({@literal <} alpha)
@@ -41,19 +42,19 @@
4142
public class VegasLimit extends AbstractLimit {
4243
private static final Logger LOG = LoggerFactory.getLogger(VegasLimit.class);
4344

44-
private static final Function<Integer, Integer> LOG10 = Log10RootFunction.create(0);
45+
private static final IntUnaryOperator LOG10 = Log10RootIntFunction.create(0);
4546

4647
public static class Builder {
4748
private int initialLimit = 20;
4849
private int maxConcurrency = 1000;
4950
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
5051
private double smoothing = 1.0;
5152

52-
private Function<Integer, Integer> alphaFunc = (limit) -> 3 * LOG10.apply(limit.intValue());
53-
private Function<Integer, Integer> betaFunc = (limit) -> 6 * LOG10.apply(limit.intValue());
54-
private Function<Integer, Integer> thresholdFunc = (limit) -> LOG10.apply(limit.intValue());
55-
private Function<Double, Double> increaseFunc = (limit) -> limit + LOG10.apply(limit.intValue());
56-
private Function<Double, Double> decreaseFunc = (limit) -> limit - LOG10.apply(limit.intValue());
53+
private IntUnaryOperator alphaFunc = (limit) -> 3 * LOG10.applyAsInt(limit);
54+
private IntUnaryOperator betaFunc = (limit) -> 6 * LOG10.applyAsInt(limit);
55+
private IntUnaryOperator thresholdFunc = LOG10;
56+
private DoubleUnaryOperator increaseFunc = (limit) -> limit + LOG10.applyAsInt((int) limit);
57+
private DoubleUnaryOperator decreaseFunc = (limit) -> limit - LOG10.applyAsInt((int) limit);
5758
private int probeMultiplier = 30;
5859

5960
private Builder() {
@@ -74,13 +75,31 @@ public Builder alpha(int alpha) {
7475
this.alphaFunc = (ignore) -> alpha;
7576
return this;
7677
}
77-
78+
79+
/**
80+
* @deprecated use {@link #thresholdFunction(IntUnaryOperator)}
81+
*/
82+
@Deprecated
7883
public Builder threshold(Function<Integer, Integer> threshold) {
84+
this.thresholdFunc = threshold::apply;
85+
return this;
86+
}
87+
88+
public Builder thresholdFunction(IntUnaryOperator threshold) {
7989
this.thresholdFunc = threshold;
8090
return this;
8191
}
82-
92+
93+
/**
94+
* @deprecated use {@link #alphaFunction(IntUnaryOperator)}
95+
*/
96+
@Deprecated
8397
public Builder alpha(Function<Integer, Integer> alpha) {
98+
this.alphaFunc = alpha::apply;
99+
return this;
100+
}
101+
102+
public Builder alphaFunction(IntUnaryOperator alpha) {
84103
this.alphaFunc = alpha;
85104
return this;
86105
}
@@ -89,18 +108,45 @@ public Builder beta(int beta) {
89108
this.betaFunc = (ignore) -> beta;
90109
return this;
91110
}
92-
111+
112+
/**
113+
* @deprecated use {@link #betaFunction(IntUnaryOperator)}
114+
*/
115+
@Deprecated
93116
public Builder beta(Function<Integer, Integer> beta) {
117+
this.betaFunc = beta::apply;
118+
return this;
119+
}
120+
121+
public Builder betaFunction(IntUnaryOperator beta) {
94122
this.betaFunc = beta;
95123
return this;
96124
}
97-
125+
126+
/**
127+
* @deprecated use {@link #increaseFunction(DoubleUnaryOperator)}
128+
*/
129+
@Deprecated
98130
public Builder increase(Function<Double, Double> increase) {
131+
this.increaseFunc = increase::apply;
132+
return this;
133+
}
134+
135+
public Builder increaseFunction(DoubleUnaryOperator increase) {
99136
this.increaseFunc = increase;
100137
return this;
101138
}
102-
139+
140+
/**
141+
* @deprecated use {@link #decreaseFunction(DoubleUnaryOperator)}
142+
*/
143+
@Deprecated
103144
public Builder decrease(Function<Double, Double> decrease) {
145+
this.decreaseFunc = decrease::apply;
146+
return this;
147+
}
148+
149+
public Builder decreaseFunction(DoubleUnaryOperator decrease) {
104150
this.decreaseFunc = decrease;
105151
return this;
106152
}
@@ -164,11 +210,11 @@ public static VegasLimit newDefault() {
164210
private final int maxLimit;
165211

166212
private final double smoothing;
167-
private final Function<Integer, Integer> alphaFunc;
168-
private final Function<Integer, Integer> betaFunc;
169-
private final Function<Integer, Integer> thresholdFunc;
170-
private final Function<Double, Double> increaseFunc;
171-
private final Function<Double, Double> decreaseFunc;
213+
private final IntUnaryOperator alphaFunc;
214+
private final IntUnaryOperator betaFunc;
215+
private final IntUnaryOperator thresholdFunc;
216+
private final DoubleUnaryOperator increaseFunc;
217+
private final DoubleUnaryOperator decreaseFunc;
172218
private final SampleListener rttSampleListener;
173219
private final int probeMultiplier;
174220
private int probeCount = 0;
@@ -201,69 +247,77 @@ private boolean shouldProbe() {
201247

202248
@Override
203249
protected int _update(long startTime, long rtt, int inflight, boolean didDrop) {
204-
Preconditions.checkArgument(rtt > 0, "rtt must be >0 but got " + rtt);
250+
if (rtt <= 0) {
251+
throw new IllegalArgumentException("rtt must be >0 but got " + rtt);
252+
}
205253

206254
probeCount++;
207255
if (shouldProbe()) {
208-
LOG.debug("Probe MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0);
256+
if (LOG.isDebugEnabled()) {
257+
LOG.debug("Probe MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0);
258+
}
209259
resetProbeJitter();
210260
probeCount = 0;
211261
rtt_noload = rtt;
212-
return (int)estimatedLimit;
262+
return (int) estimatedLimit;
213263
}
214-
264+
265+
long rtt_noload = this.rtt_noload;
215266
if (rtt_noload == 0 || rtt < rtt_noload) {
216-
LOG.debug("New MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0);
217-
rtt_noload = rtt;
218-
return (int)estimatedLimit;
267+
if (LOG.isDebugEnabled()) {
268+
LOG.debug("New MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0);
269+
}
270+
this.rtt_noload = rtt;
271+
return (int) estimatedLimit;
219272
}
220-
273+
221274
rttSampleListener.addLongSample(rtt_noload);
222275

223-
return updateEstimatedLimit(rtt, inflight, didDrop);
276+
return updateEstimatedLimit(rtt, rtt_noload, inflight, didDrop);
224277
}
225278

226-
private int updateEstimatedLimit(long rtt, int inflight, boolean didDrop) {
227-
final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt));
279+
private int updateEstimatedLimit(long rtt, long rtt_noload, int inflight, boolean didDrop) {
280+
double estimatedLimit = this.estimatedLimit;
281+
final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double) rtt_noload / rtt));
228282

229283
double newLimit;
230284
// Treat any drop (i.e timeout) as needing to reduce the limit
231285
if (didDrop) {
232-
newLimit = decreaseFunc.apply(estimatedLimit);
286+
newLimit = decreaseFunc.applyAsDouble(estimatedLimit);
233287
// Prevent upward drift if not close to the limit
234288
} else if (inflight * 2 < estimatedLimit) {
235-
return (int)estimatedLimit;
289+
return (int) estimatedLimit;
236290
} else {
237-
int alpha = alphaFunc.apply((int)estimatedLimit);
238-
int beta = betaFunc.apply((int)estimatedLimit);
239-
int threshold = this.thresholdFunc.apply((int)estimatedLimit);
291+
int alpha = alphaFunc.applyAsInt((int) estimatedLimit);
292+
int beta = betaFunc.applyAsInt((int) estimatedLimit);
293+
int threshold = thresholdFunc.applyAsInt((int) estimatedLimit);
240294

241295
// Aggressive increase when no queuing
242296
if (queueSize <= threshold) {
243297
newLimit = estimatedLimit + beta;
244298
// Increase the limit if queue is still manageable
245299
} else if (queueSize < alpha) {
246-
newLimit = increaseFunc.apply(estimatedLimit);
300+
newLimit = increaseFunc.applyAsDouble(estimatedLimit);
247301
// Detecting latency so decrease
248302
} else if (queueSize > beta) {
249-
newLimit = decreaseFunc.apply(estimatedLimit);
303+
newLimit = decreaseFunc.applyAsDouble(estimatedLimit);
250304
// We're within he sweet spot so nothing to do
251305
} else {
252-
return (int)estimatedLimit;
306+
return (int) estimatedLimit;
253307
}
254308
}
255309

256310
newLimit = Math.max(1, Math.min(maxLimit, newLimit));
257311
newLimit = (1 - smoothing) * estimatedLimit + smoothing * newLimit;
258-
if ((int)newLimit != (int)estimatedLimit && LOG.isDebugEnabled()) {
312+
if ((int) newLimit != (int) estimatedLimit && LOG.isDebugEnabled()) {
259313
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={}",
260-
(int)newLimit,
314+
(int) newLimit,
261315
TimeUnit.NANOSECONDS.toMicros(rtt_noload) / 1000.0,
262316
TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0,
263317
queueSize);
264318
}
265-
estimatedLimit = newLimit;
266-
return (int)estimatedLimit;
319+
this.estimatedLimit = newLimit;
320+
return (int) newLimit;
267321
}
268322

269323
@Override

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/functions/Log10RootFunction.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,33 @@
1919
import java.util.stream.IntStream;
2020

2121
/**
22-
* Function used by limiters to calculate thredsholds using log10 of the current limit.
22+
* Function used by limiters to calculate thresholds using log10 of the current limit.
2323
* Here we pre-compute the log10 of numbers up to 1000 as an optimization.
24+
*
25+
* @deprecated use {@link Log10RootIntFunction}
2426
*/
27+
@Deprecated
2528
public final class Log10RootFunction implements Function<Integer, Integer> {
2629
static final int[] lookup = new int[1000];
27-
30+
2831
static {
2932
IntStream.range(0, 1000).forEach(i -> lookup[i] = Math.max(1, (int)Math.log10(i)));
3033
}
31-
34+
3235
private static final Log10RootFunction INSTANCE = new Log10RootFunction();
33-
36+
3437
/**
3538
* Create an instance of a function that returns : baseline + sqrt(limit)
36-
*
39+
*
3740
* @param baseline
3841
* @return
3942
*/
4043
public static Function<Integer, Integer> create(int baseline) {
4144
return INSTANCE.andThen(t -> t + baseline);
4245
}
43-
46+
4447
@Override
4548
public Integer apply(Integer t) {
4649
return t < 1000 ? lookup[t] : (int)Math.log10(t);
4750
}
48-
}
51+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.netflix.concurrency.limits.limit.functions;
2+
3+
import java.util.function.IntUnaryOperator;
4+
5+
/**
6+
* Function used by limiters to calculate thresholds using log10 of the current limit.
7+
* Here we pre-compute the log10 of numbers up to 1000 as an optimization.
8+
*/
9+
public final class Log10RootIntFunction implements IntUnaryOperator {
10+
11+
private Log10RootIntFunction() {}
12+
13+
private static final int[] lookup = new int[1000];
14+
15+
static {
16+
for (int i = 0; i < lookup.length; i++) {
17+
lookup[i] = Math.max(1, (int) Math.log10(i));
18+
}
19+
}
20+
21+
private static final Log10RootIntFunction INSTANCE = new Log10RootIntFunction();
22+
23+
/**
24+
* Create an instance of a function that returns : baseline + sqrt(limit)
25+
*
26+
* @param baseline
27+
* @return
28+
*/
29+
public static IntUnaryOperator create(int baseline) {
30+
return baseline == 0 ? INSTANCE : INSTANCE.andThen(t -> t + baseline);
31+
}
32+
33+
@Override
34+
public int applyAsInt(int t) {
35+
return t < 1000 ? lookup[t] : (int) Math.log10(t);
36+
}
37+
}

concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/VegasLimitTest.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void noChangeIfWithinThresholds() {
5858
@Test
5959
public void decreaseSmoothing() {
6060
VegasLimit limit = VegasLimit.newBuilder()
61-
.decrease(current -> current / 2)
61+
.decreaseFunction(current -> current / 2)
6262
.smoothing(0.5)
6363
.initialLimit(100)
6464
.maxConcurrency(200)
@@ -77,10 +77,33 @@ public void decreaseSmoothing() {
7777
Assert.assertEquals(56, limit.getLimit());
7878
}
7979

80+
@Test
81+
public void decreaseSmoothingDeprecatedBuilderMethod() {
82+
@SuppressWarnings("deprecation")
83+
VegasLimit limit = VegasLimit.newBuilder()
84+
.decrease(current -> current / 2)
85+
.smoothing(0.5)
86+
.initialLimit(100)
87+
.maxConcurrency(200)
88+
.build();
89+
90+
// Pick up first min-rtt
91+
limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 100, false);
92+
Assert.assertEquals(100, limit.getLimit());
93+
94+
// First decrease
95+
limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false);
96+
Assert.assertEquals(75, limit.getLimit());
97+
98+
// Second decrease
99+
limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false);
100+
Assert.assertEquals(56, limit.getLimit());
101+
}
102+
80103
@Test
81104
public void decreaseWithoutSmoothing() {
82105
VegasLimit limit = VegasLimit.newBuilder()
83-
.decrease(current -> current / 2)
106+
.decreaseFunction(current -> current / 2)
84107
.initialLimit(100)
85108
.maxConcurrency(200)
86109
.build();
@@ -97,4 +120,26 @@ public void decreaseWithoutSmoothing() {
97120
limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false);
98121
Assert.assertEquals(25, limit.getLimit());
99122
}
123+
124+
@Test
125+
public void decreaseWithoutSmoothingDeprecatedBuilderMethod() {
126+
@SuppressWarnings("deprecation")
127+
VegasLimit limit = VegasLimit.newBuilder()
128+
.decrease(current -> current / 2)
129+
.initialLimit(100)
130+
.maxConcurrency(200)
131+
.build();
132+
133+
// Pick up first min-rtt
134+
limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 100, false);
135+
Assert.assertEquals(100, limit.getLimit());
136+
137+
// First decrease
138+
limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false);
139+
Assert.assertEquals(50, limit.getLimit());
140+
141+
// Second decrease
142+
limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false);
143+
Assert.assertEquals(25, limit.getLimit());
144+
}
100145
}

0 commit comments

Comments
 (0)