|  | 
|  | 1 | +/* | 
|  | 2 | + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | 
|  | 3 | + * or more contributor license agreements. Licensed under the "Elastic License | 
|  | 4 | + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | 
|  | 5 | + * Public License v 1"; you may not use this file except in compliance with, at | 
|  | 6 | + * your election, the "Elastic License 2.0", the "GNU Affero General Public | 
|  | 7 | + * License v3.0 only", or the "Server Side Public License, v 1". | 
|  | 8 | + */ | 
|  | 9 | + | 
|  | 10 | +package org.elasticsearch.common.metrics; | 
|  | 11 | + | 
|  | 12 | +import static java.lang.Math.exp; | 
|  | 13 | +import static java.lang.Math.expm1; | 
|  | 14 | + | 
|  | 15 | +/** | 
|  | 16 | + * Implements a version of an exponentially weighted moving rate (EWMR). This is a calculation over a finite time series of increments to | 
|  | 17 | + * some sort of gauge or counter which gives a value for the rate at which the gauge is being incremented where the weight given to an | 
|  | 18 | + * increment decreases exponentially with how long ago it happened. | 
|  | 19 | + * | 
|  | 20 | + * <p>Definitions: The <i>rate</i> of increase of the gauge or counter over an interval is the sum of the increments which occurred during | 
|  | 21 | + * the interval, divided by the length of the interval. The <i>weighted</i> rate of increase is the sum of the increments with each | 
|  | 22 | + * multiplied by a weight which is a function of how long ago it happened, divided by the integral of the weight function over the interval. | 
|  | 23 | + * The <i>exponentially</i> weighted rate is the weighted rate when the weight function is given by {@code exp(-1.0 * lambda * time)} where | 
|  | 24 | + * {@code lambda} is a constant and {@code time} specifies how long ago the increment happened. A <i>moving</i> rate is simply a rate | 
|  | 25 | + * calculated for an every-growing series of increments (typically by updating the previous rate rather than recalculating from scratch). | 
|  | 26 | + * | 
|  | 27 | + * <p>The times involved in the API of this class can use the caller's choice of units and origin, e.g. they can be millis since the | 
|  | 28 | + * epoch as returned by {@link System#currentTimeMillis}, nanos since an arbitrary origin as returned by {@link System#nanoTime}, or | 
|  | 29 | + * anything else. The only requirement is that the same convention must be used consistently. | 
|  | 30 | + * | 
|  | 31 | + * <p>This class is thread-safe. | 
|  | 32 | + */ | 
|  | 33 | +public class ExponentiallyWeightedMovingRate { | 
|  | 34 | + | 
|  | 35 | +    // The maths behind this is explained in section 2 of this document: https://github.com/user-attachments/files/19166625/ewma.pdf | 
|  | 36 | + | 
|  | 37 | +    // This implementation uses synchronization to provide thread safety. The synchronized code is all non-blocking, and just performs a | 
|  | 38 | +    // fixed small number of floating point operations plus some memory reads and writes. If they take somewhere in the region of 10ns each, | 
|  | 39 | +    // we can do up to tens of millions of QPS before the lock risks becoming a bottleneck. | 
|  | 40 | + | 
|  | 41 | +    private final double lambda; | 
|  | 42 | +    private final long startTime; | 
|  | 43 | +    private double rate; | 
|  | 44 | +    long lastTime; | 
|  | 45 | + | 
|  | 46 | +    /** | 
|  | 47 | +     * Constructor. | 
|  | 48 | +     * | 
|  | 49 | +     * @param lambda A parameter which dictates how quickly the average "forgets" older increments. The weight given to an increment which | 
|  | 50 | +     *     happened time {@code timeAgo} ago will be proportional to {@code exp(-1.0 * lambda * timeAgo)}. The half-life is related to this | 
|  | 51 | +     *     parameter by the equation {@code exp(-1.0 * lambda * halfLife)} = 0.5}, so {@code lambda = log(2.0) / halfLife)}. This may be | 
|  | 52 | +     *     zero, but must not be negative. The units of this value are the inverse of the units being used for time. | 
|  | 53 | +     * @param startTime The time to consider the start time for the rate calculation. This must be greater than zero. The units and origin | 
|  | 54 | +     *     of this value must match all other calls to this instance. | 
|  | 55 | +     */ | 
|  | 56 | +    public ExponentiallyWeightedMovingRate(double lambda, long startTime) { | 
|  | 57 | +        if (lambda < 0.0) { | 
|  | 58 | +            throw new IllegalArgumentException("lambda must be non-negative but was " + lambda); | 
|  | 59 | +        } | 
|  | 60 | +        if (startTime <= 0.0) { | 
|  | 61 | +            throw new IllegalArgumentException("startTime must be non-negative but was " + startTime); | 
|  | 62 | +        } | 
|  | 63 | +        synchronized (this) { | 
|  | 64 | +            this.lambda = lambda; | 
|  | 65 | +            this.rate = Double.NaN; // should never be used | 
|  | 66 | +            this.startTime = startTime; | 
|  | 67 | +            this.lastTime = 0; // after an increment, this must be positive, so a zero value indicates we're waiting for the first | 
|  | 68 | +        } | 
|  | 69 | +    } | 
|  | 70 | + | 
|  | 71 | +    /** | 
|  | 72 | +     * Returns the EWMR at the given time. The units and origin of this time must match all other calls to this instance. The units | 
|  | 73 | +     * of the returned rate are the ratio of the increment units to the time units as used for {@link #addIncrement}. | 
|  | 74 | +     * | 
|  | 75 | +     * <p>If there have been no increments yet, this returns zero. | 
|  | 76 | +     * | 
|  | 77 | +     * <p>Otherwise, we require the time to be no earlier than the time of the previous increment, i.e. the value of {@code time} | 
|  | 78 | +     * for this call must not be less than the value of {@code time} for the last call to {@link #addIncrement}. If this is not the | 
|  | 79 | +     * case, the method behaves as if it had that minimum value. | 
|  | 80 | +     */ | 
|  | 81 | +    public double getRate(long time) { | 
|  | 82 | +        synchronized (this) { | 
|  | 83 | +            if (lastTime == 0) { // indicates that no increment has happened yet | 
|  | 84 | +                return 0.0; | 
|  | 85 | +            } else if (time <= lastTime) { | 
|  | 86 | +                return rate; | 
|  | 87 | +            } else { | 
|  | 88 | +                // This is the formula for R(t) given in subsection 2.6 of the document referenced above: | 
|  | 89 | +                return expHelper(lastTime - startTime) * exp(-1.0 * lambda * (time - lastTime)) * rate / expHelper(time - startTime); | 
|  | 90 | +            } | 
|  | 91 | +        } | 
|  | 92 | +    } | 
|  | 93 | + | 
|  | 94 | +    /** | 
|  | 95 | +     * Given the EWMR {@code currentRate} at time {@code currentTime} and the EWMR {@code oldRate} at time {@code oldTime}, returns the EWMR | 
|  | 96 | +     * that would be calculated at {@code currentTime} if the start time was {@code oldTime} rather than the {@code startTime} passed to the | 
|  | 97 | +     * parameter. This rate incorporates all the increments that contributed to {@code currentRate} but not to {@code oldRate}. The | 
|  | 98 | +     * increments that contributed to {@code oldRate} are effectively 'forgotten'. The units and origin of the times and rates must match | 
|  | 99 | +     * all other calls to this instance. | 
|  | 100 | +     * | 
|  | 101 | +     * <p>Normally, {@code currentTime} should be after {@code oldTime}. If it is not, this method returns zero. | 
|  | 102 | +     * | 
|  | 103 | +     * <p>Note that this method does <i>not</i> depend on any of the increments made to this {@link ExponentiallyWeightedMovingRate} | 
|  | 104 | +     * instance. It is only non-static because it uses this instance's {@code lambda} and {@code startTime}. | 
|  | 105 | +     */ | 
|  | 106 | +    public double calculateRateSince(long currentTime, double currentRate, long oldTime, double oldRate) { | 
|  | 107 | +        if (currentTime <= oldTime) { | 
|  | 108 | +            return 0.0; | 
|  | 109 | +        } | 
|  | 110 | +        // This is the formula for R'(t, T) given in subsection 2.7 of the document referenced above: | 
|  | 111 | +        return (expHelper(currentTime - startTime) * currentRate - expHelper(oldTime - startTime) * exp( | 
|  | 112 | +            -1.0 * lambda * (currentTime - oldTime) | 
|  | 113 | +        ) * oldRate) / expHelper(currentTime - oldTime); | 
|  | 114 | +    } | 
|  | 115 | + | 
|  | 116 | +    /** | 
|  | 117 | +     * Updates the rate to reflect that the gauge has been incremented by an amount {@code increment} at a time {@code time}. The unit and | 
|  | 118 | +     * offset of the time must match all other calls to this instance. The units of the increment are arbitrary but must also be consistent. | 
|  | 119 | +     * | 
|  | 120 | +     * <p>If this is the first increment, we require it to occur after the start time for the rate calculation, i.e. the value of | 
|  | 121 | +     * {@code time} must be greater than {@code startTime} passed to the constructor. If this is not the case, the method behaves as if | 
|  | 122 | +     * {@code time} is {@code startTime + 1} to prevent a division by zero error. | 
|  | 123 | +     * | 
|  | 124 | +     * <p>If this is not the first increment, we require it not to occur before the previous increment, i.e. the value of {@code time} for | 
|  | 125 | +     * this call must be greater than or equal to the value for the previous call. If this is not the case, the method behaves as if this | 
|  | 126 | +     * call's {@code time} is the same as the previous call's. | 
|  | 127 | +     */ | 
|  | 128 | +    public void addIncrement(double increment, long time) { | 
|  | 129 | +        synchronized (this) { | 
|  | 130 | +            if (lastTime == 0) { // indicates that this is the first increment | 
|  | 131 | +                if (time <= startTime) { | 
|  | 132 | +                    time = startTime + 1; | 
|  | 133 | +                } | 
|  | 134 | +                // This is the formula for R(t_1) given in subsection 2.6 of the document referenced above: | 
|  | 135 | +                rate = increment / expHelper(time - startTime); | 
|  | 136 | +            } else { | 
|  | 137 | +                if (time < lastTime) { | 
|  | 138 | +                    time = lastTime; | 
|  | 139 | +                } | 
|  | 140 | +                // This is the formula for R(t_j+1) given in subsection 2.6 of the document referenced above: | 
|  | 141 | +                rate += (increment - expHelper(time - lastTime) * rate) / expHelper(time - startTime); | 
|  | 142 | +            } | 
|  | 143 | +            lastTime = time; | 
|  | 144 | +        } | 
|  | 145 | +    } | 
|  | 146 | + | 
|  | 147 | +    /** | 
|  | 148 | +     * Returns something mathematically equivalent to {@code (1.0 - exp(-1.0 * lambda * time)) / lambda}, using an implementation which | 
|  | 149 | +     * should not be subject to numerical instability when {@code lambda * time} is small. Returns {@code time} when {@code lambda = 0}, | 
|  | 150 | +     * which is the correct limit. | 
|  | 151 | +     */ | 
|  | 152 | +    private double expHelper(double time) { | 
|  | 153 | +        // This is the function E(lambda, t) defined in subsection 2.6 of the document referenced above, and the calculation follows the | 
|  | 154 | +        // principles discussed there: | 
|  | 155 | +        assert time >= 0.0; | 
|  | 156 | +        double lambdaTime = lambda * time; | 
|  | 157 | +        if (lambdaTime >= 1.0e-2) { | 
|  | 158 | +            // The direct calculation should be fine here: | 
|  | 159 | +            return (1.0 - exp(-1.0 * lambdaTime)) / lambda; | 
|  | 160 | +        } else if (lambdaTime >= 1.0e-10) { | 
|  | 161 | +            // Avoid taking the small difference of two similar quantities by using expm1 here: | 
|  | 162 | +            return -1.0 * expm1(-1.0 * lambdaTime) / lambda; | 
|  | 163 | +        } else { | 
|  | 164 | +            // Approximate exp(-1.0 * lambdaTime) = 1.0 - lambdaTime + 0.5 * lambdaTime * lambdaTime here (also works for lambda = 0): | 
|  | 165 | +            return time * (1.0 - 0.5 * lambdaTime); | 
|  | 166 | +        } | 
|  | 167 | +    } | 
|  | 168 | +} | 
0 commit comments