Skip to content

Commit 3df375a

Browse files
Implement an exponentially weighted moving rate
This is intended to be used to efficiently calculate a write load metric for use by the auto-sharding algorithm which favours more recent loads.
1 parent a2b0d96 commit 3df375a

File tree

2 files changed

+511
-0
lines changed

2 files changed

+511
-0
lines changed
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.common.metrics;
11+
12+
import static java.lang.Math.exp;
13+
import static java.lang.Math.expm1;
14+
15+
/**
 * Implements a version of an exponentially weighted moving rate (EWMR). This is a calculation over a finite time series of increments to
 * some sort of gauge or counter which gives a value for the rate at which the gauge is being incremented where the weight given to an
 * increment decreases exponentially with how long ago it happened.
 *
 * <p>Definitions: The <i>rate</i> of increase of the gauge or counter over an interval is the sum of the increments which occurred during
 * the interval, divided by the length of the interval. The <i>weighted</i> rate of increase is the sum of the increments with each
 * multiplied by a weight which is a function of how long ago it happened, divided by the integral of the weight function over the interval.
 * The <i>exponentially</i> weighted rate is the weighted rate when the weight function is given by {@code exp(-1.0 * lambda * time)} where
 * {@code lambda} is a constant and {@code time} specifies how long ago the increment happened. A <i>moving</i> rate is simply a rate
 * calculated for an ever-growing series of increments (typically by updating the previous rate rather than recalculating from scratch).
 *
 * <p>This class is thread-safe.
 */
public class ExponentiallyWeightedMovingRate {

    // This implementation uses synchronization to provide thread safety. The synchronized code is all non-blocking, and just performs a
    // fixed small number of floating point operations plus some memory reads and writes. If they take somewhere in the region of 10ns each,
    // we can do up to tens of millions of QPS before the lock risks becoming a bottleneck.

    /** Decay constant of the weight function {@code exp(-1.0 * lambda * timeAgo)}; never negative or NaN. */
    private final double lambda;
    /** Start of the interval over which the rate is calculated, in milliseconds since the epoch; always positive. */
    private final long startTimeInMillis;
    /** The EWMR as of {@link #lastTimeInMillis}. Only meaningful once {@code lastTimeInMillis} is non-zero. Guarded by {@code this}. */
    private double rate;
    /** Time of the most recent increment, or zero if there has been no increment yet. Guarded by {@code this}. */
    long lastTimeInMillis; // NOTE(review): package-private in the original; presumably read by tests in this package - confirm

    /**
     * Constructor.
     *
     * @param lambda A parameter which dictates how quickly the average "forgets" older increments. The weight given to an increment which
     *     happened a time {@code timeAgo} milliseconds ago will be proportional to {@code exp(-1.0 * lambda * timeAgo)}. The half-life
     *     (measured in milliseconds) is related to this parameter by the equation {@code exp(-1.0 * lambda * halfLife) = 0.5}, so
     *     {@code lambda = log(2.0) / halfLife}. This may be zero, but must not be negative or NaN.
     * @param startTimeInMillis The time, in milliseconds since the epoch, to consider the start time for the rate calculation. This must be
     *     greater than zero.
     * @throws IllegalArgumentException if {@code lambda} is negative or NaN, or if {@code startTimeInMillis} is not positive
     */
    public ExponentiallyWeightedMovingRate(double lambda, long startTimeInMillis) {
        // A NaN lambda would slip past a plain "< 0.0" comparison and silently turn every computed rate into NaN.
        if (lambda < 0.0 || Double.isNaN(lambda)) {
            throw new IllegalArgumentException("lambda must be non-negative but was " + lambda);
        }
        if (startTimeInMillis <= 0) {
            throw new IllegalArgumentException("startTimeInMillis must be positive but was " + startTimeInMillis);
        }
        // NOTE(review): presumably the lock is taken here so that the non-final fields are initialized under the same monitor
        // that guards every later read and write - confirm.
        synchronized (this) {
            this.lambda = lambda;
            this.rate = Double.NaN; // should never be used
            this.startTimeInMillis = startTimeInMillis;
            this.lastTimeInMillis = 0; // after an increment, this must be positive, so a zero value indicates we're waiting for the first
        }
    }

    /**
     * Returns the EWMR at the given time, in millis since the epoch.
     *
     * <p>If there have been no increments yet, this returns zero.
     *
     * <p>Otherwise, we require the time to be no earlier than the time of the previous increment, i.e. the value of {@code timeInMillis}
     * for this call must not be less than the value of {@code timeInMillis} for the last call to {@link #addIncrement}. If this is not the
     * case, the method behaves as if it had that minimum value.
     *
     * @param timeInMillis the time at which to evaluate the rate, in milliseconds since the epoch
     * @return the EWMR at that time
     */
    public double getRate(long timeInMillis) {
        synchronized (this) {
            if (lastTimeInMillis == 0) { // indicates that no increment has happened yet
                return 0.0;
            } else if (timeInMillis <= lastTimeInMillis) {
                return rate;
            } else {
                // No increments happened between lastTimeInMillis and timeInMillis, so the weighted sum of increments just decays by
                // exp(-lambda * elapsed), while the normalizing integral grows to cover the longer interval since the start time.
                return expHelper(lastTimeInMillis - startTimeInMillis) * exp(-1.0 * lambda * (timeInMillis - lastTimeInMillis)) * rate
                    / expHelper(timeInMillis - startTimeInMillis);
            }
        }
    }

    /**
     * Given the EWMR {@code currentRate} at time {@code currentTimeMillis} and the EWMR {@code oldRate} at time {@code oldTimeMillis},
     * returns the EWMR that would be calculated at {@code currentTimeMillis} if the start time was {@code oldTimeMillis} rather than the
     * {@code startTimeInMillis} passed to the constructor. This rate incorporates all the increments that contributed to
     * {@code currentRate} but not to {@code oldRate}. The increments that contributed to {@code oldRate} are effectively 'forgotten'. All
     * times are in millis since the epoch.
     *
     * <p>If {@code oldTimeMillis} is earlier than this instance's {@code startTimeInMillis}, it is treated as being equal to
     * {@code startTimeInMillis} (in which case {@code oldRate} has no effect and the result is {@code currentRate}).
     *
     * <p>Normally, {@code currentTimeMillis} should be after {@code oldTimeMillis}. If it is not, this method returns zero.
     *
     * <p>Note that this method does <i>not</i> depend on any of the increments made to this {@link ExponentiallyWeightedMovingRate}
     * instance. It is only non-static because it uses this instance's {@code lambda} and {@code startTimeInMillis}.
     *
     * @param currentTimeMillis the later time, in milliseconds since the epoch
     * @param currentRate the EWMR at {@code currentTimeMillis}
     * @param oldTimeMillis the earlier time, in milliseconds since the epoch
     * @param oldRate the EWMR at {@code oldTimeMillis}
     * @return the EWMR at {@code currentTimeMillis} considering only increments after {@code oldTimeMillis}
     */
    public double calculateRateSince(long currentTimeMillis, double currentRate, long oldTimeMillis, double oldRate) {
        // Only final fields are read here, so no synchronization is needed.
        if (oldTimeMillis < startTimeInMillis) {
            // There is no rate history before the start time; clamping also keeps the argument to expHelper non-negative.
            oldTimeMillis = startTimeInMillis;
        }
        if (currentTimeMillis <= oldTimeMillis) {
            return 0.0;
        }
        // Recover the two weighted sums from the rates, decay the old one to the current time, subtract, and re-normalize over
        // the shorter interval [oldTimeMillis, currentTimeMillis].
        return (expHelper(currentTimeMillis - startTimeInMillis) * currentRate - expHelper(oldTimeMillis - startTimeInMillis) * exp(
            -1.0 * lambda * (currentTimeMillis - oldTimeMillis)
        ) * oldRate) / expHelper(currentTimeMillis - oldTimeMillis);
    }

    /**
     * Updates the rate to reflect that the gauge has been incremented by an amount {@code increment} at a time {@code timeInMillis} in
     * milliseconds since the epoch.
     *
     * <p>If this is the first increment, we require it to occur after the start time for the rate calculation, i.e. the value of
     * {@code timeInMillis} must be greater than {@code startTimeInMillis} passed to the constructor. If this is not the case, the method
     * behaves as if {@code timeInMillis} is {@code startTimeInMillis + 1} to prevent a division by zero error.
     *
     * <p>If this is not the first increment, we require it not to occur before the previous increment, i.e. the value of
     * {@code timeInMillis} for this call must be greater than or equal to the value for the previous call. If this is not the case, the
     * method behaves as if this call's {@code timeInMillis} is the same as the previous call's.
     *
     * @param increment the amount by which the gauge was incremented
     * @param timeInMillis the time of the increment, in milliseconds since the epoch
     */
    public void addIncrement(double increment, long timeInMillis) {
        synchronized (this) {
            if (lastTimeInMillis == 0) { // indicates that this is the first increment
                if (timeInMillis <= startTimeInMillis) {
                    timeInMillis = startTimeInMillis + 1;
                }
                rate = increment / expHelper(timeInMillis - startTimeInMillis);
            } else {
                if (timeInMillis < lastTimeInMillis) {
                    timeInMillis = lastTimeInMillis;
                }
                // Incremental form of "decay the old weighted sum, add the new increment, divide by the new normalizing integral".
                rate += (increment - expHelper(timeInMillis - lastTimeInMillis) * rate) / expHelper(timeInMillis - startTimeInMillis);
            }
            lastTimeInMillis = timeInMillis;
        }
    }

    /**
     * Returns something mathematically equivalent to {@code (1.0 - exp(-1.0 * lambda * time)) / lambda}, using an implementation which
     * should not be subject to numerical instability when {@code lambda * time} is small. Returns {@code time} when {@code lambda = 0},
     * which is the correct limit.
     *
     * @param time a non-negative duration in milliseconds
     */
    private double expHelper(double time) {
        assert time >= 0.0;
        double lambdaTime = lambda * time;
        if (lambdaTime >= 1.0e-2) {
            // The direct calculation should be fine here:
            return (1.0 - exp(-1.0 * lambdaTime)) / lambda;
        } else if (lambdaTime >= 1.0e-10) {
            // Avoid taking the small difference of two similar quantities by using expm1 here:
            return -1.0 * expm1(-1.0 * lambdaTime) / lambda;
        } else {
            // Approximate exp(-1.0 * lambdaTime) = 1.0 - lambdaTime + 0.5 * lambdaTime * lambdaTime here (also works for lambda = 0):
            return time * (1.0 - 0.5 * lambdaTime);
        }
    }
}

0 commit comments

Comments
 (0)