Skip to content

Commit fef3c46

Browse files
committed
OF-3171: Fix token bucket refill precision and clarify refill logic
Preserve fractional refill time in TokenBucketRateLimiter by carrying sub-token remainder across refill cycles, instead of discarding it when whole tokens are added.
1 parent e9e575c commit fef3c46

File tree

2 files changed

+143
-18
lines changed

2 files changed

+143
-18
lines changed

xmppserver/src/main/java/org/jivesoftware/util/TokenBucketRateLimiter.java

Lines changed: 80 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
*/
1616
package org.jivesoftware.util;
1717

18+
import com.google.common.annotations.VisibleForTesting;
19+
1820
import java.time.Duration;
21+
import java.util.Objects;
22+
import java.util.function.LongSupplier;
1923

2024
/**
2125
* A thread-safe, synchronized token-bucket rate limiter with metrics.
@@ -29,9 +33,12 @@ public final class TokenBucketRateLimiter
2933
{
3034
private final long capacity;
3135
private final long refillTokensPerSecond;
36+
private final LongSupplier nanoTimeSupplier;
3237

3338
private long availableTokens;
3439
private long lastRefillTimeNanos;
40+
// Leftover refill value in scaled units (1 token = 1_000_000_000 units).
41+
private long refillRemainder;
3542

3643
private long acceptedEvents;
3744
private long rejectedEvents;
@@ -46,14 +53,16 @@ public final class TokenBucketRateLimiter
4653
/**
4754
* Creates an unlimited rate limiter. Use {@link #unlimited()} to obtain an instance.
4855
*/
49-
private TokenBucketRateLimiter()
56+
private TokenBucketRateLimiter(final LongSupplier nanoTimeSupplier)
5057
{
58+
this.nanoTimeSupplier = Objects.requireNonNull(nanoTimeSupplier, "nanoTimeSupplier must not be null");
5159
this.unlimited = true;
5260
this.refillTokensPerSecond = 0;
5361
this.capacity = 0;
5462
this.availableTokens = 0;
5563
this.lastRefillTimeNanos = 0;
56-
this.startTimeNanos = System.nanoTime();
64+
this.refillRemainder = 0;
65+
this.startTimeNanos = nanoTime();
5766
}
5867

5968
/**
@@ -63,6 +72,21 @@ private TokenBucketRateLimiter()
6372
* @param maxBurst maximum number of permits that can accumulate
6473
*/
6574
public TokenBucketRateLimiter(final long permitsPerSecond, final long maxBurst)
75+
{
76+
this(permitsPerSecond, maxBurst, System::nanoTime);
77+
}
78+
79+
/**
80+
* Creates a new rate limiter with a custom clock.
81+
*
82+
* Normally, the system clock is used. This constructor mainly exists for tests.
83+
*
84+
* @param permitsPerSecond sustained rate of permits
85+
* @param maxBurst maximum number of permits that can accumulate
86+
* @param nanoTimeSupplier custom clock
87+
*/
88+
@VisibleForTesting
89+
TokenBucketRateLimiter(final long permitsPerSecond, final long maxBurst, final LongSupplier nanoTimeSupplier)
6690
{
6791
if (permitsPerSecond <= 0) {
6892
throw new IllegalArgumentException("permitsPerSecond must be > 0");
@@ -71,11 +95,13 @@ public TokenBucketRateLimiter(final long permitsPerSecond, final long maxBurst)
7195
throw new IllegalArgumentException("maxBurst must be > 0");
7296
}
7397

98+
this.nanoTimeSupplier = Objects.requireNonNull(nanoTimeSupplier, "nanoTimeSupplier must not be null");
7499
this.unlimited = false;
75100
this.refillTokensPerSecond = permitsPerSecond;
76101
this.capacity = maxBurst;
77102
this.availableTokens = maxBurst;
78-
this.lastRefillTimeNanos = System.nanoTime();
103+
this.lastRefillTimeNanos = nanoTime();
104+
this.refillRemainder = 0;
79105
this.startTimeNanos = this.lastRefillTimeNanos;
80106
}
81107

@@ -86,7 +112,19 @@ public TokenBucketRateLimiter(final long permitsPerSecond, final long maxBurst)
86112
*/
87113
public static TokenBucketRateLimiter unlimited()
88114
{
89-
return new TokenBucketRateLimiter();
115+
return new TokenBucketRateLimiter(System::nanoTime);
116+
}
117+
118+
/**
119+
* Returns the current time in nanoseconds from this instance clock.
120+
*
121+
* Usually this is the system clock, but tests can provide a custom clock.
122+
*
123+
* @return The time in nanoseconds.
124+
*/
125+
private long nanoTime()
126+
{
127+
return nanoTimeSupplier.getAsLong();
90128
}
91129

92130
/**
@@ -112,30 +150,54 @@ public synchronized boolean tryAcquire()
112150
}
113151

114152
/**
115-
* Refills tokens based on elapsed time.
153+
* Adds tokens based on elapsed time.
116154
*/
117155
private void refillIfNeeded()
118156
{
119-
final long now = System.nanoTime();
157+
final long now = nanoTime();
120158
final long elapsed = now - lastRefillTimeNanos;
159+
// With very small intervals, no time may have passed yet.
121160
if (elapsed <= 0) {
122161
return;
123162
}
124163

125-
// If the multiplication would overflow, elapsed time is so large that the bucket would be completely refilled
126-
// regardless, so cap directly at capacity.
127-
final long tokensToAdd;
128-
if (elapsed > Long.MAX_VALUE / refillTokensPerSecond) {
129-
tokensToAdd = capacity;
130-
} else {
131-
tokensToAdd = Math.min(capacity, (elapsed * refillTokensPerSecond) / 1_000_000_000L);
164+
if (availableTokens >= capacity) {
165+
// When already full, do not store extra time as hidden credit.
166+
lastRefillTimeNanos = now;
167+
refillRemainder = 0;
168+
return;
132169
}
133170

134-
if (tokensToAdd > 0) {
135-
availableTokens = tokensToAdd >= capacity - availableTokens ? capacity : availableTokens + tokensToAdd;
136-
// Only advance the timestamp when tokens are actually added, so that sub-token elapsed time is preserved
137-
// and contributes to the next refill rather than being discarded.
171+
final long remainingCapacity = capacity - availableTokens;
172+
173+
// Refill is calculated in scaled integer units: (elapsed * rate) + previous leftover.
174+
// If this overflows, elapsed time is so large that the bucket must be full.
175+
if (elapsed > (Long.MAX_VALUE - refillRemainder) / refillTokensPerSecond) {
176+
availableTokens = capacity;
177+
lastRefillTimeNanos = now;
178+
refillRemainder = 0;
179+
return;
180+
}
181+
182+
// Convert elapsed time to scaled units (1_000_000_000 units = 1 token).
183+
final long refillUnits = elapsed * refillTokensPerSecond + refillRemainder;
184+
final long tokensToGenerate = refillUnits / 1_000_000_000L;
185+
// Keep leftover units until they become at least one full token.
186+
if (tokensToGenerate <= 0) {
187+
return;
188+
}
189+
190+
final long tokensToAdd = Math.min(remainingCapacity, tokensToGenerate);
191+
availableTokens += tokensToAdd;
192+
193+
if (availableTokens >= capacity) {
194+
// Once capacity is reached, drop any extra accrued value.
195+
lastRefillTimeNanos = now;
196+
refillRemainder = 0;
197+
} else {
198+
// Keep the leftover fraction so refill speed stays accurate over time.
138199
lastRefillTimeNanos = now;
200+
refillRemainder = refillUnits % 1_000_000_000L;
139201
}
140202
}
141203

@@ -220,7 +282,7 @@ public synchronized double getAcceptanceRatio() {
220282
* @return uptime duration
221283
*/
222284
public Duration getUptime() {
223-
return Duration.ofNanos(System.nanoTime() - startTimeNanos);
285+
return Duration.ofNanos(nanoTime() - startTimeNanos);
224286
}
225287

226288
/**

xmppserver/src/test/java/org/jivesoftware/util/TokenBucketRateLimiterTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import org.junit.jupiter.api.Test;
1919

2020
import java.time.Duration;
21+
import java.util.concurrent.atomic.AtomicLong;
22+
import java.util.function.LongSupplier;
2123

2224
import static org.junit.jupiter.api.Assertions.*;
2325

@@ -154,6 +156,48 @@ void testGetAvailableTokensReflectsRefill() throws InterruptedException
154156
assertEquals(1, limiter.getAvailableTokens(), "getAvailableTokens should reflect tokens added by refill");
155157
}
156158

159+
/**
160+
* Verifies that sub-token elapsed time is preserved and contributes to a later refill.
161+
*/
162+
@Test
163+
void testFractionalElapsedTimeIsPreservedAcrossRefills()
164+
{
165+
// Setup test fixture.
166+
final FakeNanoClock clock = new FakeNanoClock();
167+
final TokenBucketRateLimiter limiter = new TokenBucketRateLimiter(1, 2, clock);
168+
169+
// Execute system under test.
170+
assertTrue(limiter.tryAcquire(), "First acquire should consume one of the initial burst tokens");
171+
assertTrue(limiter.tryAcquire(), "Second acquire should consume the second initial burst token");
172+
assertFalse(limiter.tryAcquire(), "Bucket should now be exhausted");
173+
174+
clock.advanceNanos(1_500_000_000L);
175+
assertTrue(limiter.tryAcquire(), "1.5 seconds should refill exactly one token");
176+
assertFalse(limiter.tryAcquire(), "Only one token should have been refilled so far");
177+
178+
clock.advanceNanos(500_000_000L);
179+
assertTrue(limiter.tryAcquire(), "Remaining 0.5 second should combine with prior remainder to refill one token");
180+
}
181+
182+
/**
183+
* Verifies that elapsed time while the bucket is full does not become hidden credit.
184+
*/
185+
@Test
186+
void testElapsedTimeIsNotBankedWhileBucketIsFull()
187+
{
188+
// Setup test fixture.
189+
final FakeNanoClock clock = new FakeNanoClock();
190+
final TokenBucketRateLimiter limiter = new TokenBucketRateLimiter(1, 2, clock);
191+
192+
// Execute system under test.
193+
clock.advanceNanos(5_000_000_000L);
194+
assertEquals(2, limiter.getAvailableTokens(), "Bucket should remain capped at burst capacity");
195+
196+
assertTrue(limiter.tryAcquire(), "One token should be immediately available");
197+
assertTrue(limiter.tryAcquire(), "Second token should be immediately available");
198+
assertFalse(limiter.tryAcquire(), "No hidden refill credit should exist after draining a full bucket");
199+
}
200+
157201
/**
158202
* Verifies that the limiter correctly counts accepted and rejected events.
159203
*/
@@ -360,4 +404,23 @@ void testUnlimitedLimiterInstancesAreIndependent()
360404
assertEquals(3, limiter1.getAcceptedEvents(), "Limiter1 should count only its own accepted events");
361405
assertEquals(0, limiter2.getAcceptedEvents(), "Limiter2 should not be affected by events on limiter1");
362406
}
407+
408+
/**
409+
* A fake {@link System#nanoTime()} implementation that allows for manual increments of time.
410+
*/
411+
private static final class FakeNanoClock implements LongSupplier
412+
{
413+
private final AtomicLong nowNanos = new AtomicLong();
414+
415+
@Override
416+
public long getAsLong()
417+
{
418+
return nowNanos.get();
419+
}
420+
421+
void advanceNanos(final long nanos)
422+
{
423+
nowNanos.addAndGet(nanos);
424+
}
425+
}
363426
}

0 commit comments

Comments
 (0)