Skip to content

Commit cef9070

Browse files
committed
[improve][broker] Optimize AsyncTokenBucket overflow solution further to reduce fallback to BigInteger (#25269)
(cherry picked from commit 4cbe124)
1 parent b2ca58a commit cef9070

File tree

2 files changed

+93
-39
lines changed

2 files changed

+93
-39
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java

Lines changed: 91 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ public abstract class AsyncTokenBucket {
103103
* which has a complex solution to prevent the CAS loop content problem.
104104
*/
105105
private final LongAdder pendingConsumedTokens = new LongAdder();
106+
/**
107+
* Cached pre-reduced rate parameters. Invalidated whenever {@link #getRate()} or
108+
* {@link #getRatePeriodNanos()} returns a different value (relevant for dynamic-rate buckets).
109+
*/
110+
private volatile RateParameters rateParameters;
106111

107112
protected AsyncTokenBucket(MonotonicClock clockSource, long addTokensResolutionNanos) {
108113
this.clockSource = clockSource;
@@ -122,6 +127,18 @@ public static DynamicRateAsyncTokenBucketBuilder builderForDynamicRate() {
122127

123128
protected abstract long getTargetAmountOfTokensAfterThrottling();
124129

130+
private RateParameters resolveRateParameters() {
131+
long rate = getRate();
132+
long ratePeriodNanos = getRatePeriodNanos();
133+
RateParameters current = rateParameters;
134+
if (current != null && current.rate == rate && current.ratePeriodNanos == ratePeriodNanos) {
135+
return current;
136+
}
137+
RateParameters updated = new RateParameters(rate, ratePeriodNanos);
138+
rateParameters = updated;
139+
return updated;
140+
}
141+
125142
/**
126143
* Consumes tokens and possibly updates the token balance. New tokens are calculated if the last new token
127144
* calculation occurred more than addTokensResolutionNanos nanoseconds ago. When new tokens are added, the
@@ -203,13 +220,12 @@ private long calculateNewTokensSinceLastUpdate(long currentNanos) {
203220
newTokens = 0;
204221
} else {
205222
long durationNanos = currentNanos - previousLastNanos + REMAINDER_NANOS_UPDATER.getAndSet(this, 0);
206-
long currentRate = getRate();
207-
long currentRatePeriodNanos = getRatePeriodNanos();
223+
RateParameters rp = resolveRateParameters();
208224
// new tokens is the amount of tokens that are created in the duration since the last update
209225
// with the configured rate
210-
newTokens = safeMulDivFloor(durationNanos, currentRate, currentRatePeriodNanos);
226+
newTokens = rp.calculateTokens(durationNanos);
211227
// carry forward the remainder nanos so that the rounding error is eliminated
212-
long consumedNanos = safeMulDivFloor(newTokens, currentRatePeriodNanos, currentRate);
228+
long consumedNanos = rp.calculateDuration(newTokens);
213229
long remainderNanos = durationNanos >= consumedNanos ? durationNanos - consumedNanos : 0;
214230
if (remainderNanos > 0) {
215231
REMAINDER_NANOS_UPDATER.addAndGet(this, remainderNanos);
@@ -277,41 +293,8 @@ public long calculateThrottlingDuration(long requiredTokens) {
277293
} catch (ArithmeticException e) {
278294
needTokens = Long.MAX_VALUE;
279295
}
280-
return safeMulDivFloor(needTokens, getRatePeriodNanos(), getRate());
281-
}
282-
283-
private static long safeMulDivFloor(long multiplicand, long multiplier, long divisor) {
284-
if (multiplicand < 0 || multiplier < 0) {
285-
throw new IllegalArgumentException("multiplicand and multiplier must be >= 0");
286-
}
287-
if (divisor <= 0) {
288-
throw new IllegalArgumentException("divisor must be > 0");
289-
}
290-
if (multiplicand == 0 || multiplier == 0) {
291-
return 0;
292-
}
293-
// Fast path
294-
// Check if multiplication fits in a 64-bit value
295-
// Math.multiplyHigh is intrinsified by the JVM (single mulq/mul instruction),
296-
// avoiding the cost of a division-based overflow check.
297-
// It returns the upper 64 bits of the full 128-bit multiplication result.
298-
// When the result is 0, the product fits in 64 bits.
299-
if (Math.multiplyHigh(multiplicand, multiplier) == 0) {
300-
long product = multiplicand * multiplier;
301-
if (product >= 0) {
302-
// product fits in signed 64-bit
303-
return product / divisor;
304-
}
305-
// product is in [2^63, 2^64): fits unsigned but not signed
306-
long result = Long.divideUnsigned(product, divisor);
307-
// cap at Long.MAX_VALUE if result itself overflows signed long
308-
return result >= 0 ? result : Long.MAX_VALUE;
309-
}
310-
// Fallback to BigInteger division
311-
BigInteger result = BigInteger.valueOf(multiplicand)
312-
.multiply(BigInteger.valueOf(multiplier))
313-
.divide(BigInteger.valueOf(divisor));
314-
return result.bitLength() < Long.SIZE ? result.longValue() : Long.MAX_VALUE;
296+
RateParameters rp = resolveRateParameters();
297+
return rp.calculateDuration(needTokens);
315298
}
316299

317300
/**
@@ -342,4 +325,73 @@ public boolean containsTokens() {
342325
return tokens() > 0;
343326
}
344327

328+
/**
329+
* Holds pre-computed rate parameters where {@code rate} and {@code ratePeriodNanos} have been
330+
* divided by their highest common power of ten. This reduction keeps the operands smaller and
331+
* avoids overflow in {@link #safeMulDivFloor(long, long, long)} without changing the result of
332+
* any integer floor-division (dividing numerator and denominator by the same factor preserves
333+
* the quotient). The instance is cached and reused as long as the rate and period are unchanged.
334+
*/
335+
static final class RateParameters {
336+
final long rate;
337+
final long ratePeriodNanos;
338+
final long reducedRate;
339+
final long reducedRatePeriod;
340+
341+
RateParameters(long rate, long ratePeriodNanos) {
342+
this.rate = rate;
343+
this.ratePeriodNanos = ratePeriodNanos;
344+
long r = rate;
345+
long p = ratePeriodNanos;
346+
while (r % 10 == 0 && p % 10 == 0) {
347+
r /= 10;
348+
p /= 10;
349+
}
350+
this.reducedRate = r;
351+
this.reducedRatePeriod = p;
352+
}
353+
354+
public long calculateTokens(long durationNanos) {
355+
return safeMulDivFloor(durationNanos, reducedRate, reducedRatePeriod);
356+
}
357+
358+
public long calculateDuration(long tokens) {
359+
return safeMulDivFloor(tokens, reducedRatePeriod, reducedRate);
360+
}
361+
362+
private static long safeMulDivFloor(long multiplicand, long multiplier, long divisor) {
363+
if (multiplicand < 0 || multiplier < 0) {
364+
throw new IllegalArgumentException("multiplicand and multiplier must be >= 0");
365+
}
366+
if (divisor <= 0) {
367+
throw new IllegalArgumentException("divisor must be > 0");
368+
}
369+
if (multiplicand == 0 || multiplier == 0) {
370+
return 0;
371+
}
372+
// Fast path
373+
// Check if multiplication fits in a 64-bit value
374+
// Math.multiplyHigh is intrinsified by the JVM (single mulq/mul instruction),
375+
// avoiding the cost of a division-based overflow check.
376+
// It returns the upper 64 bits of the full 128-bit multiplication result.
377+
// When the result is 0, the product fits in 64 bits.
378+
if (Math.multiplyHigh(multiplicand, multiplier) == 0) {
379+
long product = multiplicand * multiplier;
380+
if (product >= 0) {
381+
// product fits in signed 64-bit
382+
return product / divisor;
383+
}
384+
// product is in [2^63, 2^64): fits unsigned but not signed
385+
long result = Long.divideUnsigned(product, divisor);
386+
// cap at Long.MAX_VALUE if result itself overflows signed long
387+
return result >= 0 ? result : Long.MAX_VALUE;
388+
}
389+
// Fallback to BigInteger division
390+
BigInteger result = BigInteger.valueOf(multiplicand)
391+
.multiply(BigInteger.valueOf(multiplier))
392+
.divide(BigInteger.valueOf(divisor));
393+
return result.bitLength() < Long.SIZE ? result.longValue() : Long.MAX_VALUE;
394+
}
395+
}
396+
345397
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ public Object[][] largeRates() {
205205
{1_000_000_000L},
206206
{1_500_000_000L},
207207
{2_000_000_000L},
208+
{100_000_000_000L},
209+
{Long.MAX_VALUE / 1_000_000_000L * 1_000_000_000L},
208210
{Long.MAX_VALUE / 100L},
209211
{Long.MAX_VALUE / 10L},
210212
{Long.MAX_VALUE / 9L},

0 commit comments

Comments
 (0)