Skip to content

Commit 38541af

Browse files
perf(misc): optimize FairLimiter implementation (#2670) (#2672)
* fix(s3stream): avoid StreamMetadataManager add callback when retry * perf(misc): optimize FairLimiter implementation Co-authored-by: lifepuzzlefun <[email protected]>
1 parent 0cd047c commit 38541af

File tree

1 file changed

+5
-53
lines changed

1 file changed

+5
-53
lines changed

core/src/main/scala/kafka/server/FairLimiter.java

Lines changed: 5 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -21,82 +21,34 @@
2121

2222
import java.util.concurrent.Semaphore;
2323
import java.util.concurrent.TimeUnit;
24-
import java.util.concurrent.atomic.AtomicInteger;
25-
import java.util.concurrent.locks.Lock;
26-
import java.util.concurrent.locks.ReentrantLock;
2724

2825
/**
2926
* A fair limiter whose {@link #acquire} method is fair, i.e. the waiting threads are served in the order of arrival.
3027
*/
3128
public class FairLimiter implements Limiter {
3229
private final int maxPermits;
33-
/**
34-
* The lock used to protect @{link #acquireLocked}
35-
*/
36-
private final Lock lock = new ReentrantLock(true);
3730
private final Semaphore permits;
3831

3932
/**
4033
* The name of this limiter, used for metrics.
4134
*/
4235
private final String name;
43-
/**
44-
* The number of threads waiting for permits, used for metrics.
45-
*/
46-
private final AtomicInteger waitingThreads = new AtomicInteger(0);
4736

4837
public FairLimiter(int size, String name) {
4938
this.maxPermits = size;
50-
this.permits = new Semaphore(size);
39+
this.permits = new Semaphore(size, true);
5140
this.name = name;
5241
}
5342

5443
@Override
5544
public Handler acquire(int permit) throws InterruptedException {
56-
waitingThreads.incrementAndGet();
57-
try {
58-
return acquire0(permit);
59-
} finally {
60-
waitingThreads.decrementAndGet();
61-
}
62-
}
63-
64-
private Handler acquire0(int permit) throws InterruptedException {
65-
lock.lock();
66-
try {
67-
permits.acquire(permit);
68-
return new FairHandler(permit);
69-
} finally {
70-
lock.unlock();
71-
}
45+
permits.acquire(permit);
46+
return new FairHandler(permit);
7247
}
7348

7449
@Override
7550
public Handler acquire(int permit, long timeoutMs) throws InterruptedException {
76-
waitingThreads.incrementAndGet();
77-
try {
78-
return acquire0(permit, timeoutMs);
79-
} finally {
80-
waitingThreads.decrementAndGet();
81-
}
82-
}
83-
84-
private Handler acquire0(int permit, long timeoutMs) throws InterruptedException {
85-
long start = System.nanoTime();
86-
if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
87-
try {
88-
// calculate the time left for {@code acquireLocked}
89-
long elapsed = System.nanoTime() - start;
90-
long left = TimeUnit.MILLISECONDS.toNanos(timeoutMs) - elapsed;
91-
// note: {@code left} may be negative here, but it's OK for acquireLocked
92-
return acquireLocked(permit, left);
93-
} finally {
94-
lock.unlock();
95-
}
96-
} else {
97-
// tryLock timeout, return null
98-
return null;
99-
}
51+
return acquireLocked(permit, timeoutMs);
10052
}
10153

10254
@Override
@@ -111,7 +63,7 @@ public int availablePermits() {
11163

11264
@Override
11365
public int waitingThreads() {
114-
return waitingThreads.get();
66+
return permits.getQueueLength();
11567
}
11668

11769
@Override

0 commit comments

Comments
 (0)