Skip to content

Commit 1c360da

Browse files
committed
Uniformize ChangeDetector and SkipeAndDipDetector
1 parent 4fa580c commit 1c360da

File tree

5 files changed

+74
-69
lines changed

5 files changed

+74
-69
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java

Lines changed: 54 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,39 @@
2323
import java.util.function.IntToDoubleFunction;
2424
import java.util.stream.IntStream;
2525

26+
/**
27+
* Detects whether a time series is stationary or changing
28+
* (either continuously or at a specific change point).
29+
*/
2630
public class ChangeDetector {
2731

2832
private static final int MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST = 500;
33+
private static final int MAXIMUM_CANDIDATE_CHANGE_POINTS = 1000;
34+
2935
private static final KolmogorovSmirnovTest KOLMOGOROV_SMIRNOV_TEST = new KolmogorovSmirnovTest();
3036

3137
private static final Logger logger = LogManager.getLogger(ChangeDetector.class);
3238

33-
static TestStats testForChange(double[] timeWindow, double pValueThreshold) {
39+
private final double[] values;
40+
41+
ChangeDetector(double[] values) {
42+
this.values = values;
43+
}
44+
45+
public ChangeType detect(double pValueThreshold, MlAggsHelper.DoubleBucketValues bucketValues) {
46+
return testForChange(pValueThreshold).changeType(bucketValues, slope(values));
47+
}
48+
49+
// visible for testing
50+
TestStats testForChange(double pValueThreshold) {
3451

35-
int[] candidateChangePoints = ChangePointAggregator.computeCandidateChangePoints(timeWindow);
52+
int[] candidateChangePoints = computeCandidateChangePoints(values);
3653
logger.trace("candidatePoints: [{}]", Arrays.toString(candidateChangePoints));
3754

38-
double[] timeWindowWeights = outlierWeights(timeWindow);
39-
logger.trace("timeWindow: [{}]", Arrays.toString(timeWindow));
40-
logger.trace("timeWindowWeights: [{}]", Arrays.toString(timeWindowWeights));
41-
RunningStats dataRunningStats = RunningStats.from(timeWindow, i -> timeWindowWeights[i]);
55+
double[] valuesWeights = outlierWeights(values);
56+
logger.trace("values: [{}]", Arrays.toString(values));
57+
logger.trace("valuesWeights: [{}]", Arrays.toString(valuesWeights));
58+
RunningStats dataRunningStats = RunningStats.from(values, i -> valuesWeights[i]);
4259
DataStats dataStats = new DataStats(
4360
dataRunningStats.count(),
4461
dataRunningStats.mean(),
@@ -52,38 +69,33 @@ static TestStats testForChange(double[] timeWindow, double pValueThreshold) {
5269
return stationary;
5370
}
5471

55-
TestStats trendVsStationary = testTrendVs(stationary, timeWindow, timeWindowWeights);
72+
TestStats trendVsStationary = testTrendVs(stationary, values, valuesWeights);
5673
logger.trace("trend vs stationary: [{}]", trendVsStationary);
5774

5875
TestStats best = stationary;
5976
Set<Integer> discoveredChangePoints = Sets.newHashSetWithExpectedSize(4);
6077
if (trendVsStationary.accept(pValueThreshold)) {
6178
// Check if there is a change in the trend.
62-
TestStats trendChangeVsTrend = testTrendChangeVs(trendVsStationary, timeWindow, timeWindowWeights, candidateChangePoints);
79+
TestStats trendChangeVsTrend = testTrendChangeVs(trendVsStationary, values, valuesWeights, candidateChangePoints);
6380
discoveredChangePoints.add(trendChangeVsTrend.changePoint());
6481
logger.trace("trend change vs trend: [{}]", trendChangeVsTrend);
6582

6683
if (trendChangeVsTrend.accept(pValueThreshold)) {
6784
// Check if modeling a trend change adds much over modeling a step change.
68-
best = testVsStepChange(trendChangeVsTrend, timeWindow, timeWindowWeights, candidateChangePoints, pValueThreshold);
85+
best = testVsStepChange(trendChangeVsTrend, values, valuesWeights, candidateChangePoints, pValueThreshold);
6986
} else {
7087
best = trendVsStationary;
7188
}
7289

7390
} else {
7491
// Check if there is a step change.
75-
TestStats stepChangeVsStationary = testStepChangeVs(stationary, timeWindow, timeWindowWeights, candidateChangePoints);
92+
TestStats stepChangeVsStationary = testStepChangeVs(stationary, values, valuesWeights, candidateChangePoints);
7693
discoveredChangePoints.add(stepChangeVsStationary.changePoint());
7794
logger.trace("step change vs stationary: [{}]", stepChangeVsStationary);
7895

7996
if (stepChangeVsStationary.accept(pValueThreshold)) {
8097
// Check if modeling a trend change adds much over modeling a step change.
81-
TestStats trendChangeVsStepChange = testTrendChangeVs(
82-
stepChangeVsStationary,
83-
timeWindow,
84-
timeWindowWeights,
85-
candidateChangePoints
86-
);
98+
TestStats trendChangeVsStepChange = testTrendChangeVs(stepChangeVsStationary, values, valuesWeights, candidateChangePoints);
8799
discoveredChangePoints.add(stepChangeVsStationary.changePoint());
88100
logger.trace("trend change vs step change: [{}]", trendChangeVsStepChange);
89101
if (trendChangeVsStepChange.accept(pValueThreshold)) {
@@ -94,7 +106,7 @@ static TestStats testForChange(double[] timeWindow, double pValueThreshold) {
94106

95107
} else {
96108
// Check if there is a trend change.
97-
TestStats trendChangeVsStationary = testTrendChangeVs(stationary, timeWindow, timeWindowWeights, candidateChangePoints);
109+
TestStats trendChangeVsStationary = testTrendChangeVs(stationary, values, valuesWeights, candidateChangePoints);
98110
discoveredChangePoints.add(stepChangeVsStationary.changePoint());
99111
logger.trace("trend change vs stationary: [{}]", trendChangeVsStationary);
100112
if (trendChangeVsStationary.accept(pValueThreshold)) {
@@ -108,13 +120,7 @@ static TestStats testForChange(double[] timeWindow, double pValueThreshold) {
108120
// We're not very confident in the change point, so check if a distribution change
109121
// fits the data better.
110122
if (best.pValueVsStationary() > 1e-5) {
111-
TestStats distChange = testDistributionChange(
112-
dataStats,
113-
timeWindow,
114-
timeWindowWeights,
115-
candidateChangePoints,
116-
discoveredChangePoints
117-
);
123+
TestStats distChange = testDistributionChange(dataStats, values, valuesWeights, candidateChangePoints, discoveredChangePoints);
118124
logger.trace("distribution change: [{}]", distChange);
119125
if (distChange.pValue() < Math.min(pValueThreshold, 0.1 * best.pValueVsStationary())) {
120126
best = distChange;
@@ -124,7 +130,17 @@ static TestStats testForChange(double[] timeWindow, double pValueThreshold) {
124130
return best;
125131
}
126132

127-
static double[] outlierWeights(double[] values) {
133+
private int[] computeCandidateChangePoints(double[] values) {
134+
int minValues = Math.max((int) (0.1 * values.length + 0.5), ChangePointAggregator.MINIMUM_BUCKETS);
135+
if (values.length - 2 * minValues <= MAXIMUM_CANDIDATE_CHANGE_POINTS) {
136+
return IntStream.range(minValues, values.length - minValues).toArray();
137+
} else {
138+
int step = (int) Math.ceil((double) (values.length - 2 * minValues) / MAXIMUM_CANDIDATE_CHANGE_POINTS);
139+
return IntStream.range(minValues, values.length - minValues).filter(i -> i % step == 0).toArray();
140+
}
141+
}
142+
143+
private double[] outlierWeights(double[] values) {
128144
int i = (int) Math.ceil(0.025 * values.length);
129145
double[] weights = Arrays.copyOf(values, values.length);
130146
Arrays.sort(weights);
@@ -144,19 +160,19 @@ static double[] outlierWeights(double[] values) {
144160
return weights;
145161
}
146162

147-
static double slope(double[] values) {
163+
private double slope(double[] values) {
148164
SimpleRegression regression = new SimpleRegression();
149165
for (int i = 0; i < values.length; i++) {
150166
regression.addData(i, values[i]);
151167
}
152168
return regression.getSlope();
153169
}
154170

155-
static double independentTrialsPValue(double pValue, int nTrials) {
171+
private static double independentTrialsPValue(double pValue, int nTrials) {
156172
return pValue > 1e-10 ? 1.0 - Math.pow(1.0 - pValue, nTrials) : nTrials * pValue;
157173
}
158174

159-
static TestStats testTrendVs(TestStats H0, double[] values, double[] weights) {
175+
private TestStats testTrendVs(TestStats H0, double[] values, double[] weights) {
160176
LeastSquaresOnlineRegression allLeastSquares = new LeastSquaresOnlineRegression(2);
161177
for (int i = 0; i < values.length; i++) {
162178
allLeastSquares.add(i, values[i], weights[i]);
@@ -166,7 +182,7 @@ static TestStats testTrendVs(TestStats H0, double[] values, double[] weights) {
166182
return new TestStats(Type.NON_STATIONARY, pValue, vTrend, 3.0, H0.dataStats());
167183
}
168184

169-
static TestStats testStepChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) {
185+
private TestStats testStepChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) {
170186

171187
double vStep = Double.MAX_VALUE;
172188
int changePoint = -1;
@@ -203,7 +219,7 @@ static TestStats testStepChangeVs(TestStats H0, double[] values, double[] weight
203219
return new TestStats(Type.STEP_CHANGE, pValue, vStep, 2.0, changePoint, H0.dataStats());
204220
}
205221

206-
static TestStats testTrendChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) {
222+
private TestStats testTrendChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) {
207223

208224
double vChange = Double.MAX_VALUE;
209225
int changePoint = -1;
@@ -252,7 +268,7 @@ static TestStats testTrendChangeVs(TestStats H0, double[] values, double[] weigh
252268
return new TestStats(Type.TREND_CHANGE, pValue, vChange, 6.0, changePoint, H0.dataStats());
253269
}
254270

255-
static TestStats testVsStepChange(
271+
private TestStats testVsStepChange(
256272
TestStats trendChange,
257273
double[] values,
258274
double[] weights,
@@ -267,7 +283,7 @@ static TestStats testVsStepChange(
267283
return pValue < pValueThreshold ? trendChange : stepChange;
268284
}
269285

270-
static double fTestNestedPValue(double n, double vNull, double pNull, double vAlt, double pAlt) {
286+
private static double fTestNestedPValue(double n, double vNull, double pNull, double vAlt, double pAlt) {
271287
if (vAlt == vNull) {
272288
return 1.0;
273289
}
@@ -287,7 +303,7 @@ private static int lowerBound(int[] x, int start, int end, int xs) {
287303
return retVal;
288304
}
289305

290-
static SampleData sample(double[] values, double[] weights, Set<Integer> changePoints) {
306+
private SampleData sample(double[] values, double[] weights, Set<Integer> changePoints) {
291307
Integer[] adjChangePoints = changePoints.toArray(new Integer[changePoints.size()]);
292308
if (values.length <= MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST) {
293309
return new SampleData(values, weights, adjChangePoints);
@@ -320,7 +336,7 @@ static SampleData sample(double[] values, double[] weights, Set<Integer> changeP
320336
return new SampleData(sample, sampleWeights, adjChangePoints);
321337
}
322338

323-
static TestStats testDistributionChange(
339+
private TestStats testDistributionChange(
324340
DataStats stats,
325341
double[] values,
326342
double[] weights,
@@ -384,7 +400,7 @@ static TestStats testDistributionChange(
384400
return new TestStats(Type.DISTRIBUTION_CHANGE, pValue, changePoint, stats);
385401
}
386402

387-
static double fDistribSf(double numeratorDegreesOfFreedom, double denominatorDegreesOfFreedom, double x) {
403+
private static double fDistribSf(double numeratorDegreesOfFreedom, double denominatorDegreesOfFreedom, double x) {
388404
if (x <= 0) {
389405
return 1;
390406
}
@@ -399,6 +415,7 @@ static double fDistribSf(double numeratorDegreesOfFreedom, double denominatorDeg
399415
);
400416
}
401417

418+
// visible for testing
402419
enum Type {
403420
STATIONARY,
404421
NON_STATIONARY,
@@ -425,6 +442,7 @@ public String toString() {
425442
}
426443
}
427444

445+
// visible for testing
428446
record TestStats(Type type, double pValue, double var, double nParams, int changePoint, DataStats dataStats) {
429447
TestStats(Type type, double pValue, int changePoint, DataStats dataStats) {
430448
this(type, pValue, 0.0, 0.0, changePoint, dataStats);
@@ -482,7 +500,7 @@ public String toString() {
482500
}
483501
}
484502

485-
static class RunningStats {
503+
private static class RunningStats {
486504
double sumOfSqrs;
487505
double sum;
488506
double count;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.util.Map;
2222
import java.util.Optional;
23-
import java.util.stream.IntStream;
2423

2524
import static org.elasticsearch.xpack.ml.aggs.MlAggsHelper.extractBucket;
2625
import static org.elasticsearch.xpack.ml.aggs.MlAggsHelper.extractDoubleBucketedValues;
@@ -30,25 +29,14 @@ public class ChangePointAggregator extends SiblingPipelineAggregator {
3029
private static final Logger logger = LogManager.getLogger(ChangePointAggregator.class);
3130

3231
static final double P_VALUE_THRESHOLD = 0.01;
33-
private static final int MINIMUM_BUCKETS = 10;
34-
private static final int MAXIMUM_CANDIDATE_CHANGE_POINTS = 1000;
32+
static final int MINIMUM_BUCKETS = 10;
3533

3634
private static double changePValueThreshold(int nValues) {
3735
// This was obtained by simulating the test power for a fixed size effect as a
3836
// function of the bucket value count.
3937
return P_VALUE_THRESHOLD * Math.exp(-0.04 * (double) (nValues - 2 * (MINIMUM_BUCKETS + 1)));
4038
}
4139

42-
static int[] computeCandidateChangePoints(double[] values) {
43-
int minValues = Math.max((int) (0.1 * values.length + 0.5), MINIMUM_BUCKETS);
44-
if (values.length - 2 * minValues <= MAXIMUM_CANDIDATE_CHANGE_POINTS) {
45-
return IntStream.range(minValues, values.length - minValues).toArray();
46-
} else {
47-
int step = (int) Math.ceil((double) (values.length - 2 * minValues) / MAXIMUM_CANDIDATE_CHANGE_POINTS);
48-
return IntStream.range(minValues, values.length - minValues).filter(i -> i % step == 0).toArray();
49-
}
50-
}
51-
5240
public ChangePointAggregator(String name, String bucketsPath, Map<String, Object> metadata) {
5341
super(name, new String[] { bucketsPath }, metadata);
5442
}
@@ -108,7 +96,7 @@ public InternalAggregation doReduce(InternalAggregations aggregations, Aggregati
10896
static ChangeType testForSpikeOrDip(MlAggsHelper.DoubleBucketValues bucketValues, double pValueThreshold) {
10997
try {
11098
SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues.getValues());
111-
ChangeType result = detect.at(pValueThreshold, bucketValues);
99+
ChangeType result = detect.detect(pValueThreshold, bucketValues);
112100
logger.trace("spike or dip p-value: [{}]", result.pValue());
113101
return result;
114102
} catch (NotStrictlyPositiveException nspe) {
@@ -119,6 +107,6 @@ static ChangeType testForSpikeOrDip(MlAggsHelper.DoubleBucketValues bucketValues
119107

120108
static ChangeType testForChange(MlAggsHelper.DoubleBucketValues bucketValues, double pValueThreshold) {
121109
double[] timeWindow = bucketValues.getValues();
122-
return ChangeDetector.testForChange(timeWindow, pValueThreshold).changeType(bucketValues, ChangeDetector.slope(timeWindow));
110+
return new ChangeDetector(timeWindow).detect(pValueThreshold, bucketValues);
123111
}
124112
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ private double[] removeIf(ExcludedPredicate should, double[] values) {
101101
private final KDE dipTestKDE;
102102

103103
SpikeAndDipDetector(double[] values) {
104-
105104
numValues = values.length;
106105

107106
if (values.length < 4) {
@@ -135,7 +134,7 @@ private double[] removeIf(ExcludedPredicate should, double[] values) {
135134
spikeTestKDE = new KDE(spikeKDEValues, 1.36);
136135
}
137136

138-
ChangeType at(double pValueThreshold, MlAggsHelper.DoubleBucketValues bucketValues) {
137+
ChangeType detect(double pValueThreshold, MlAggsHelper.DoubleBucketValues bucketValues) {
139138
if (dipIndex == -1 || spikeIndex == -1) {
140139
return new ChangeType.Indeterminable(
141140
"not enough buckets to check for dip or spike. Requires at least [3]; found [" + numValues + "]"

0 commit comments

Comments
 (0)