Skip to content

Commit 9808a48

Browse files
authored
Merge pull request #194 from Netflix/feature/bypass-filter
Allow calls to be bypassed without affecting limiter algorithm
2 parents 291d7bb + 628dde9 commit 9808a48

File tree

25 files changed

+960
-185
lines changed

25 files changed

+960
-185
lines changed

concurrency-limits-core/dependencies.lock

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
},
2323
"testRuntimeClasspath": {
2424
"org.junit.jupiter:junit-jupiter-engine": {
25-
"locked": "5.9.0"
25+
"locked": "5.10.2"
2626
},
2727
"org.junit.vintage:junit-vintage-engine": {
28-
"locked": "5.9.0"
28+
"locked": "5.10.2"
2929
},
3030
"org.slf4j:slf4j-api": {
3131
"locked": "1.7.36"

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,41 @@
2424

2525
import java.util.Optional;
2626
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.function.Predicate;
2728
import java.util.function.Supplier;
2829

2930
public abstract class AbstractLimiter<ContextT> implements Limiter<ContextT> {
3031
public static final String ID_TAG = "id";
3132
public static final String STATUS_TAG = "status";
3233

34+
/**
35+
* Constructs a new builder with a list of bypass resolvers.
36+
* If the predicate condition in any of the resolver is satisfied,
37+
* the call is bypassed without increasing the limiter inflight count
38+
* and affecting the algorithm.
39+
*/
40+
public abstract static class BypassLimiterBuilder<BuilderT extends BypassLimiterBuilder<BuilderT, ContextT>, ContextT> extends Builder<BuilderT> {
41+
42+
private final Predicate<ContextT> ALWAYS_FALSE = (context) -> false;
43+
private Predicate<ContextT> bypassResolver = ALWAYS_FALSE;
44+
45+
/**
46+
* Add a chainable bypass resolver predicate from context. Multiple resolvers may be added and if any of the
47+
* predicate condition returns true the call is bypassed without increasing the limiter inflight count and
48+
* affecting the algorithm. Will not bypass any calls by default if no resolvers are added.
49+
* @param shouldBypass Predicate condition to bypass limit
50+
* @return Chainable builder
51+
*/
52+
public BuilderT bypassLimitResolver(Predicate<ContextT> shouldBypass) {
53+
if (this.bypassResolver == ALWAYS_FALSE) {
54+
this.bypassResolver = shouldBypass;
55+
} else {
56+
this.bypassResolver = bypassResolver.or(shouldBypass);
57+
}
58+
return self();
59+
}
60+
}
61+
3362
public abstract static class Builder<BuilderT extends Builder<BuilderT>> {
3463
private static final AtomicInteger idCounter = new AtomicInteger();
3564

@@ -69,6 +98,8 @@ public BuilderT metricRegistry(MetricRegistry registry) {
6998
private final MetricRegistry.Counter droppedCounter;
7099
private final MetricRegistry.Counter ignoredCounter;
71100
private final MetricRegistry.Counter rejectedCounter;
101+
private final MetricRegistry.Counter bypassCounter;
102+
private Predicate<ContextT> bypassResolver = (context) -> false;
72103

73104
private volatile int limit;
74105

@@ -77,19 +108,47 @@ protected AbstractLimiter(Builder<?> builder) {
77108
this.limitAlgorithm = builder.limit;
78109
this.limit = limitAlgorithm.getLimit();
79110
this.limitAlgorithm.notifyOnChange(this::onNewLimit);
80-
111+
if (builder instanceof BypassLimiterBuilder) {
112+
this.bypassResolver = ((BypassLimiterBuilder) builder).bypassResolver;
113+
}
81114
builder.registry.gauge(MetricIds.LIMIT_NAME, this::getLimit);
82115
this.successCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "success");
83116
this.droppedCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "dropped");
84117
this.ignoredCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "ignored");
85118
this.rejectedCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "rejected");
119+
this.bypassCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "bypassed");
120+
}
121+
122+
protected boolean shouldBypass(ContextT context){
123+
return bypassResolver.test(context);
86124
}
87125

88126
protected Optional<Listener> createRejectedListener() {
89127
this.rejectedCounter.increment();
90128
return Optional.empty();
91129
}
92130

131+
protected Optional<Listener> createBypassListener() {
132+
this.bypassCounter.increment();
133+
return Optional.of(new Listener() {
134+
135+
@Override
136+
public void onSuccess() {
137+
// Do nothing
138+
}
139+
140+
@Override
141+
public void onIgnore() {
142+
// Do nothing
143+
}
144+
145+
@Override
146+
public void onDropped() {
147+
// Do nothing
148+
}
149+
});
150+
}
151+
93152
protected Listener createListener() {
94153
final long startTime = clock.get();
95154
final int currentInflight = inFlight.incrementAndGet();

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public abstract class AbstractPartitionedLimiter<ContextT> extends AbstractLimit
3737
private static final Logger LOG = LoggerFactory.getLogger(AbstractPartitionedLimiter.class);
3838
private static final String PARTITION_TAG_NAME = "partition";
3939

40-
public abstract static class Builder<BuilderT extends AbstractLimiter.Builder<BuilderT>, ContextT> extends AbstractLimiter.Builder<BuilderT> {
40+
public abstract static class Builder<BuilderT extends AbstractLimiter.BypassLimiterBuilder<BuilderT, ContextT>, ContextT> extends AbstractLimiter.BypassLimiterBuilder<BuilderT, ContextT> {
4141
private List<Function<ContextT, String>> partitionResolvers = new ArrayList<>();
4242
private final Map<String, Partition> partitions = new LinkedHashMap<>();
4343
private int maxDelayedThreads = 100;
@@ -215,6 +215,9 @@ public Optional<Listener> acquire(ContextT context) {
215215

216216
try {
217217
lock.lock();
218+
if (shouldBypass(context)){
219+
return createBypassListener();
220+
}
218221
if (getInflight() >= getLimit() && partition.isLimitExceeded()) {
219222
lock.unlock();
220223
if (partition.backoffMillis > 0 && delayedThreads.get() < maxDelayedThreads) {

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,18 @@
2323
import java.util.concurrent.Semaphore;
2424

2525
public class SimpleLimiter<ContextT> extends AbstractLimiter<ContextT> {
26+
27+
public static class BypassLimiterBuilder<ContextT> extends AbstractLimiter.BypassLimiterBuilder<BypassLimiterBuilder<ContextT>, ContextT> {
28+
public SimpleLimiter<ContextT> build() {
29+
return new SimpleLimiter<>(this);
30+
}
31+
32+
@Override
33+
protected BypassLimiterBuilder<ContextT> self() {
34+
return this;
35+
}
36+
}
37+
2638
public static class Builder extends AbstractLimiter.Builder<Builder> {
2739
public <ContextT> SimpleLimiter<ContextT> build() {
2840
return new SimpleLimiter<>(this);
@@ -34,6 +46,10 @@ protected Builder self() {
3446
}
3547
}
3648

49+
public static <ContextT> BypassLimiterBuilder<ContextT> newBypassLimiterBuilder() {
50+
return new BypassLimiterBuilder<>();
51+
}
52+
3753
public static Builder newBuilder() {
3854
return new Builder();
3955
}
@@ -42,21 +58,22 @@ public static Builder newBuilder() {
4258

4359
public SimpleLimiter(AbstractLimiter.Builder<?> builder) {
4460
super(builder);
45-
4661
this.inflightDistribution = builder.registry.distribution(MetricIds.INFLIGHT_NAME);
4762
this.semaphore = new AdjustableSemaphore(getLimit());
4863
}
4964

5065
@Override
5166
public Optional<Limiter.Listener> acquire(ContextT context) {
5267
Optional<Limiter.Listener> listener;
53-
if (!semaphore.tryAcquire()) {
68+
if (shouldBypass(context)) {
69+
listener = createBypassListener();
70+
}
71+
else if (!semaphore.tryAcquire()) {
5472
listener = createRejectedListener();
5573
}
5674
else {
5775
listener = Optional.of(new Listener(createListener()));
5876
}
59-
6077
inflightDistribution.addSample(getInflight());
6178
return listener;
6279
}

concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiterTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import java.util.Optional;
1010
import java.util.function.Function;
11+
import java.util.function.Predicate;
1112

1213
public class AbstractPartitionedLimiterTest {
1314
public static class TestPartitionedLimiter extends AbstractPartitionedLimiter<String> {
@@ -27,6 +28,13 @@ public TestPartitionedLimiter(Builder builder) {
2728
}
2829
}
2930

31+
public static class ShouldBypassPredicate implements Predicate<String> {
32+
@Override
33+
public boolean test(String s) {
34+
return s.contains("admin");
35+
}
36+
}
37+
3038
@Test
3139
public void limitAllocatedToBins() {
3240
AbstractPartitionedLimiter<String> limiter = (AbstractPartitionedLimiter<String>) TestPartitionedLimiter.newBuilder()
@@ -156,4 +164,67 @@ public void setLimitReservesBusy() {
156164
Assert.assertEquals(1, limiter.getPartition("batch").getInflight());
157165
Assert.assertEquals(1, limiter.getInflight());
158166
}
167+
168+
@Test
169+
public void testBypassPartitionedLimiter() {
170+
171+
AbstractPartitionedLimiter<String> limiter = (AbstractPartitionedLimiter<String>) TestPartitionedLimiter.newBuilder()
172+
.partitionResolver(Function.identity())
173+
.partition("batch", 0.1)
174+
.partition("live", 0.9)
175+
.limit(FixedLimit.of(10))
176+
.bypassLimitResolver(new ShouldBypassPredicate())
177+
.build();
178+
179+
Assert.assertTrue(limiter.acquire("batch").isPresent());
180+
Assert.assertEquals(1, limiter.getPartition("batch").getInflight());
181+
Assert.assertTrue(limiter.acquire("admin").isPresent());
182+
183+
for (int i = 0; i < 9; i++) {
184+
Assert.assertTrue(limiter.acquire("live").isPresent());
185+
Assert.assertEquals(i+1, limiter.getPartition("live").getInflight());
186+
Assert.assertTrue(limiter.acquire("admin").isPresent());
187+
}
188+
189+
// Verify that bypassed requests are able to proceed even when the limiter is full
190+
Assert.assertFalse(limiter.acquire("batch").isPresent());
191+
Assert.assertEquals(1, limiter.getPartition("batch").getInflight());
192+
Assert.assertFalse(limiter.acquire("live").isPresent());
193+
Assert.assertEquals(9, limiter.getPartition("live").getInflight());
194+
Assert.assertEquals(10, limiter.getInflight());
195+
Assert.assertTrue(limiter.acquire("admin").isPresent());
196+
}
197+
198+
@Test
199+
public void testBypassSimpleLimiter() {
200+
201+
SimpleLimiter<String> limiter = (SimpleLimiter<String>) TestPartitionedLimiter.newBuilder()
202+
.limit(FixedLimit.of(10))
203+
.bypassLimitResolver(new ShouldBypassPredicate())
204+
.build();
205+
206+
int inflightCount = 0;
207+
for (int i = 0; i < 5; i++) {
208+
Assert.assertTrue(limiter.acquire("request").isPresent());
209+
Assert.assertEquals(i+1, limiter.getInflight());
210+
inflightCount++;
211+
}
212+
213+
for (int i = 0; i < 15; i++) {
214+
Assert.assertTrue(limiter.acquire("admin").isPresent());
215+
Assert.assertEquals(inflightCount, limiter.getInflight());
216+
}
217+
218+
for (int i = 0; i < 5; i++) {
219+
Assert.assertTrue(limiter.acquire("request").isPresent());
220+
Assert.assertEquals(inflightCount+i+1, limiter.getInflight());
221+
}
222+
223+
// Calls with passing bypass condition will return a token
224+
// whereas remaining calls will be throttled since inflight count is greater than the limit
225+
for (int i = 0; i < 10; i++) {
226+
Assert.assertFalse(limiter.acquire("request").isPresent());
227+
Assert.assertTrue(limiter.acquire("admin").isPresent());
228+
}
229+
}
159230
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package com.netflix.concurrency.limits.limiter;
2+
3+
import com.netflix.concurrency.limits.Limiter;
4+
import com.netflix.concurrency.limits.limit.FixedLimit;
5+
import org.junit.Assert;
6+
import org.junit.Test;
7+
8+
import java.util.Optional;
9+
10+
public class SimpleLimiterTest {
11+
12+
@Test
13+
public void useLimiterCapacityUntilTotalLimit() {
14+
SimpleLimiter<String> limiter = SimpleLimiter.newBuilder()
15+
.limit(FixedLimit.of(10))
16+
.build();
17+
18+
for (int i = 0; i < 10; i++) {
19+
Assert.assertTrue(limiter.acquire("live").isPresent());
20+
}
21+
22+
// Rejected call after total limit is utilized
23+
Assert.assertFalse(limiter.acquire("live").isPresent());
24+
Assert.assertEquals(10, limiter.getInflight());
25+
}
26+
27+
@Test
28+
public void testReleaseLimit() {
29+
SimpleLimiter<String> limiter = SimpleLimiter.newBuilder()
30+
.limit(FixedLimit.of(10))
31+
.build();
32+
33+
Optional<Limiter.Listener> completion = limiter.acquire("live");
34+
for (int i = 1; i < 10; i++) {
35+
Assert.assertTrue(limiter.acquire("live").isPresent());
36+
}
37+
38+
Assert.assertEquals(10, limiter.getInflight());
39+
Assert.assertFalse(limiter.acquire("live").isPresent());
40+
41+
// Release token
42+
completion.get().onSuccess();
43+
Assert.assertEquals(9, limiter.getInflight());
44+
45+
Assert.assertTrue(limiter.acquire("live").isPresent());
46+
Assert.assertEquals(10, limiter.getInflight());
47+
}
48+
49+
@Test
50+
public void testSimpleBypassLimiter() {
51+
SimpleLimiter<String> limiter = SimpleLimiter.<String>newBypassLimiterBuilder()
52+
.limit(FixedLimit.of(10))
53+
.bypassLimitResolver((context) -> context.equals("admin"))
54+
.build();
55+
56+
for (int i = 0; i < 10; i++) {
57+
Assert.assertTrue(limiter.acquire("live").isPresent());
58+
Assert.assertEquals(i+1, limiter.getInflight());
59+
}
60+
61+
// Verify calls with passing bypass condition will return a token
62+
// whereas remaining calls will be throttled since inflight count is greater than the limit
63+
for (int i = 0; i < 10; i++) {
64+
Assert.assertFalse(limiter.acquire("live").isPresent());
65+
Assert.assertTrue(limiter.acquire("admin").isPresent());
66+
}
67+
}
68+
69+
@Test
70+
public void testSimpleBypassLimiterDefault() {
71+
SimpleLimiter<String> limiter = SimpleLimiter.<String>newBypassLimiterBuilder()
72+
.limit(FixedLimit.of(10))
73+
.build();
74+
75+
for (int i = 0; i < 10; i++) {
76+
Assert.assertTrue(limiter.acquire("live").isPresent());
77+
Assert.assertEquals(i+1, limiter.getInflight());
78+
}
79+
80+
// Verify that no calls are bypassed by default
81+
Assert.assertFalse(limiter.acquire("live").isPresent());
82+
Assert.assertFalse(limiter.acquire("admin").isPresent());
83+
}
84+
85+
}

0 commit comments

Comments
 (0)