Skip to content

Commit 58dab7f

Browse files
authored
Merge pull request #204 from jasonk000/jkoch/less-locking-partitioned-limiter
Reduce contention a little in AbstractPartitionedLimiter
2 parents 075bc76 + 09a1e6f commit 58dab7f

File tree

4 files changed

+240
-57
lines changed

4 files changed

+240
-57
lines changed

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ public BuilderT metricRegistry(MetricRegistry registry) {
7373
* Due to the builders not having access to the ContextT, it is the duty of subclasses to ensure that
7474
* implementations are type safe.
7575
*
76+
* Predicates should not rely strictly on state of the Limiter (such as inflight count) when evaluating
77+
* whether to bypass. There is no guarantee that the state will be synchronized or consistent with respect to
78+
* the bypass predicate, and the bypass predicate may be called by multiple threads concurrently.
79+
*
7680
* @param shouldBypass Predicate condition to bypass limit
7781
* @return Chainable builder
7882
*/

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java

Lines changed: 75 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.Optional;
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.concurrent.atomic.AtomicInteger;
33-
import java.util.concurrent.locks.ReentrantLock;
3433
import java.util.function.Function;
3534

3635
public abstract class AbstractPartitionedLimiter<ContextT> extends AbstractLimiter<ContextT> {
@@ -105,10 +104,10 @@ public Limiter<ContextT> build() {
105104

106105
static class Partition {
107106
private final String name;
107+
private final AtomicInteger busy = new AtomicInteger(0);
108108

109109
private double percent = 0.0;
110-
private int limit = 0;
111-
private int busy = 0;
110+
private volatile int limit = 0;
112111
private long backoffMillis = 0;
113112
private MetricRegistry.SampleListener inflightDistribution;
114113

@@ -134,25 +133,41 @@ void updateLimit(int totalLimit) {
134133
}
135134

136135
boolean isLimitExceeded() {
137-
return busy >= limit;
136+
return busy.get() >= limit;
138137
}
139138

140139
void acquire() {
141-
busy++;
142-
inflightDistribution.addSample(busy);
140+
int nowBusy = busy.incrementAndGet();
141+
inflightDistribution.addSample(nowBusy);
142+
}
143+
144+
/**
145+
* Try to acquire a slot, returning false if the limit is exceeded.
146+
* @return
147+
*/
148+
boolean tryAcquire() {
149+
int current = busy.get();
150+
while (current < limit) {
151+
if (busy.compareAndSet(current, current + 1)) {
152+
inflightDistribution.addSample(current + 1);
153+
return true;
154+
}
155+
current = busy.get();
156+
}
143157

158+
return false;
144159
}
145160

146161
void release() {
147-
busy--;
162+
busy.decrementAndGet();
148163
}
149164

150165
int getLimit() {
151166
return limit;
152167
}
153168

154169
public int getInflight() {
155-
return busy;
170+
return busy.get();
156171
}
157172

158173
double getPercent() {
@@ -166,14 +181,13 @@ void createMetrics(MetricRegistry registry) {
166181

167182
@Override
168183
public String toString() {
169-
return "Partition [pct=" + percent + ", limit=" + limit + ", busy=" + busy + "]";
184+
return "Partition [pct=" + percent + ", limit=" + limit + ", busy=" + busy.get() + "]";
170185
}
171186
}
172187

173188
private final Map<String, Partition> partitions;
174189
private final Partition unknownPartition;
175190
private final List<Function<ContextT, String>> partitionResolvers;
176-
private final ReentrantLock lock = new ReentrantLock();
177191
private final AtomicInteger delayedThreads = new AtomicInteger();
178192
private final int maxDelayedThreads;
179193

@@ -211,63 +225,67 @@ private Partition resolvePartition(ContextT context) {
211225

212226
@Override
213227
public Optional<Listener> acquire(ContextT context) {
214-
final Partition partition = resolvePartition(context);
215-
216-
try {
217-
lock.lock();
218-
if (shouldBypass(context)){
219-
return createBypassListener();
220-
}
221-
if (getInflight() >= getLimit() && partition.isLimitExceeded()) {
222-
lock.unlock();
223-
if (partition.backoffMillis > 0 && delayedThreads.get() < maxDelayedThreads) {
224-
try {
225-
delayedThreads.incrementAndGet();
226-
TimeUnit.MILLISECONDS.sleep(partition.backoffMillis);
227-
} catch (InterruptedException e) {
228-
Thread.currentThread().interrupt();
229-
} finally {
230-
delayedThreads.decrementAndGet();
231-
}
232-
}
228+
if (shouldBypass(context)){
229+
return createBypassListener();
230+
}
233231

234-
return createRejectedListener();
235-
}
232+
final Partition partition = resolvePartition(context);
236233

234+
// This is a little unusual in that the partition is not a hard limit. It is
235+
// only a limit that it is applied if the global limit is exceeded. This allows
236+
// for excess capacity in each partition to allow for bursting over the limit,
237+
// but only if there is spare global capacity.
238+
239+
final boolean overLimit;
240+
if (getInflight() >= getLimit()) {
241+
// over global limit, so respect partition limit
242+
boolean couldAcquire = partition.tryAcquire();
243+
overLimit = !couldAcquire;
244+
} else {
245+
// we are below global limit, so no need to respect partition limit
237246
partition.acquire();
238-
final Listener listener = createListener();
239-
return Optional.of(new Listener() {
240-
@Override
241-
public void onSuccess() {
242-
listener.onSuccess();
243-
releasePartition(partition);
244-
}
247+
overLimit = false;
248+
}
245249

246-
@Override
247-
public void onIgnore() {
248-
listener.onIgnore();
249-
releasePartition(partition);
250+
if (overLimit) {
251+
if (partition.backoffMillis > 0 && delayedThreads.get() < maxDelayedThreads) {
252+
try {
253+
delayedThreads.incrementAndGet();
254+
TimeUnit.MILLISECONDS.sleep(partition.backoffMillis);
255+
} catch (InterruptedException e) {
256+
Thread.currentThread().interrupt();
257+
} finally {
258+
delayedThreads.decrementAndGet();
250259
}
260+
}
251261

252-
@Override
253-
public void onDropped() {
254-
listener.onDropped();
255-
releasePartition(partition);
256-
}
257-
});
258-
} finally {
259-
if (lock.isHeldByCurrentThread())
260-
lock.unlock();
262+
return createRejectedListener();
261263
}
264+
265+
final Listener listener = createListener();
266+
return Optional.of(new Listener() {
267+
@Override
268+
public void onSuccess() {
269+
listener.onSuccess();
270+
releasePartition(partition);
271+
}
272+
273+
@Override
274+
public void onIgnore() {
275+
listener.onIgnore();
276+
releasePartition(partition);
277+
}
278+
279+
@Override
280+
public void onDropped() {
281+
listener.onDropped();
282+
releasePartition(partition);
283+
}
284+
});
262285
}
263286

264287
private void releasePartition(Partition partition) {
265-
try {
266-
lock.lock();
267-
partition.release();
268-
} finally {
269-
lock.unlock();
270-
}
288+
partition.release();
271289
}
272290

273291
@Override

concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiterTest.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,21 @@
11
package com.netflix.concurrency.limits.limiter;
22

33
import com.netflix.concurrency.limits.Limiter;
4+
import com.netflix.concurrency.limits.Limiter.Listener;
45
import com.netflix.concurrency.limits.limit.FixedLimit;
56
import com.netflix.concurrency.limits.limit.SettableLimit;
67
import org.junit.Assert;
78
import org.junit.Test;
89

10+
import java.util.Arrays;
11+
import java.util.Map;
912
import java.util.Optional;
13+
import java.util.concurrent.ConcurrentHashMap;
14+
import java.util.concurrent.CountDownLatch;
15+
import java.util.concurrent.ExecutorService;
16+
import java.util.concurrent.Executors;
17+
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.atomic.AtomicInteger;
1019
import java.util.function.Function;
1120
import java.util.function.Predicate;
1221

@@ -227,4 +236,86 @@ public void testBypassSimpleLimiter() {
227236
Assert.assertTrue(limiter.acquire("admin").isPresent());
228237
}
229238
}
239+
240+
@Test
241+
public void testConcurrentPartitions() throws InterruptedException {
242+
final int THREAD_COUNT = 5;
243+
final int ITERATIONS = 500;
244+
final int LIMIT = 20;
245+
246+
AbstractPartitionedLimiter<String> limiter = (AbstractPartitionedLimiter<String>) TestPartitionedLimiter.newBuilder()
247+
.limit(FixedLimit.of(LIMIT))
248+
.partitionResolver(Function.identity())
249+
.partition("A", 0.5)
250+
.partition("B", 0.3)
251+
.partition("C", 0.2)
252+
.build();
253+
254+
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT * 3);
255+
CountDownLatch startLatch = new CountDownLatch(1);
256+
CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT * 3);
257+
Map<String, AtomicInteger> successCounts = new ConcurrentHashMap<>();
258+
Map<String, AtomicInteger> rejectionCounts = new ConcurrentHashMap<>();
259+
Map<String, AtomicInteger> maxConcurrents = new ConcurrentHashMap<>();
260+
AtomicInteger globalMaxInflight = new AtomicInteger(0);
261+
262+
for (String partition : Arrays.asList("A", "B", "C")) {
263+
successCounts.put(partition, new AtomicInteger(0));
264+
rejectionCounts.put(partition, new AtomicInteger(0));
265+
maxConcurrents.put(partition, new AtomicInteger(0));
266+
267+
for (int i = 0; i < THREAD_COUNT; i++) {
268+
executor.submit(() -> {
269+
try {
270+
startLatch.await();
271+
for (int j = 0; j < ITERATIONS; j++) {
272+
Optional<Listener> listener = limiter.acquire(partition);
273+
if (listener.isPresent()) {
274+
try {
275+
int current = limiter.getPartition(partition).getInflight();
276+
maxConcurrents.get(partition).updateAndGet(max -> Math.max(max, current));
277+
successCounts.get(partition).incrementAndGet();
278+
globalMaxInflight.updateAndGet(max -> Math.max(max, limiter.getInflight()));
279+
Thread.sleep(1); // Simulate some work
280+
} finally {
281+
listener.get().onSuccess();
282+
}
283+
} else {
284+
rejectionCounts.get(partition).incrementAndGet();
285+
}
286+
}
287+
} catch (InterruptedException e) {
288+
Thread.currentThread().interrupt();
289+
} finally {
290+
endLatch.countDown();
291+
}
292+
});
293+
}
294+
}
295+
296+
startLatch.countDown();
297+
endLatch.await();
298+
executor.shutdown();
299+
executor.awaitTermination(10, TimeUnit.SECONDS);
300+
301+
StringBuilder resultSummary = new StringBuilder();
302+
for (String partition : Arrays.asList("A", "B", "C")) {
303+
int successCount = successCounts.get(partition).get();
304+
int rejectionCount = rejectionCounts.get(partition).get();
305+
int maxConcurrent = maxConcurrents.get(partition).get();
306+
307+
resultSummary.append(String.format("%s(success=%d,reject=%d,maxConcurrent=%d) ",
308+
partition, successCount, rejectionCount, maxConcurrent));
309+
310+
Assert.assertTrue("Max concurrent for " + partition + " should not exceed global limit. " + resultSummary,
311+
maxConcurrent <= LIMIT);
312+
Assert.assertEquals("Total attempts for " + partition + " should equal success + rejections. " + resultSummary,
313+
THREAD_COUNT * ITERATIONS,
314+
successCount + rejectionCount);
315+
}
316+
317+
Assert.assertTrue("Global max inflight should not exceed total limit. " + resultSummary,
318+
globalMaxInflight.get() <= LIMIT + THREAD_COUNT);
319+
}
320+
230321
}

0 commit comments

Comments
 (0)