Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common.metrics;

import static java.lang.Math.exp;
import static java.lang.Math.expm1;

/**
* Implements a version of an exponentially weighted moving rate (EWMR). This is a calculation over a finite time series of increments to
* some sort of gauge or counter which gives a value for the rate at which the gauge is being incremented where the weight given to an
* increment decreases exponentially with how long ago it happened.
*
* <p>Definitions: The <i>rate</i> of increase of the gauge or counter over an interval is the sum of the increments which occurred during
* the interval, divided by the length of the interval. The <i>weighted</i> rate of increase is the sum of the increments with each
* multiplied by a weight which is a function of how long ago it happened, divided by the integral of the weight function over the interval.
* The <i>exponentially</i> weighted rate is the weighted rate when the weight function is given by {@code exp(-1.0 * lambda * time)} where
* {@code lambda} is a constant and {@code time} specifies how long ago the increment happened. A <i>moving</i> rate is simply a rate
* calculated for an every-growing series of increments (typically by updating the previous rate rather than recalculating from scratch).
*
* <p>This class is thread-safe.
*/
public class ExponentiallyWeightedMovingRate {

// The maths behind this is explained in section 2 of this document: https://github.com/user-attachments/files/19166625/ewma.pdf

// This implementation uses synchronization to provide thread safety. The synchronized code is all non-blocking, and just performs a
// fixed small number of floating point operations plus some memory reads and writes. If they take somewhere in the region of 10ns each,
// we can do up to tens of millions of QPS before the lock risks becoming a bottleneck.

private final double lambda;
private final long startTimeInMillis;
private double rate;
long lastTimeInMillis;

/**
* Constructor.
*
* @param lambda A parameter which dictates how quickly the average "forgets" older increments. The weight given to an increment which
* happened a time {@code timeAgo} milliseconds ago will be proportional to {@code exp(-1.0 * lambda * timeAgo)}. The half-life
* (measured in milliseconds) is related to this parameter by the equation {@code exp(-1.0 * lambda * halfLife)} = 0.5}, so
* {@code lambda = log(2.0) / halfLife)}. This may be zero, but must not be negative.
* @param startTimeInMillis The time, in milliseconds since the epoch, to consider the start time for the rate calculation. This must be
* greater than zero.
*/
public ExponentiallyWeightedMovingRate(double lambda, long startTimeInMillis) {
if (lambda < 0.0) {
throw new IllegalArgumentException("lambda must be non-negative but was " + lambda);
}
if (startTimeInMillis <= 0.0) {
throw new IllegalArgumentException("lambda must be non-negative but was " + startTimeInMillis);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startTime must be ....

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, well spotted.

}
synchronized (this) {
this.lambda = lambda;
this.rate = Double.NaN; // should never be used
this.startTimeInMillis = startTimeInMillis;
this.lastTimeInMillis = 0; // after an increment, this must be positive, so a zero value indicates we're waiting for the first
}
}

/**
* Returns the EWMR at the given time, in millis since the epoch.
*
* <p>If there have been no increments yet, this returns zero.
*
* <p>Otherwise, we require the time to be no earlier than the time of the previous increment, i.e. the value of {@code timeInMillis}
* for this call must not be less than the value of {@code timeInMillis} for the last call to {@link #addIncrement}. If this is not the
* case, the method behaves as if it had that minimum value.
*/
public double getRate(long timeInMillis) {
synchronized (this) {
if (lastTimeInMillis == 0) { // indicates that no increment has happened yet
return 0.0;
} else if (timeInMillis <= lastTimeInMillis) {
return rate;
} else {
// This is the formula for R(t) given in subsection 2.6 of the document referenced above:
return expHelper(lastTimeInMillis - startTimeInMillis) * exp(-1.0 * lambda * (timeInMillis - lastTimeInMillis)) * rate
/ expHelper(timeInMillis - startTimeInMillis);
}
}
}

/**
* Given the EWMR {@code currentRate} at time {@code currentTimeMillis} and the EWMR {@code oldRate} at time {@code oldTimeMillis},
* returns the EWMR that would be calculated at {@code currentTimeMillis} if the start time was {@code oldTimeMillis} rather than the
* {@code startTimeMillis} passed to the parameter. This rate incorporates all the increments that contributed to {@code currentRate}
* but not to {@code oldRate}. The increments that contributed to {@code oldRate} are effectively 'forgotten'. All times are in millis
* since the epoch.
*
* <p>Normally, {@code currentTimeMillis} should be after {@code oldTimeMillis}. If it is not, this method returns zero.
*
* <p>Note that this method does <i>not</i> depend on any of the increments made to this {@link ExponentiallyWeightedMovingRate}
* instance, or on its {@code startTimeMillis}. It is only non-static because it uses this instance's {@code lambda}.
*/
public double calculateRateSince(long currentTimeMillis, double currentRate, long oldTimeMillis, double oldRate) {
if (currentTimeMillis <= oldTimeMillis) {
return 0.0;
}
// This is the formula for R'(t, T) given in subsection 2.7 of the document referenced above:
return (expHelper(currentTimeMillis - startTimeInMillis) * currentRate - expHelper(oldTimeMillis - startTimeInMillis) * exp(
-1.0 * lambda * (currentTimeMillis - oldTimeMillis)
) * oldRate) / expHelper(currentTimeMillis - oldTimeMillis);
}

/**
* Updates the rate to reflect that the gauge has been incremented by an amount {@code increment} at a time {@code timeInMillis} in
* milliseconds since the epoch.
*
* <p>If this is the first increment, we require it to occur after the start time for the rate calculation, i.e. the value of
* {@code timeInMillis} must be greater than {@code startTimeInMillis} passed to the constructor. If this is not the case, the method
* behaves as if {@code timeInMillis} is {@code startTimeInMillis + 1} to prevent a division by zero error.
*
* <p>If this is not the first increment, we require it not to occur before the previous increment, i.e. the value of
* {@code timeInMillis} for this call must be greater than or equal to the value for the previous call. If this is not the case, the
* method behaves as if this call's {@code timeInMillis} is the same as the previous call's.
*/
public void addIncrement(double increment, long timeInMillis) {
synchronized (this) {
if (lastTimeInMillis == 0) { // indicates that this is the first increment
if (timeInMillis <= startTimeInMillis) {
timeInMillis = startTimeInMillis + 1;
}
// This is the formula for R(t_1) given in subsection 2.6 of the document referenced above:
rate = increment / expHelper(timeInMillis - startTimeInMillis);
} else {
if (timeInMillis < lastTimeInMillis) {
timeInMillis = lastTimeInMillis;
}
// This is the formula for R(t_j+1) given in subsection 2.6 of the document referenced above:
rate += (increment - expHelper(timeInMillis - lastTimeInMillis) * rate) / expHelper(timeInMillis - startTimeInMillis);
}
lastTimeInMillis = timeInMillis;
}
}

/**
* Returns something mathematically equivalent to {@code (1.0 - exp(-1.0 * lambda * time)) / lambda}, using an implementation which
* should not be subject to numerical instability when {@code lambda * time} is small. Returns {@code time} when {@code lambda = 0},
* which is the correct limit.
*/
private double expHelper(double time) {
// This is the function E(lambda, t) defined in subsection 2.6 of the document referenced above, and the calculation follows the
// principles discussed there:
assert time >= 0.0;
double lambdaTime = lambda * time;
if (lambdaTime >= 1.0e-2) {
// The direct calculation should be fine here:
return (1.0 - exp(-1.0 * lambdaTime)) / lambda;
} else if (lambdaTime >= 1.0e-10) {
// Avoid taking the small difference of two similar quantities by using expm1 here:
return -1.0 * expm1(-1.0 * lambdaTime) / lambda;
} else {
// Approximate exp(-1.0 * lambdaTime) = 1.0 - lambdaTime + 0.5 * lambdaTime * lambdaTime here (also works for lambda = 0):
return time * (1.0 - 0.5 * lambdaTime);
}
}
}
Loading