Skip to content

Commit 054308b

Browse files
authored
[8.x] Backport 119578 + 114289 (#119638) (#119645)
* Refactor change point detection (#114289) * Move change detection code to separate class * Uniformize ChangeDetector and SkipeAndDipDetector * Separate ChangeDetectorTests and ChangePointAggregatorTests. * Public entrypoint for change point detection * Move p-value computation to ChangeDetector * Move main entrypoint to a separate file * Fix change point detection for uncertain non-stationary distributions. (#119578) * Fix change point detection for uncertain non-stationary distributions. * Replace -1 by ChangeType.NO_CHANGE_POINT
1 parent c8b5b9a commit 054308b

File tree

8 files changed

+898
-795
lines changed

8 files changed

+898
-795
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java

Lines changed: 562 additions & 0 deletions
Large diffs are not rendered by default.

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java

Lines changed: 2 additions & 588 deletions
Large diffs are not rendered by default.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ml.aggs.changepoint;
9+
10+
import org.apache.commons.math3.exception.NotStrictlyPositiveException;
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;
14+
15+
/**
16+
* Detects whether a series of values has a change point, by running both
17+
* ChangeDetector and SpikeAndDipDetector on it. This is the main entrypoint
18+
* of change point detection.
19+
*/
20+
public class ChangePointDetector {
21+
22+
private static final Logger logger = LogManager.getLogger(ChangePointDetector.class);
23+
24+
static final double P_VALUE_THRESHOLD = 0.01;
25+
static final int MINIMUM_BUCKETS = 10;
26+
27+
/**
28+
* Returns the ChangeType of a series of values.
29+
*/
30+
public static ChangeType getChangeType(MlAggsHelper.DoubleBucketValues bucketValues) {
31+
if (bucketValues.getValues().length < (2 * MINIMUM_BUCKETS) + 2) {
32+
return new ChangeType.Indeterminable(
33+
"not enough buckets to calculate change_point. Requires at least ["
34+
+ ((2 * MINIMUM_BUCKETS) + 2)
35+
+ "]; found ["
36+
+ bucketValues.getValues().length
37+
+ "]"
38+
);
39+
}
40+
41+
ChangeType spikeOrDip;
42+
try {
43+
SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues);
44+
spikeOrDip = detect.detect(P_VALUE_THRESHOLD);
45+
logger.trace("spike or dip p-value: [{}]", spikeOrDip.pValue());
46+
} catch (NotStrictlyPositiveException nspe) {
47+
logger.debug("failure testing for dips and spikes", nspe);
48+
spikeOrDip = new ChangeType.Indeterminable("failure testing for dips and spikes");
49+
}
50+
51+
ChangeType change = new ChangeDetector(bucketValues).detect(P_VALUE_THRESHOLD);
52+
logger.trace("change p-value: [{}]", change.pValue());
53+
54+
if (spikeOrDip.pValue() < change.pValue()) {
55+
change = spikeOrDip;
56+
}
57+
return change;
58+
}
59+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeType.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
*/
2222
public interface ChangeType extends NamedWriteable, NamedXContentObject {
2323

24+
int NO_CHANGE_POINT = -1;
25+
2426
default int changePoint() {
25-
return -1;
27+
return NO_CHANGE_POINT;
2628
}
2729

2830
default double pValue() {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ private double[] removeIf(ExcludedPredicate should, double[] values) {
9292
return newValues;
9393
}
9494

95+
private final MlAggsHelper.DoubleBucketValues bucketValues;
9596
private final int numValues;
9697
private final int dipIndex;
9798
private final int spikeIndex;
@@ -100,7 +101,9 @@ private double[] removeIf(ExcludedPredicate should, double[] values) {
100101
private final KDE spikeTestKDE;
101102
private final KDE dipTestKDE;
102103

103-
SpikeAndDipDetector(double[] values) {
104+
SpikeAndDipDetector(MlAggsHelper.DoubleBucketValues bucketValues) {
105+
this.bucketValues = bucketValues;
106+
double[] values = bucketValues.getValues();
104107

105108
numValues = values.length;
106109

@@ -135,7 +138,7 @@ private double[] removeIf(ExcludedPredicate should, double[] values) {
135138
spikeTestKDE = new KDE(spikeKDEValues, 1.36);
136139
}
137140

138-
ChangeType at(double pValueThreshold, MlAggsHelper.DoubleBucketValues bucketValues) {
141+
ChangeType detect(double pValueThreshold) {
139142
if (dipIndex == -1 || spikeIndex == -1) {
140143
return new ChangeType.Indeterminable(
141144
"not enough buckets to check for dip or spike. Requires at least [3]; found [" + numValues + "]"
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ml.aggs.changepoint;
9+
10+
import org.apache.commons.math3.distribution.GammaDistribution;
11+
import org.apache.commons.math3.distribution.NormalDistribution;
12+
import org.apache.commons.math3.random.RandomGeneratorFactory;
13+
import org.elasticsearch.common.Randomness;
14+
import org.elasticsearch.search.aggregations.AggregatorTestCase;
15+
import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;
16+
17+
import java.util.concurrent.atomic.AtomicInteger;
18+
import java.util.stream.DoubleStream;
19+
20+
import static org.hamcrest.Matchers.equalTo;
21+
import static org.hamcrest.Matchers.greaterThan;
22+
import static org.hamcrest.Matchers.instanceOf;
23+
import static org.hamcrest.Matchers.lessThan;
24+
25+
public class ChangeDetectorTests extends AggregatorTestCase {
26+
27+
public void testStationaryFalsePositiveRate() {
28+
NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2);
29+
int fp = 0;
30+
for (int i = 0; i < 100; i++) {
31+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
32+
null,
33+
DoubleStream.generate(() -> 10 + normal.sample()).limit(40).toArray()
34+
);
35+
ChangeType type = new ChangeDetector(bucketValues).detect(1e-4);
36+
fp += type instanceof ChangeType.Stationary ? 0 : 1;
37+
}
38+
assertThat(fp, lessThan(10));
39+
40+
fp = 0;
41+
GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2);
42+
for (int i = 0; i < 100; i++) {
43+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
44+
null,
45+
DoubleStream.generate(gamma::sample).limit(40).toArray()
46+
);
47+
ChangeType type = new ChangeDetector(bucketValues).detect(1e-4);
48+
fp += type instanceof ChangeType.Stationary ? 0 : 1;
49+
}
50+
assertThat(fp, lessThan(10));
51+
}
52+
53+
public void testSampledDistributionTestFalsePositiveRate() {
54+
NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0);
55+
int fp = 0;
56+
for (int i = 0; i < 100; i++) {
57+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
58+
null,
59+
DoubleStream.generate(() -> 10 + normal.sample()).limit(5000).toArray()
60+
);
61+
ChangeType type = new ChangeDetector(bucketValues).detect(1e-4);
62+
fp += type instanceof ChangeType.Stationary ? 0 : 1;
63+
}
64+
assertThat(fp, lessThan(10));
65+
}
66+
67+
public void testNonStationaryFalsePositiveRate() {
68+
NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2);
69+
int fp = 0;
70+
for (int i = 0; i < 100; i++) {
71+
AtomicInteger j = new AtomicInteger();
72+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
73+
null,
74+
DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(40).toArray()
75+
);
76+
ChangeType type = new ChangeDetector(bucketValues).detect(1e-4);
77+
fp += type instanceof ChangeType.NonStationary ? 0 : 1;
78+
}
79+
assertThat(fp, lessThan(10));
80+
81+
fp = 0;
82+
GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2);
83+
for (int i = 0; i < 100; i++) {
84+
AtomicInteger j = new AtomicInteger();
85+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
86+
null,
87+
DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(40).toArray()
88+
);
89+
ChangeType type = new ChangeDetector(bucketValues).detect(1e-4);
90+
fp += type instanceof ChangeType.NonStationary ? 0 : 1;
91+
}
92+
assertThat(fp, lessThan(10));
93+
}
94+
95+
public void testStepChangePower() {
96+
NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2);
97+
int tp = 0;
98+
for (int i = 0; i < 100; i++) {
99+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
100+
null,
101+
DoubleStream.concat(
102+
DoubleStream.generate(() -> normal.sample()).limit(20),
103+
DoubleStream.generate(() -> 10 + normal.sample()).limit(20)
104+
).toArray()
105+
);
106+
ChangeType type = new ChangeDetector(bucketValues).detect(0.05);
107+
tp += type instanceof ChangeType.StepChange ? 1 : 0;
108+
}
109+
assertThat(tp, greaterThan(80));
110+
111+
tp = 0;
112+
GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2);
113+
for (int i = 0; i < 100; i++) {
114+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
115+
null,
116+
DoubleStream.concat(
117+
DoubleStream.generate(() -> gamma.sample()).limit(20),
118+
DoubleStream.generate(() -> 10 + gamma.sample()).limit(20)
119+
).toArray()
120+
);
121+
ChangeType type = new ChangeDetector(bucketValues).detect(0.05);
122+
tp += type instanceof ChangeType.StepChange ? 1 : 0;
123+
}
124+
assertThat(tp, greaterThan(80));
125+
}
126+
127+
public void testTrendChangePower() {
128+
NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2);
129+
int tp = 0;
130+
for (int i = 0; i < 100; i++) {
131+
AtomicInteger j = new AtomicInteger();
132+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
133+
null,
134+
DoubleStream.concat(
135+
DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(20),
136+
DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + normal.sample()).limit(20)
137+
).toArray()
138+
);
139+
ChangeType type = new ChangeDetector(bucketValues).detect(0.05);
140+
tp += type instanceof ChangeType.TrendChange ? 1 : 0;
141+
}
142+
assertThat(tp, greaterThan(80));
143+
144+
tp = 0;
145+
GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2);
146+
for (int i = 0; i < 100; i++) {
147+
AtomicInteger j = new AtomicInteger();
148+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
149+
null,
150+
DoubleStream.concat(
151+
DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(20),
152+
DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + gamma.sample()).limit(20)
153+
).toArray()
154+
);
155+
ChangeType type = new ChangeDetector(bucketValues).detect(0.05);
156+
tp += type instanceof ChangeType.TrendChange ? 1 : 0;
157+
}
158+
assertThat(tp, greaterThan(80));
159+
}
160+
161+
public void testDistributionChangeTestPower() {
162+
NormalDistribution normal1 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0);
163+
NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 10.0);
164+
int tp = 0;
165+
for (int i = 0; i < 100; i++) {
166+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
167+
null,
168+
DoubleStream.concat(
169+
DoubleStream.generate(() -> 10 + normal1.sample()).limit(50),
170+
DoubleStream.generate(() -> 10 + normal2.sample()).limit(50)
171+
).toArray()
172+
);
173+
ChangeType type = new ChangeDetector(bucketValues).detect(0.05);
174+
tp += type instanceof ChangeType.DistributionChange ? 1 : 0;
175+
}
176+
assertThat(tp, greaterThan(90));
177+
}
178+
179+
public void testMultipleChanges() {
180+
NormalDistribution normal1 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 78.0, 3.0);
181+
NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 40.0, 6.0);
182+
NormalDistribution normal3 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1.0, 0.3);
183+
int tp = 0;
184+
for (int i = 0; i < 100; i++) {
185+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
186+
null,
187+
DoubleStream.concat(
188+
DoubleStream.concat(DoubleStream.generate(normal1::sample).limit(7), DoubleStream.generate(normal2::sample).limit(6)),
189+
DoubleStream.generate(normal3::sample).limit(23)
190+
).toArray()
191+
);
192+
ChangeType type = new ChangeDetector(bucketValues).detect(0.05);
193+
tp += type instanceof ChangeType.TrendChange ? 1 : 0;
194+
}
195+
assertThat(tp, greaterThan(90));
196+
}
197+
198+
public void testProblemDistributionChange() {
199+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
200+
null,
201+
new double[] {
202+
546.3651753325270,
203+
550.872738079514,
204+
551.1312487618040,
205+
550.3323904749380,
206+
549.2652495378930,
207+
548.9761274963630,
208+
549.3433969743010,
209+
549.0935313531350,
210+
551.1762550747600,
211+
551.3772184469220,
212+
548.6163495094490,
213+
548.5866591594080,
214+
546.9364791288570,
215+
548.1167839989470,
216+
549.3484016149320,
217+
550.4242803917040,
218+
551.2316023050940,
219+
548.4713993534340,
220+
546.0254901960780,
221+
548.4376996805110,
222+
561.1920529801320,
223+
557.3930041152260,
224+
565.8497217068650,
225+
566.787072243346,
226+
546.6094890510950,
227+
530.5905797101450,
228+
556.7340823970040,
229+
557.3857677902620,
230+
543.0754716981130,
231+
574.3297101449280,
232+
559.2962962962960,
233+
549.5202952029520,
234+
531.7217741935480,
235+
551.4333333333330,
236+
557.637168141593,
237+
545.1880733944950,
238+
564.6893203883500,
239+
543.0204081632650,
240+
571.820809248555,
241+
541.2589928057550,
242+
520.4387755102040 }
243+
);
244+
ChangeType type = new ChangeDetector(bucketValues).detect(0.05);
245+
assertThat(type, instanceOf(ChangeType.DistributionChange.class));
246+
}
247+
248+
public void testUncertainNonStationary() {
249+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
250+
null,
251+
new double[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 700, 735, 715 }
252+
);
253+
ChangeType type = new ChangeDetector(bucketValues).detect(0.01);
254+
assertThat(type, instanceOf(ChangeType.NonStationary.class));
255+
assertThat(((ChangeType.NonStationary) type).getTrend(), equalTo("increasing"));
256+
}
257+
}

0 commit comments

Comments
 (0)