Skip to content

Commit 89b802e

Browse files
authored
Merge pull request #624 from cizezsy/feat/workqueue
Add workqueue support
2 parents 3f4b222 + 4afdc8d commit 89b802e

14 files changed

+551
-100
lines changed

extended/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@
3737
<artifactId>guava</artifactId>
3838
<version>25.1-jre</version>
3939
</dependency>
40+
<dependency>
41+
<groupId>com.github.vladimir-bukhtoyarov</groupId>
42+
<artifactId>bucket4j-core</artifactId>
43+
<version>${bucket4jVersion}</version>
44+
</dependency>
4045
<!-- test dependencies -->
4146
<dependency>
4247
<groupId>junit</groupId>
@@ -130,5 +135,6 @@
130135
<maven.compiler.source>${java.version}</maven.compiler.source>
131136
<maven.compiler.target>${java.version}</maven.compiler.target>
132137
<slf4jVersion>1.7.7</slf4jVersion>
138+
<bucket4jVersion>4.4.1</bucket4jVersion>
133139
</properties>
134140
</project>
Lines changed: 6 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
package io.kubernetes.client.extended.workqueue;
22

3-
import java.time.Duration;
4-
import java.util.Map;
5-
import java.util.concurrent.ConcurrentHashMap;
3+
import io.kubernetes.client.extended.workqueue.ratelimiter.DefaultControllerRateLimiter;
4+
import io.kubernetes.client.extended.workqueue.ratelimiter.RateLimiter;
65
import java.util.concurrent.ExecutorService;
76

87
/** The default rate limiting queue implementation. */
98
public class DefaultRateLimitingQueue<T> extends DefaultDelayingQueue<T>
109
implements RateLimitingQueue<T> {
1110

12-
private RateLimiter rateLimiter;
11+
private RateLimiter<T> rateLimiter;
1312

1413
public DefaultRateLimitingQueue(ExecutorService waitingWorker) {
1514
super(waitingWorker);
16-
this.rateLimiter = new ExponentialRateLimiter();
15+
this.rateLimiter = new DefaultControllerRateLimiter<>();
1716
}
1817

19-
public DefaultRateLimitingQueue(ExecutorService waitingWorker, RateLimiter rateLimiter) {
18+
public DefaultRateLimitingQueue(ExecutorService waitingWorker, RateLimiter<T> rateLimiter) {
2019
super(waitingWorker);
2120
this.rateLimiter = rateLimiter;
2221
}
@@ -27,54 +26,12 @@ public int numRequeues(T item) {
2726
}
2827

2928
@Override
30-
public void forget(Object item) {
29+
public void forget(T item) {
3130
rateLimiter.forget(item);
3231
}
3332

3433
@Override
3534
public void addRateLimited(T item) {
3635
super.addAfter(item, rateLimiter.when(item));
3736
}
38-
39-
public static class ExponentialRateLimiter implements RateLimiter {
40-
41-
Duration baseDelay;
42-
Duration maxDelay;
43-
44-
private Map<Object, Integer> failures = new ConcurrentHashMap<>();
45-
46-
public ExponentialRateLimiter() {
47-
this.baseDelay = Duration.ofMillis(5);
48-
this.maxDelay = Duration.ofSeconds(1000);
49-
}
50-
51-
public ExponentialRateLimiter(Duration baseDelay, Duration maxDelay) {
52-
this.baseDelay = baseDelay;
53-
this.maxDelay = maxDelay;
54-
}
55-
56-
@Override
57-
public void forget(Object item) {
58-
failures.remove(item);
59-
}
60-
61-
@Override
62-
public int numRequeues(Object item) {
63-
return failures.get(item);
64-
}
65-
66-
@Override
67-
public Duration when(Object item) {
68-
Integer exp = failures.getOrDefault(item, 0);
69-
failures.put(item, exp + 1);
70-
double backoff = baseDelay.toNanos() * Math.pow(2, exp);
71-
if (backoff > Long.MAX_VALUE) {
72-
return maxDelay;
73-
}
74-
if (backoff > maxDelay.toNanos()) {
75-
return maxDelay;
76-
}
77-
return Duration.ofNanos(Double.valueOf(backoff).longValue());
78-
}
79-
}
8037
}

extended/src/main/java/io/kubernetes/client/extended/workqueue/RateLimiter.java

Lines changed: 0 additions & 30 deletions
This file was deleted.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package io.kubernetes.client.extended.workqueue.ratelimiter;
2+
3+
import io.github.bucket4j.Bandwidth;
4+
import io.github.bucket4j.Bucket;
5+
import io.github.bucket4j.Bucket4j;
6+
import io.github.bucket4j.Refill;
7+
import io.github.bucket4j.local.SynchronizationStrategy;
8+
import java.time.Duration;
9+
import java.util.concurrent.ScheduledFuture;
10+
import java.util.concurrent.ScheduledThreadPoolExecutor;
11+
import java.util.concurrent.TimeUnit;
12+
13+
/** A light-weight token bucket implementation for RateLimiter. */
14+
public class BucketRateLimiter<T> implements RateLimiter<T> {
15+
private Bucket bucket;
16+
17+
/**
18+
* @param capacity Capacity is the maximum number of tokens can be consumed.
19+
* @param tokensGeneratedInPeriod Tokens generated in period.
20+
* @param period Period that generating specific number of tokens.
21+
*/
22+
public BucketRateLimiter(long capacity, long tokensGeneratedInPeriod, Duration period) {
23+
Bandwidth bandwidth =
24+
Bandwidth.classic(capacity, Refill.greedy(tokensGeneratedInPeriod, period));
25+
this.bucket =
26+
Bucket4j.builder()
27+
.addLimit(bandwidth)
28+
.withSynchronizationStrategy(SynchronizationStrategy.SYNCHRONIZED)
29+
.build();
30+
}
31+
32+
@Override
33+
public Duration when(T item) {
34+
DelayGetter delayGetter = new DelayGetter();
35+
bucket.asAsyncScheduler().consume(1, delayGetter).complete(null);
36+
return delayGetter.getDelay();
37+
}
38+
39+
@Override
40+
public void forget(T item) {}
41+
42+
@Override
43+
public int numRequeues(T item) {
44+
return 0;
45+
}
46+
47+
private class DelayGetter extends ScheduledThreadPoolExecutor {
48+
private Duration delay = Duration.ZERO;
49+
50+
@Override
51+
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
52+
this.delay = Duration.ofNanos(unit.toNanos(delay));
53+
return null;
54+
}
55+
56+
private DelayGetter() {
57+
super(0);
58+
}
59+
60+
private Duration getDelay() {
61+
return delay;
62+
}
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.kubernetes.client.extended.workqueue.ratelimiter;
2+
3+
import java.time.Duration;
4+
import java.util.Arrays;
5+
6+
/**
7+
* DefaultControllerRateLimiter is a default rate limiter for workqueue. It has both overall and
8+
* per-item rate limiting. The overall is a token bucket and the per-item is exponential
9+
*/
10+
public class DefaultControllerRateLimiter<T> implements RateLimiter<T> {
11+
12+
private RateLimiter<T> internalRateLimiter;
13+
14+
public DefaultControllerRateLimiter() {
15+
this.internalRateLimiter =
16+
new MaxOfRateLimiter<>(
17+
Arrays.asList(
18+
new ItemExponentialFailureRateLimiter<>(
19+
Duration.ofMillis(5), Duration.ofSeconds(1000)),
20+
new BucketRateLimiter<>(100, 10, Duration.ofMinutes(1))));
21+
}
22+
23+
@Override
24+
public Duration when(T item) {
25+
return internalRateLimiter.when(item);
26+
}
27+
28+
@Override
29+
public void forget(T item) {
30+
internalRateLimiter.forget(item);
31+
}
32+
33+
@Override
34+
public int numRequeues(T item) {
35+
return internalRateLimiter.numRequeues(item);
36+
}
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package io.kubernetes.client.extended.workqueue.ratelimiter;
2+
3+
import com.google.common.util.concurrent.AtomicLongMap;
4+
import java.time.Duration;
5+
6+
/**
7+
* ItemExponentialFailureRateLimiter does a simple baseDelay*10<sup>num-failures</sup> limit dealing
8+
* with max failures and expiration are up to the caller
9+
*/
10+
public class ItemExponentialFailureRateLimiter<T> implements RateLimiter<T> {
11+
12+
private Duration baseDelay;
13+
private Duration maxDelay;
14+
15+
private AtomicLongMap<T> failures;
16+
17+
public ItemExponentialFailureRateLimiter(Duration baseDelay, Duration maxDelay) {
18+
this.baseDelay = baseDelay;
19+
this.maxDelay = maxDelay;
20+
21+
failures = AtomicLongMap.create();
22+
}
23+
24+
@Override
25+
public Duration when(T item) {
26+
long exp = failures.getAndIncrement(item);
27+
long d = maxDelay.toMillis() >> exp;
28+
return d > baseDelay.toMillis() ? baseDelay.multipliedBy(1 << exp) : maxDelay;
29+
}
30+
31+
@Override
32+
public void forget(T item) {
33+
failures.remove(item);
34+
}
35+
36+
@Override
37+
public int numRequeues(T item) {
38+
return (int) failures.get(item);
39+
}
40+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package io.kubernetes.client.extended.workqueue.ratelimiter;
2+
3+
import com.google.common.util.concurrent.AtomicLongMap;
4+
import java.time.Duration;
5+
6+
/**
7+
* ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry
8+
* after that
9+
*/
10+
public class ItemFastSlowRateLimiter<T> implements RateLimiter<T> {
11+
12+
private Duration fastDelay;
13+
private Duration slowDelay;
14+
private int maxFastAttempts;
15+
16+
private AtomicLongMap<T> failures;
17+
18+
public ItemFastSlowRateLimiter(Duration fastDelay, Duration slowDelay, int maxFastAttempts) {
19+
this.fastDelay = fastDelay;
20+
this.slowDelay = slowDelay;
21+
this.maxFastAttempts = maxFastAttempts;
22+
23+
failures = AtomicLongMap.create();
24+
}
25+
26+
@Override
27+
public Duration when(T item) {
28+
long attempts = failures.incrementAndGet(item);
29+
if (attempts <= maxFastAttempts) {
30+
return fastDelay;
31+
}
32+
return slowDelay;
33+
}
34+
35+
@Override
36+
public void forget(T item) {
37+
failures.remove(item);
38+
}
39+
40+
@Override
41+
public int numRequeues(T item) {
42+
return (int) failures.get(item);
43+
}
44+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package io.kubernetes.client.extended.workqueue.ratelimiter;
2+
3+
import java.time.Duration;
4+
import java.util.Arrays;
5+
import java.util.List;
6+
7+
/**
8+
* MaxOfRateLimiter calls every RateLimiter and returns the worst case response When used with a
9+
* token bucket limiter, the burst could be apparently exceeded in cases where particular items were
10+
* separately delayed a longer time.
11+
*/
12+
public class MaxOfRateLimiter<T> implements RateLimiter<T> {
13+
private List<RateLimiter<T>> rateLimiters;
14+
15+
public MaxOfRateLimiter(List<RateLimiter<T>> rateLimiters) {
16+
this.rateLimiters = rateLimiters;
17+
}
18+
19+
@SafeVarargs
20+
@SuppressWarnings("varargs")
21+
public MaxOfRateLimiter(RateLimiter<T>... rateLimiters) {
22+
this(Arrays.asList(rateLimiters));
23+
}
24+
25+
@Override
26+
public Duration when(T item) {
27+
Duration max = Duration.ZERO;
28+
for (RateLimiter<T> r : rateLimiters) {
29+
Duration current = r.when(item);
30+
if (current.compareTo(max) > 0) {
31+
max = current;
32+
}
33+
}
34+
35+
return max;
36+
}
37+
38+
@Override
39+
public void forget(T item) {
40+
rateLimiters.forEach(r -> r.forget(item));
41+
}
42+
43+
@Override
44+
public int numRequeues(T item) {
45+
int max = 0;
46+
for (RateLimiter<T> r : rateLimiters) {
47+
int current = r.numRequeues(item);
48+
if (current > max) {
49+
max = current;
50+
}
51+
}
52+
53+
return max;
54+
}
55+
}

0 commit comments

Comments
 (0)