Skip to content

Commit 3152e41

Browse files
authored
Refactor change point detection (#114289)
* Move change detection code to separate class * Unify ChangeDetector and SpikeAndDipDetector * Separate ChangeDetectorTests and ChangePointAggregatorTests. * Public entrypoint for change point detection * Move p-value computation to ChangeDetector * Move main entrypoint to a separate file
1 parent c8c6f5a commit 3152e41

File tree

7 files changed

+881
-793
lines changed

7 files changed

+881
-793
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java

Lines changed: 560 additions & 0 deletions
Large diffs are not rendered by default.

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java

Lines changed: 1 addition & 587 deletions
Large diffs are not rendered by default.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ml.aggs.changepoint;
9+
10+
import org.apache.commons.math3.exception.NotStrictlyPositiveException;
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;
14+
15+
/**
16+
* Detects whether a series of values has a change point, by running both
17+
* ChangeDetector and SpikeAndDipDetector on it. This is the main entrypoint
18+
* of change point detection.
19+
*/
20+
public class ChangePointDetector {
21+
22+
private static final Logger logger = LogManager.getLogger(ChangePointDetector.class);
23+
24+
static final double P_VALUE_THRESHOLD = 0.01;
25+
static final int MINIMUM_BUCKETS = 10;
26+
27+
/**
28+
* Returns the ChangeType of a series of values.
29+
*/
30+
public static ChangeType getChangeType(MlAggsHelper.DoubleBucketValues bucketValues) {
31+
if (bucketValues.getValues().length < (2 * MINIMUM_BUCKETS) + 2) {
32+
return new ChangeType.Indeterminable(
33+
"not enough buckets to calculate change_point. Requires at least ["
34+
+ ((2 * MINIMUM_BUCKETS) + 2)
35+
+ "]; found ["
36+
+ bucketValues.getValues().length
37+
+ "]"
38+
);
39+
}
40+
41+
ChangeType spikeOrDip;
42+
try {
43+
SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues);
44+
spikeOrDip = detect.detect(P_VALUE_THRESHOLD);
45+
logger.trace("spike or dip p-value: [{}]", spikeOrDip.pValue());
46+
} catch (NotStrictlyPositiveException nspe) {
47+
logger.debug("failure testing for dips and spikes", nspe);
48+
spikeOrDip = new ChangeType.Indeterminable("failure testing for dips and spikes");
49+
}
50+
51+
ChangeType change = new ChangeDetector(bucketValues).detect(P_VALUE_THRESHOLD);
52+
logger.trace("change p-value: [{}]", change.pValue());
53+
54+
if (spikeOrDip.pValue() < change.pValue()) {
55+
change = spikeOrDip;
56+
}
57+
return change;
58+
}
59+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ private double[] removeIf(ExcludedPredicate should, double[] values) {
9292
return newValues;
9393
}
9494

95+
private final MlAggsHelper.DoubleBucketValues bucketValues;
9596
private final int numValues;
9697
private final int dipIndex;
9798
private final int spikeIndex;
@@ -100,7 +101,9 @@ private double[] removeIf(ExcludedPredicate should, double[] values) {
100101
private final KDE spikeTestKDE;
101102
private final KDE dipTestKDE;
102103

103-
SpikeAndDipDetector(double[] values) {
104+
SpikeAndDipDetector(MlAggsHelper.DoubleBucketValues bucketValues) {
105+
this.bucketValues = bucketValues;
106+
double[] values = bucketValues.getValues();
104107

105108
numValues = values.length;
106109

@@ -135,7 +138,7 @@ private double[] removeIf(ExcludedPredicate should, double[] values) {
135138
spikeTestKDE = new KDE(spikeKDEValues, 1.36);
136139
}
137140

138-
ChangeType at(double pValueThreshold, MlAggsHelper.DoubleBucketValues bucketValues) {
141+
ChangeType detect(double pValueThreshold) {
139142
if (dipIndex == -1 || spikeIndex == -1) {
140143
return new ChangeType.Indeterminable(
141144
"not enough buckets to check for dip or spike. Requires at least [3]; found [" + numValues + "]"
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ml.aggs.changepoint;
9+
10+
import org.apache.commons.math3.distribution.GammaDistribution;
11+
import org.apache.commons.math3.distribution.NormalDistribution;
12+
import org.apache.commons.math3.random.RandomGeneratorFactory;
13+
import org.elasticsearch.common.Randomness;
14+
import org.elasticsearch.search.aggregations.AggregatorTestCase;
15+
import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;
16+
17+
import java.util.concurrent.atomic.AtomicInteger;
18+
import java.util.stream.DoubleStream;
19+
20+
import static org.hamcrest.Matchers.greaterThan;
21+
import static org.hamcrest.Matchers.instanceOf;
22+
import static org.hamcrest.Matchers.lessThan;
23+
24+
public class ChangeDetectorTests extends AggregatorTestCase {
25+
26+
public void testStationaryFalsePositiveRate() {
27+
NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2);
28+
int fp = 0;
29+
for (int i = 0; i < 100; i++) {
30+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
31+
null,
32+
DoubleStream.generate(() -> 10 + normal.sample()).limit(40).toArray()
33+
);
34+
ChangeType type = new ChangeDetector(bucketValues).detect(1e-4);
35+
fp += type instanceof ChangeType.Stationary ? 0 : 1;
36+
}
37+
assertThat(fp, lessThan(10));
38+
39+
fp = 0;
40+
GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2);
41+
for (int i = 0; i < 100; i++) {
42+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
43+
null,
44+
DoubleStream.generate(gamma::sample).limit(40).toArray()
45+
);
46+
ChangeType type = new ChangeDetector(bucketValues).detect(1e-4);
47+
fp += type instanceof ChangeType.Stationary ? 0 : 1;
48+
}
49+
assertThat(fp, lessThan(10));
50+
}
51+
52+
public void testSampledDistributionTestFalsePositiveRate() {
53+
NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0);
54+
int fp = 0;
55+
for (int i = 0; i < 100; i++) {
56+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
57+
null,
58+
DoubleStream.generate(() -> 10 + normal.sample()).limit(5000).toArray()
59+
);
60+
ChangeType type = new ChangeDetector(bucketValues).detect(1e-4);
61+
fp += type instanceof ChangeType.Stationary ? 0 : 1;
62+
}
63+
assertThat(fp, lessThan(10));
64+
}
65+
66+
public void testNonStationaryFalsePositiveRate() {
67+
NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2);
68+
int fp = 0;
69+
for (int i = 0; i < 100; i++) {
70+
AtomicInteger j = new AtomicInteger();
71+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
72+
null,
73+
DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(40).toArray()
74+
);
75+
ChangeType type = new ChangeDetector(bucketValues).detect(1e-4);
76+
fp += type instanceof ChangeType.NonStationary ? 0 : 1;
77+
}
78+
assertThat(fp, lessThan(10));
79+
80+
fp = 0;
81+
GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2);
82+
for (int i = 0; i < 100; i++) {
83+
AtomicInteger j = new AtomicInteger();
84+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
85+
null,
86+
DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(40).toArray()
87+
);
88+
ChangeType type = new ChangeDetector(bucketValues).detect(1e-4);
89+
fp += type instanceof ChangeType.NonStationary ? 0 : 1;
90+
}
91+
assertThat(fp, lessThan(10));
92+
}
93+
94+
public void testStepChangePower() {
95+
NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2);
96+
int tp = 0;
97+
for (int i = 0; i < 100; i++) {
98+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
99+
null,
100+
DoubleStream.concat(
101+
DoubleStream.generate(() -> normal.sample()).limit(20),
102+
DoubleStream.generate(() -> 10 + normal.sample()).limit(20)
103+
).toArray()
104+
);
105+
ChangeType type = new ChangeDetector(bucketValues).detect(0.05);
106+
tp += type instanceof ChangeType.StepChange ? 1 : 0;
107+
}
108+
assertThat(tp, greaterThan(80));
109+
110+
tp = 0;
111+
GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2);
112+
for (int i = 0; i < 100; i++) {
113+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
114+
null,
115+
DoubleStream.concat(
116+
DoubleStream.generate(() -> gamma.sample()).limit(20),
117+
DoubleStream.generate(() -> 10 + gamma.sample()).limit(20)
118+
).toArray()
119+
);
120+
ChangeType type = new ChangeDetector(bucketValues).detect(0.05);
121+
tp += type instanceof ChangeType.StepChange ? 1 : 0;
122+
}
123+
assertThat(tp, greaterThan(80));
124+
}
125+
126+
public void testTrendChangePower() {
127+
NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2);
128+
int tp = 0;
129+
for (int i = 0; i < 100; i++) {
130+
AtomicInteger j = new AtomicInteger();
131+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
132+
null,
133+
DoubleStream.concat(
134+
DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(20),
135+
DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + normal.sample()).limit(20)
136+
).toArray()
137+
);
138+
ChangeType type = new ChangeDetector(bucketValues).detect(0.05);
139+
tp += type instanceof ChangeType.TrendChange ? 1 : 0;
140+
}
141+
assertThat(tp, greaterThan(80));
142+
143+
tp = 0;
144+
GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2);
145+
for (int i = 0; i < 100; i++) {
146+
AtomicInteger j = new AtomicInteger();
147+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
148+
null,
149+
DoubleStream.concat(
150+
DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(20),
151+
DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + gamma.sample()).limit(20)
152+
).toArray()
153+
);
154+
ChangeType type = new ChangeDetector(bucketValues).detect(0.05);
155+
tp += type instanceof ChangeType.TrendChange ? 1 : 0;
156+
}
157+
assertThat(tp, greaterThan(80));
158+
}
159+
160+
public void testDistributionChangeTestPower() {
161+
NormalDistribution normal1 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0);
162+
NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 10.0);
163+
int tp = 0;
164+
for (int i = 0; i < 100; i++) {
165+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
166+
null,
167+
DoubleStream.concat(
168+
DoubleStream.generate(() -> 10 + normal1.sample()).limit(50),
169+
DoubleStream.generate(() -> 10 + normal2.sample()).limit(50)
170+
).toArray()
171+
);
172+
ChangeType type = new ChangeDetector(bucketValues).detect(0.05);
173+
tp += type instanceof ChangeType.DistributionChange ? 1 : 0;
174+
}
175+
assertThat(tp, greaterThan(90));
176+
}
177+
178+
public void testMultipleChanges() {
179+
NormalDistribution normal1 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 78.0, 3.0);
180+
NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 40.0, 6.0);
181+
NormalDistribution normal3 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1.0, 0.3);
182+
int tp = 0;
183+
for (int i = 0; i < 100; i++) {
184+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
185+
null,
186+
DoubleStream.concat(
187+
DoubleStream.concat(DoubleStream.generate(normal1::sample).limit(7), DoubleStream.generate(normal2::sample).limit(6)),
188+
DoubleStream.generate(normal3::sample).limit(23)
189+
).toArray()
190+
);
191+
ChangeType type = new ChangeDetector(bucketValues).detect(0.05);
192+
tp += type instanceof ChangeType.TrendChange ? 1 : 0;
193+
}
194+
assertThat(tp, greaterThan(90));
195+
}
196+
197+
public void testProblemDistributionChange() {
198+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
199+
null,
200+
new double[] {
201+
546.3651753325270,
202+
550.872738079514,
203+
551.1312487618040,
204+
550.3323904749380,
205+
549.2652495378930,
206+
548.9761274963630,
207+
549.3433969743010,
208+
549.0935313531350,
209+
551.1762550747600,
210+
551.3772184469220,
211+
548.6163495094490,
212+
548.5866591594080,
213+
546.9364791288570,
214+
548.1167839989470,
215+
549.3484016149320,
216+
550.4242803917040,
217+
551.2316023050940,
218+
548.4713993534340,
219+
546.0254901960780,
220+
548.4376996805110,
221+
561.1920529801320,
222+
557.3930041152260,
223+
565.8497217068650,
224+
566.787072243346,
225+
546.6094890510950,
226+
530.5905797101450,
227+
556.7340823970040,
228+
557.3857677902620,
229+
543.0754716981130,
230+
574.3297101449280,
231+
559.2962962962960,
232+
549.5202952029520,
233+
531.7217741935480,
234+
551.4333333333330,
235+
557.637168141593,
236+
545.1880733944950,
237+
564.6893203883500,
238+
543.0204081632650,
239+
571.820809248555,
240+
541.2589928057550,
241+
520.4387755102040 }
242+
);
243+
ChangeType type = new ChangeDetector(bucketValues).detect(0.05);
244+
assertThat(type, instanceOf(ChangeType.DistributionChange.class));
245+
}
246+
}

0 commit comments

Comments
 (0)