2727
2828import java .util .concurrent .TimeUnit ;
2929import java .util .function .Function ;
30+ import java .util .function .IntUnaryOperator ;
3031
3132/**
3233 * Concurrency limit algorithm that adjusts the limit based on the gradient of change of the current average RTT and
@@ -76,7 +77,7 @@ public static class Builder {
7677 private int maxConcurrency = 200 ;
7778
7879 private double smoothing = 0.2 ;
79- private Function < Integer , Integer > queueSize = concurrency -> 4 ;
80+ private IntUnaryOperator queueSize = concurrency -> 4 ;
8081 private MetricRegistry registry = EmptyMetricRegistry .INSTANCE ;
8182 private int longWindow = 600 ;
8283 private double rttTolerance = 1.5 ;
@@ -127,10 +128,23 @@ public Builder queueSize(int queueSize) {
127128 /**
128129 * Function to dynamically determine the amount the estimated limit can grow while
129130 * latencies remain low as a function of the current limit.
130- * @param queueSize
131+ * @param queueSize the queue size function
131132 * @return Chainable builder
133+ * @deprecated use {@link #queueSizeFunction(IntUnaryOperator)}
132134 */
135+ @ Deprecated
133136 public Builder queueSize (Function <Integer , Integer > queueSize ) {
137+ this .queueSize = queueSize ::apply ;
138+ return this ;
139+ }
140+
141+ /**
142+ * Function to dynamically determine the amount the estimated limit can grow while
143+ * latencies remain low as a function of the current limit.
144+ * @param queueSize the queue size function
145+ * @return Chainable builder
146+ */
147+ public Builder queueSizeFunction (IntUnaryOperator queueSize ) {
134148 this .queueSize = queueSize ;
135149 return this ;
136150 }
@@ -231,7 +245,7 @@ public static Gradient2Limit newDefault() {
231245
232246 private final int minLimit ;
233247
234- private final Function < Integer , Integer > queueSize ;
248+ private final IntUnaryOperator queueSize ;
235249
236250 private final double smoothing ;
237251
@@ -262,10 +276,11 @@ private Gradient2Limit(Builder builder) {
262276
263277 @ Override
264278 public int _update (final long startTime , final long rtt , final int inflight , final boolean didDrop ) {
265- final double queueSize = this .queueSize .apply ((int )this .estimatedLimit );
279+ double estimatedLimit = this .estimatedLimit ;
280+ final double queueSize = this .queueSize .applyAsInt ((int ) estimatedLimit );
266281
267282 this .lastRtt = rtt ;
268- final double shortRtt = (double )rtt ;
283+ final double shortRtt = (double ) rtt ;
269284 final double longRtt = this .longRtt .add (rtt ).doubleValue ();
270285
271286 shortRttSampleListener .addDoubleSample (shortRtt );
@@ -293,7 +308,7 @@ public int _update(final long startTime, final long rtt, final int inflight, fin
293308 newLimit = estimatedLimit * (1 - smoothing ) + newLimit * smoothing ;
294309 newLimit = Math .max (minLimit , Math .min (maxLimit , newLimit ));
295310
296- if ((int )estimatedLimit != newLimit ) {
311+ if ((int ) estimatedLimit != newLimit && LOG . isDebugEnabled () ) {
297312 LOG .debug ("New limit={} shortRtt={} ms longRtt={} ms queueSize={} gradient={}" ,
298313 (int )newLimit ,
299314 getLastRtt (TimeUnit .MICROSECONDS ) / 1000.0 ,
@@ -302,9 +317,9 @@ public int _update(final long startTime, final long rtt, final int inflight, fin
302317 gradient );
303318 }
304319
305- estimatedLimit = newLimit ;
320+ this . estimatedLimit = newLimit ;
306321
307- return (int )estimatedLimit ;
322+ return (int ) newLimit ;
308323 }
309324
310325 public long getLastRtt (TimeUnit units ) {
0 commit comments