|
| 1 | +/* |
| 2 | + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one |
| 3 | + * or more contributor license agreements. Licensed under the "Elastic License |
| 4 | + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side |
| 5 | + * Public License v 1"; you may not use this file except in compliance with, at |
| 6 | + * your election, the "Elastic License 2.0", the "GNU Affero General Public |
| 7 | + * License v3.0 only", or the "Server Side Public License, v 1". |
| 8 | + */ |
| 9 | + |
| 10 | +package org.elasticsearch.common.metrics; |
| 11 | + |
| 12 | +import static java.lang.Math.exp; |
| 13 | +import static java.lang.Math.expm1; |
| 14 | + |
/**
 * Implements a version of an exponentially weighted moving rate (EWMR). This is a calculation over a finite time series of increments to
 * some sort of gauge or counter which gives a value for the rate at which the gauge is being incremented where the weight given to an
 * increment decreases exponentially with how long ago it happened.
 *
 * <p>Definitions: The <i>rate</i> of increase of the gauge or counter over an interval is the sum of the increments which occurred during
 * the interval, divided by the length of the interval. The <i>weighted</i> rate of increase is the sum of the increments with each
 * multiplied by a weight which is a function of how long ago it happened, divided by the integral of the weight function over the interval.
 * The <i>exponentially</i> weighted rate is the weighted rate when the weight function is given by {@code exp(-1.0 * lambda * time)} where
 * {@code lambda} is a constant and {@code time} specifies how long ago the increment happened. A <i>moving</i> rate is simply a rate
 * calculated for an ever-growing series of increments (typically by updating the previous rate rather than recalculating from scratch).
 *
 * <p>The times involved in the API of this class can use the caller's choice of units and origin, e.g. they can be millis since the
 * epoch as returned by {@link System#currentTimeMillis}, nanos since an arbitrary origin as returned by {@link System#nanoTime}, or
 * anything else. The only requirement is that the same convention must be used consistently.
 *
 * <p>This class is thread-safe: all access to the mutable state happens while holding this instance's monitor.
 */
public class ExponentiallyWeightedMovingRate {

    // The maths behind this is explained in section 2 of this document: https://github.com/user-attachments/files/19166625/ewma.pdf

    // This implementation uses synchronization to provide thread safety. The synchronized code is all non-blocking, and just performs a
    // fixed small number of floating point operations plus some memory reads and writes. If they take somewhere in the region of 10ns each,
    // we can do up to tens of millions of QPS before the lock risks becoming a bottleneck.

    /** Decay constant of the weight function; non-negative, in units of inverse time. */
    private final double lambda;
    /** Start of the interval over which the rate is calculated; strictly positive. */
    private final long startTime;
    /** The EWMR as of {@link #lastTime}; only meaningful once {@code lastTime} is non-zero. */
    private double rate;
    /** Time of the most recent increment, or zero if there has been no increment yet. Deliberately left package-private. */
    long lastTime;

    /**
     * Constructor.
     *
     * @param lambda A parameter which dictates how quickly the average "forgets" older increments. The weight given to an increment which
     *     happened time {@code timeAgo} ago will be proportional to {@code exp(-1.0 * lambda * timeAgo)}. The half-life is related to this
     *     parameter by the equation {@code exp(-1.0 * lambda * halfLife) = 0.5}, so {@code lambda = log(2.0) / halfLife}. This may be
     *     zero, but must not be negative or NaN. The units of this value are the inverse of the units being used for time.
     * @param startTime The time to consider the start time for the rate calculation. This must be greater than zero. The units and origin
     *     of this value must match all other calls to this instance.
     * @throws IllegalArgumentException if {@code lambda} is negative or NaN, or if {@code startTime} is not greater than zero
     */
    public ExponentiallyWeightedMovingRate(double lambda, long startTime) {
        // Written as a negated >= so that NaN (for which every comparison is false) is rejected along with negative values:
        if ((lambda >= 0.0) == false) {
            throw new IllegalArgumentException("lambda must be non-negative but was " + lambda);
        }
        // Zero is rejected too, because a zero lastTime is reserved as the "no increment yet" marker and lastTime always exceeds startTime:
        if (startTime <= 0) {
            throw new IllegalArgumentException("startTime must be positive but was " + startTime);
        }
        synchronized (this) {
            this.lambda = lambda;
            this.rate = Double.NaN; // should never be used
            this.startTime = startTime;
            this.lastTime = 0; // after an increment, this must be positive, so a zero value indicates we're waiting for the first
        }
    }

    /**
     * Returns the EWMR at the given time. The units and origin of this time must match all other calls to this instance. The units
     * of the returned rate are the ratio of the increment units to the time units as used for {@link #addIncrement}.
     *
     * <p>If there have been no increments yet, this returns zero.
     *
     * <p>Otherwise, we require the time to be no earlier than the time of the previous increment, i.e. the value of {@code time}
     * for this call must not be less than the value of {@code time} for the last call to {@link #addIncrement}. If this is not the
     * case, the method behaves as if it had that minimum value.
     *
     * @param time The time at which to evaluate the rate.
     * @return The EWMR at {@code time}, or zero if there has been no increment yet.
     */
    public double getRate(long time) {
        synchronized (this) {
            if (lastTime == 0) { // indicates that no increment has happened yet
                return 0.0;
            } else if (time <= lastTime) {
                // At (or, by the clamping rule, before) the last increment the stored rate is already the answer:
                return rate;
            } else {
                // This is the formula for R(t) given in subsection 2.6 of the document referenced above:
                return expHelper(lastTime - startTime) * exp(-1.0 * lambda * (time - lastTime)) * rate / expHelper(time - startTime);
            }
        }
    }

    /**
     * Given the EWMR {@code currentRate} at time {@code currentTime} and the EWMR {@code oldRate} at time {@code oldTime}, returns the EWMR
     * that would be calculated at {@code currentTime} if the start time was {@code oldTime} rather than the {@code startTime} passed to the
     * constructor. This rate incorporates all the increments that contributed to {@code currentRate} but not to {@code oldRate}. The
     * increments that contributed to {@code oldRate} are effectively 'forgotten'. The units and origin of the times and rates must match
     * all other calls to this instance.
     *
     * <p>Normally, {@code currentTime} should be after {@code oldTime}. If it is not, this method returns zero.
     *
     * <p>Note that this method does <i>not</i> depend on any of the increments made to this {@link ExponentiallyWeightedMovingRate}
     * instance. It is only non-static because it uses this instance's {@code lambda} and {@code startTime}.
     *
     * @param currentTime The later of the two times.
     * @param currentRate The EWMR observed at {@code currentTime}.
     * @param oldTime The earlier of the two times.
     * @param oldRate The EWMR observed at {@code oldTime}.
     * @return The EWMR at {@code currentTime} restricted to increments after {@code oldTime}, or zero if {@code currentTime} is not
     *     after {@code oldTime}.
     */
    public double calculateRateSince(long currentTime, double currentRate, long oldTime, double oldRate) {
        if (currentTime <= oldTime) {
            return 0.0;
        }
        // This is the formula for R'(t, T) given in subsection 2.7 of the document referenced above:
        return (expHelper(currentTime - startTime) * currentRate - expHelper(oldTime - startTime) * exp(
            -1.0 * lambda * (currentTime - oldTime)
        ) * oldRate) / expHelper(currentTime - oldTime);
    }

    /**
     * Updates the rate to reflect that the gauge has been incremented by an amount {@code increment} at a time {@code time}. The unit and
     * offset of the time must match all other calls to this instance. The units of the increment are arbitrary but must also be consistent.
     *
     * <p>If this is the first increment, we require it to occur after the start time for the rate calculation, i.e. the value of
     * {@code time} must be greater than {@code startTime} passed to the constructor. If this is not the case, the method behaves as if
     * {@code time} is {@code startTime + 1} to avoid dividing by zero.
     *
     * <p>If this is not the first increment, we require it not to occur before the previous increment, i.e. the value of {@code time} for
     * this call must be greater than or equal to the value for the previous call. If this is not the case, the method behaves as if this
     * call's {@code time} is the same as the previous call's.
     *
     * @param increment The amount by which the gauge was incremented.
     * @param time The time at which the increment happened.
     */
    public void addIncrement(double increment, long time) {
        synchronized (this) {
            if (lastTime == 0) { // indicates that this is the first increment
                if (time <= startTime) {
                    time = startTime + 1;
                }
                // This is the formula for R(t_1) given in subsection 2.6 of the document referenced above:
                rate = increment / expHelper(time - startTime);
            } else {
                if (time < lastTime) {
                    time = lastTime;
                }
                // This is the formula for R(t_j+1) given in subsection 2.6 of the document referenced above:
                rate += (increment - expHelper(time - lastTime) * rate) / expHelper(time - startTime);
            }
            lastTime = time;
        }
    }

    /**
     * Returns something mathematically equivalent to {@code (1.0 - exp(-1.0 * lambda * time)) / lambda}, using an implementation which
     * should not be subject to numerical instability when {@code lambda * time} is small. Returns {@code time} when {@code lambda = 0},
     * which is the correct limit.
     *
     * @param time A non-negative duration, in the same time units as the rest of this instance.
     */
    private double expHelper(double time) {
        // This is the function E(lambda, t) defined in subsection 2.6 of the document referenced above, and the calculation follows the
        // principles discussed there:
        assert time >= 0.0;
        double lambdaTime = lambda * time;
        if (lambdaTime >= 1.0e-2) {
            // The direct calculation should be fine here:
            return (1.0 - exp(-1.0 * lambdaTime)) / lambda;
        } else if (lambdaTime >= 1.0e-10) {
            // Avoid taking the small difference of two similar quantities by using expm1 here:
            return -1.0 * expm1(-1.0 * lambdaTime) / lambda;
        } else {
            // Approximate exp(-1.0 * lambdaTime) = 1.0 - lambdaTime + 0.5 * lambdaTime * lambdaTime here (also works for lambda = 0):
            return time * (1.0 - 0.5 * lambdaTime);
        }
    }
}
0 commit comments