Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public abstract class AsyncTokenBucket {
* which has a complex solution to prevent the CAS loop content problem.
*/
private final LongAdder pendingConsumedTokens = new LongAdder();
/**
* Cached pre-reduced rate parameters. Invalidated whenever {@link #getRate()} or
* {@link #getRatePeriodNanos()} returns a different value (relevant for dynamic-rate buckets).
*/
private volatile RateParameters rateParameters;

protected AsyncTokenBucket(MonotonicClock clockSource, long addTokensResolutionNanos) {
this.clockSource = clockSource;
Expand All @@ -122,6 +127,18 @@ public static DynamicRateAsyncTokenBucketBuilder builderForDynamicRate() {

protected abstract long getTargetAmountOfTokensAfterThrottling();

private RateParameters resolveRateParameters() {
long rate = getRate();
long ratePeriodNanos = getRatePeriodNanos();
RateParameters current = rateParameters;
if (current != null && current.rate == rate && current.ratePeriodNanos == ratePeriodNanos) {
return current;
}
RateParameters updated = new RateParameters(rate, ratePeriodNanos);
rateParameters = updated;
return updated;
}

/**
* Consumes tokens and possibly updates the token balance. New tokens are calculated if the last new token
* calculation occurred more than addTokensResolutionNanos nanoseconds ago. When new tokens are added, the
Expand Down Expand Up @@ -203,13 +220,12 @@ private long calculateNewTokensSinceLastUpdate(long currentNanos) {
newTokens = 0;
} else {
long durationNanos = currentNanos - previousLastNanos + REMAINDER_NANOS_UPDATER.getAndSet(this, 0);
long currentRate = getRate();
long currentRatePeriodNanos = getRatePeriodNanos();
RateParameters rp = resolveRateParameters();
// new tokens is the amount of tokens that are created in the duration since the last update
// with the configured rate
newTokens = safeMulDivFloor(durationNanos, currentRate, currentRatePeriodNanos);
newTokens = rp.calculateTokens(durationNanos);
// carry forward the remainder nanos so that the rounding error is eliminated
long consumedNanos = safeMulDivFloor(newTokens, currentRatePeriodNanos, currentRate);
long consumedNanos = rp.calculateDuration(newTokens);
long remainderNanos = durationNanos >= consumedNanos ? durationNanos - consumedNanos : 0;
if (remainderNanos > 0) {
REMAINDER_NANOS_UPDATER.addAndGet(this, remainderNanos);
Expand Down Expand Up @@ -277,41 +293,8 @@ public long calculateThrottlingDuration(long requiredTokens) {
} catch (ArithmeticException e) {
needTokens = Long.MAX_VALUE;
}
return safeMulDivFloor(needTokens, getRatePeriodNanos(), getRate());
}

private static long safeMulDivFloor(long multiplicand, long multiplier, long divisor) {
if (multiplicand < 0 || multiplier < 0) {
throw new IllegalArgumentException("multiplicand and multiplier must be >= 0");
}
if (divisor <= 0) {
throw new IllegalArgumentException("divisor must be > 0");
}
if (multiplicand == 0 || multiplier == 0) {
return 0;
}
// Fast path
// Check if multiplication fits in a 64-bit value
// Math.multiplyHigh is intrinsified by the JVM (single mulq/mul instruction),
// avoiding the cost of a division-based overflow check.
// It returns the upper 64 bits of the full 128-bit multiplication result.
// When the result is 0, the product fits in 64 bits.
if (Math.multiplyHigh(multiplicand, multiplier) == 0) {
long product = multiplicand * multiplier;
if (product >= 0) {
// product fits in signed 64-bit
return product / divisor;
}
// product is in [2^63, 2^64): fits unsigned but not signed
long result = Long.divideUnsigned(product, divisor);
// cap at Long.MAX_VALUE if result itself overflows signed long
return result >= 0 ? result : Long.MAX_VALUE;
}
// Fallback to BigInteger division
BigInteger result = BigInteger.valueOf(multiplicand)
.multiply(BigInteger.valueOf(multiplier))
.divide(BigInteger.valueOf(divisor));
return result.bitLength() < Long.SIZE ? result.longValue() : Long.MAX_VALUE;
RateParameters rp = resolveRateParameters();
return rp.calculateDuration(needTokens);
}

/**
Expand Down Expand Up @@ -342,4 +325,73 @@ public boolean containsTokens() {
return tokens() > 0;
}

/**
* Holds pre-computed rate parameters where {@code rate} and {@code ratePeriodNanos} have been
* divided by their highest common power of ten. This reduction keeps the operands smaller and
* avoids overflow in {@link #safeMulDivFloor(long, long, long)} without changing the result of
* any integer floor-division (dividing numerator and denominator by the same factor preserves
* the quotient). The instance is cached and reused as long as the rate and period are unchanged.
*/
static final class RateParameters {
final long rate;
final long ratePeriodNanos;
final long reducedRate;
final long reducedRatePeriod;

RateParameters(long rate, long ratePeriodNanos) {
this.rate = rate;
this.ratePeriodNanos = ratePeriodNanos;
long r = rate;
long p = ratePeriodNanos;
while (r % 10 == 0 && p % 10 == 0) {
r /= 10;
p /= 10;
}
this.reducedRate = r;
this.reducedRatePeriod = p;
}

public long calculateTokens(long durationNanos) {
return safeMulDivFloor(durationNanos, reducedRate, reducedRatePeriod);
}

public long calculateDuration(long tokens) {
return safeMulDivFloor(tokens, reducedRatePeriod, reducedRate);
}

private static long safeMulDivFloor(long multiplicand, long multiplier, long divisor) {
if (multiplicand < 0 || multiplier < 0) {
throw new IllegalArgumentException("multiplicand and multiplier must be >= 0");
}
if (divisor <= 0) {
throw new IllegalArgumentException("divisor must be > 0");
}
if (multiplicand == 0 || multiplier == 0) {
return 0;
}
// Fast path
// Check if multiplication fits in a 64-bit value
// Math.multiplyHigh is intrinsified by the JVM (single mulq/mul instruction),
// avoiding the cost of a division-based overflow check.
// It returns the upper 64 bits of the full 128-bit multiplication result.
// When the result is 0, the product fits in 64 bits.
if (Math.multiplyHigh(multiplicand, multiplier) == 0) {
long product = multiplicand * multiplier;
if (product >= 0) {
// product fits in signed 64-bit
return product / divisor;
}
// product is in [2^63, 2^64): fits unsigned but not signed
long result = Long.divideUnsigned(product, divisor);
// cap at Long.MAX_VALUE if result itself overflows signed long
return result >= 0 ? result : Long.MAX_VALUE;
}
// Fallback to BigInteger division
BigInteger result = BigInteger.valueOf(multiplicand)
.multiply(BigInteger.valueOf(multiplier))
.divide(BigInteger.valueOf(divisor));
return result.bitLength() < Long.SIZE ? result.longValue() : Long.MAX_VALUE;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ public Object[][] largeRates() {
{1_000_000_000L},
{1_500_000_000L},
{2_000_000_000L},
{100_000_000_000L},
{Long.MAX_VALUE / 1_000_000_000L * 1_000_000_000L},
{Long.MAX_VALUE / 100L},
{Long.MAX_VALUE / 10L},
{Long.MAX_VALUE / 9L},
Expand Down
Loading