Skip to content

Commit 9f73d82

Browse files
committed
Feat: add rate limiters
1 parent b9c5558 commit 9f73d82

14 files changed

+446
-100
lines changed

extended/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@
3737
<artifactId>guava</artifactId>
3838
<version>25.1-jre</version>
3939
</dependency>
40+
<dependency>
41+
<groupId>com.github.vladimir-bukhtoyarov</groupId>
42+
<artifactId>bucket4j-core</artifactId>
43+
<version>${bucket4jVersion}</version>
44+
</dependency>
4045
<!-- test dependencies -->
4146
<dependency>
4247
<groupId>junit</groupId>
@@ -130,5 +135,6 @@
130135
<maven.compiler.source>${java.version}</maven.compiler.source>
131136
<maven.compiler.target>${java.version}</maven.compiler.target>
132137
<slf4jVersion>1.7.7</slf4jVersion>
138+
<bucket4jVersion>4.4.1</bucket4jVersion>
133139
</properties>
134140
</project>
Lines changed: 6 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
package io.kubernetes.client.extended.workqueue;
22

3-
import java.time.Duration;
4-
import java.util.Map;
5-
import java.util.concurrent.ConcurrentHashMap;
3+
import io.kubernetes.client.extended.workqueue.ratelimiter.DefaultControllerRateLimiter;
4+
import io.kubernetes.client.extended.workqueue.ratelimiter.RateLimiter;
65
import java.util.concurrent.ExecutorService;
76

87
/** The default rate limiting queue implementation. */
98
public class DefaultRateLimitingQueue<T> extends DefaultDelayingQueue<T>
109
implements RateLimitingQueue<T> {
1110

12-
private RateLimiter rateLimiter;
11+
private RateLimiter<T> rateLimiter;
1312

1413
public DefaultRateLimitingQueue(ExecutorService waitingWorker) {
1514
super(waitingWorker);
16-
this.rateLimiter = new ExponentialRateLimiter();
15+
this.rateLimiter = new DefaultControllerRateLimiter<>();
1716
}
1817

19-
public DefaultRateLimitingQueue(ExecutorService waitingWorker, RateLimiter rateLimiter) {
18+
public DefaultRateLimitingQueue(ExecutorService waitingWorker, RateLimiter<T> rateLimiter) {
2019
super(waitingWorker);
2120
this.rateLimiter = rateLimiter;
2221
}
@@ -27,54 +26,12 @@ public int numRequeues(T item) {
2726
}
2827

2928
@Override
30-
public void forget(Object item) {
29+
public void forget(T item) {
3130
rateLimiter.forget(item);
3231
}
3332

3433
@Override
3534
public void addRateLimited(T item) {
3635
super.addAfter(item, rateLimiter.when(item));
3736
}
38-
39-
public static class ExponentialRateLimiter implements RateLimiter {
40-
41-
Duration baseDelay;
42-
Duration maxDelay;
43-
44-
private Map<Object, Integer> failures = new ConcurrentHashMap<>();
45-
46-
public ExponentialRateLimiter() {
47-
this.baseDelay = Duration.ofMillis(5);
48-
this.maxDelay = Duration.ofSeconds(1000);
49-
}
50-
51-
public ExponentialRateLimiter(Duration baseDelay, Duration maxDelay) {
52-
this.baseDelay = baseDelay;
53-
this.maxDelay = maxDelay;
54-
}
55-
56-
@Override
57-
public void forget(Object item) {
58-
failures.remove(item);
59-
}
60-
61-
@Override
62-
public int numRequeues(Object item) {
63-
return failures.get(item);
64-
}
65-
66-
@Override
67-
public Duration when(Object item) {
68-
Integer exp = failures.getOrDefault(item, 0);
69-
failures.put(item, exp + 1);
70-
double backoff = baseDelay.toNanos() * Math.pow(2, exp);
71-
if (backoff > Long.MAX_VALUE) {
72-
return maxDelay;
73-
}
74-
if (backoff > maxDelay.toNanos()) {
75-
return maxDelay;
76-
}
77-
return Duration.ofNanos(Double.valueOf(backoff).longValue());
78-
}
79-
}
8037
}

extended/src/main/java/io/kubernetes/client/extended/workqueue/RateLimiter.java

Lines changed: 0 additions & 30 deletions
This file was deleted.
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.kubernetes.client.extended.workqueue.ratelimiter;
2+
3+
import io.github.bucket4j.*;
4+
import java.time.Duration;
5+
6+
/** A light-weight token bucket implementation for RateLimiter. */
7+
public class BucketRateLimiter<T> implements RateLimiter<T> {
8+
9+
private Bucket bucket;
10+
private long tokensInQueue;
11+
private long tokensGeneratedInPeriod;
12+
private Duration period;
13+
14+
/**
15+
* @param capacity Capacity is the maximum number of tokens can be consumed.
16+
* @param tokensGeneratedInPeriod Tokens generated in period.
17+
* @param period Period that generating specific number of tokens.
18+
*/
19+
public BucketRateLimiter(long capacity, long tokensGeneratedInPeriod, Duration period) {
20+
Bandwidth bandwidth =
21+
Bandwidth.classic(capacity, Refill.greedy(tokensGeneratedInPeriod, period));
22+
23+
this.bucket = Bucket4j.builder().addLimit(bandwidth).build();
24+
this.tokensInQueue = 0;
25+
this.tokensGeneratedInPeriod = tokensGeneratedInPeriod;
26+
this.period = period;
27+
}
28+
29+
@Override
30+
public synchronized Duration when(T item) {
31+
tokensInQueue++;
32+
33+
long consumedTokens = bucket.tryConsumeAsMuchAsPossible(tokensInQueue);
34+
if (tokensInQueue - consumedTokens == 0) {
35+
tokensInQueue = 0;
36+
return Duration.ZERO;
37+
}
38+
39+
tokensInQueue = tokensInQueue - consumedTokens;
40+
41+
return durationFromTokens(tokensInQueue, tokensGeneratedInPeriod, period);
42+
}
43+
44+
@Override
45+
public void forget(T item) {}
46+
47+
@Override
48+
public int numRequeues(T item) {
49+
return 0;
50+
}
51+
52+
private Duration durationFromTokens(
53+
long tokensNeedToBeConsumed, long tokensGeneratedInPeriod, Duration period) {
54+
return period.dividedBy(tokensGeneratedInPeriod).multipliedBy(tokensNeedToBeConsumed);
55+
}
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.kubernetes.client.extended.workqueue.ratelimiter;
2+
3+
import java.time.Duration;
4+
import java.util.Arrays;
5+
6+
/**
7+
* DefaultControllerRateLimiter is a default rate limiter for workqueue. It has both overall and
8+
* per-item rate limiting. The overall is a token bucket and the per-item is exponential
9+
*/
10+
public class DefaultControllerRateLimiter<T> implements RateLimiter<T> {
11+
12+
private RateLimiter<T> internalRateLimiter;
13+
14+
public DefaultControllerRateLimiter() {
15+
this.internalRateLimiter =
16+
new MaxOfRateLimiter<>(
17+
Arrays.asList(
18+
new ItemExponentialFailureRateLimiter<>(
19+
Duration.ofMillis(5), Duration.ofSeconds(1000)),
20+
new BucketRateLimiter<>(100, 10, Duration.ofMinutes(1))));
21+
}
22+
23+
@Override
24+
public Duration when(T item) {
25+
return internalRateLimiter.when(item);
26+
}
27+
28+
@Override
29+
public void forget(T item) {
30+
internalRateLimiter.forget(item);
31+
}
32+
33+
@Override
34+
public int numRequeues(T item) {
35+
return internalRateLimiter.numRequeues(item);
36+
}
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package io.kubernetes.client.extended.workqueue.ratelimiter;
2+
3+
import java.time.Duration;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
7+
/**
8+
* ItemExponentialFailureRateLimiter does a simple baseDelay*10<sup>num-failures</sup> limit dealing
9+
* with max failures and expiration are up to the caller
10+
*/
11+
public class ItemExponentialFailureRateLimiter<T> implements RateLimiter<T> {
12+
13+
private Duration baseDelay;
14+
private Duration maxDelay;
15+
16+
private Map<T, Integer> failures;
17+
18+
public ItemExponentialFailureRateLimiter(Duration baseDelay, Duration maxDelay) {
19+
this.baseDelay = baseDelay;
20+
this.maxDelay = maxDelay;
21+
22+
failures = new HashMap<>();
23+
}
24+
25+
@Override
26+
public synchronized Duration when(T item) {
27+
int exp = failures.getOrDefault(item, 0);
28+
failures.put(item, exp + 1);
29+
30+
double backOff = baseDelay.toNanos() * Math.pow(2, exp);
31+
if (backOff > Long.MAX_VALUE) {
32+
return maxDelay;
33+
}
34+
35+
if (backOff > maxDelay.toNanos()) {
36+
return maxDelay;
37+
}
38+
39+
return Duration.ofNanos((long) backOff);
40+
}
41+
42+
@Override
43+
public synchronized void forget(T item) {
44+
failures.remove(item);
45+
}
46+
47+
@Override
48+
public synchronized int numRequeues(T item) {
49+
return failures.getOrDefault(item, 0);
50+
}
51+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.kubernetes.client.extended.workqueue.ratelimiter;
2+
3+
import java.time.Duration;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
7+
/**
8+
* ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry
9+
* after that
10+
*/
11+
public class ItemFastSlowRateLimiter<T> implements RateLimiter<T> {
12+
13+
private Map<T, Integer> failures;
14+
15+
private Duration fastDelay;
16+
private Duration slowDelay;
17+
private int maxFastAttempts;
18+
19+
public ItemFastSlowRateLimiter(Duration fastDelay, Duration slowDelay, int maxFastAttempts) {
20+
this.fastDelay = fastDelay;
21+
this.slowDelay = slowDelay;
22+
this.maxFastAttempts = maxFastAttempts;
23+
24+
failures = new HashMap<>();
25+
}
26+
27+
@Override
28+
public synchronized Duration when(T item) {
29+
int attempts = failures.getOrDefault(item, 0);
30+
failures.put(item, attempts + 1);
31+
32+
if (attempts + 1 <= maxFastAttempts) {
33+
return fastDelay;
34+
}
35+
36+
return slowDelay;
37+
}
38+
39+
@Override
40+
public synchronized void forget(T item) {
41+
failures.remove(item);
42+
}
43+
44+
@Override
45+
public synchronized int numRequeues(T item) {
46+
return failures.getOrDefault(item, 0);
47+
}
48+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package io.kubernetes.client.extended.workqueue.ratelimiter;
2+
3+
import java.time.Duration;
4+
import java.util.Arrays;
5+
import java.util.List;
6+
7+
/**
8+
* MaxOfRateLimiter calls every RateLimiter and returns the worst case response When used with a
9+
* token bucket limiter, the burst could be apparently exceeded in cases where particular items were
10+
* separately delayed a longer time.
11+
*/
12+
public class MaxOfRateLimiter<T> implements RateLimiter<T> {
13+
private List<RateLimiter<T>> rateLimiters;
14+
15+
public MaxOfRateLimiter(List<RateLimiter<T>> rateLimiters) {
16+
this.rateLimiters = rateLimiters;
17+
}
18+
19+
@SafeVarargs
20+
@SuppressWarnings("varargs")
21+
public MaxOfRateLimiter(RateLimiter<T>... rateLimiters) {
22+
this(Arrays.asList(rateLimiters));
23+
}
24+
25+
@Override
26+
public Duration when(T item) {
27+
Duration max = Duration.ZERO;
28+
for (RateLimiter<T> r : rateLimiters) {
29+
Duration current = r.when(item);
30+
if (current.compareTo(max) > 0) {
31+
max = current;
32+
}
33+
}
34+
35+
return max;
36+
}
37+
38+
@Override
39+
public void forget(T item) {
40+
rateLimiters.forEach(r -> r.forget(item));
41+
}
42+
43+
@Override
44+
public int numRequeues(T item) {
45+
int max = 0;
46+
for (RateLimiter<T> r : rateLimiters) {
47+
int current = r.numRequeues(item);
48+
if (current > max) {
49+
max = current;
50+
}
51+
}
52+
53+
return max;
54+
}
55+
}

0 commit comments

Comments
 (0)