Skip to content

Commit 16d45d6

Browse files
committed
Fix change point detection for uncertain non-stationary distributions. (#119578)
1 parent 716109b commit 16d45d6

File tree

3 files changed

+19
-6
lines changed

3 files changed

+19
-6
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public InternalAggregation doReduce(InternalAggregations aggregations, Aggregati
134134
}
135135

136136
ChangePointBucket changePointBucket = null;
137-
if (change.changePoint() >= 0) {
137+
if (change.changePoint() != ChangeType.NO_CHANGE_POINT) {
138138
changePointBucket = extractBucket(bucketsPaths()[0], aggregations, change.changePoint()).map(
139139
b -> new ChangePointBucket(b.getKey(), b.getDocCount(), b.getAggregations())
140140
).orElse(null);
@@ -299,7 +299,7 @@ static TestStats testTrendVs(TestStats H0, double[] values, double[] weights) {
299299
static TestStats testStepChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) {
300300

301301
double vStep = Double.MAX_VALUE;
302-
int changePoint = -1;
302+
int changePoint = ChangeType.NO_CHANGE_POINT;
303303

304304
// Initialize running stats so that they are only missing the individual changepoint values
305305
RunningStats lowerRange = new RunningStats();
@@ -336,7 +336,7 @@ static TestStats testStepChangeVs(TestStats H0, double[] values, double[] weight
336336
static TestStats testTrendChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) {
337337

338338
double vChange = Double.MAX_VALUE;
339-
int changePoint = -1;
339+
int changePoint = ChangeType.NO_CHANGE_POINT;
340340

341341
// Initialize running stats so that they are only missing the individual changepoint values
342342
RunningStats lowerRange = new RunningStats();
@@ -451,7 +451,7 @@ static TestStats testDistributionChange(
451451
) {
452452

453453
double maxDiff = 0.0;
454-
int changePoint = -1;
454+
int changePoint = ChangeType.NO_CHANGE_POINT;
455455

456456
// Initialize running stats so that they are only missing the individual changepoint values
457457
RunningStats lowerRange = new RunningStats();
@@ -484,6 +484,9 @@ static TestStats testDistributionChange(
484484

485485
double pValue = 1;
486486
for (int cp : sampleData.changePoints()) {
487+
if (cp == ChangeType.NO_CHANGE_POINT) {
488+
continue;
489+
}
487490
double[] x = Arrays.copyOfRange(sampleValues, 0, cp);
488491
double[] y = Arrays.copyOfRange(sampleValues, cp, sampleValues.length);
489492
double statistic = KOLMOGOROV_SMIRNOV_TEST.kolmogorovSmirnovStatistic(x, y);
@@ -520,7 +523,7 @@ record TestStats(Type type, double pValue, double var, double nParams, int chang
520523
}
521524

522525
TestStats(Type type, double pValue, double var, double nParams, DataStats dataStats) {
523-
this(type, pValue, var, nParams, -1, dataStats);
526+
this(type, pValue, var, nParams, ChangeType.NO_CHANGE_POINT, dataStats);
524527
}
525528

526529
boolean accept(double pValueThreshold) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeType.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
*/
2222
public interface ChangeType extends NamedWriteable, NamedXContentObject {
2323

24+
int NO_CHANGE_POINT = -1;
25+
2426
default int changePoint() {
25-
return -1;
27+
return NO_CHANGE_POINT;
2628
}
2729

2830
default double pValue() {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,14 @@ public void testSpikeSelectionVsChange() throws IOException {
574574
});
575575
}
576576

577+
public void testUncertainNonStationary() throws IOException {
578+
double[] bucketValues = new double[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 700, 735, 715, 0 };
579+
testChangeType(bucketValues, changeType -> {
580+
assertThat(changeType, instanceOf(ChangeType.NonStationary.class));
581+
assertThat(((ChangeType.NonStationary) changeType).getTrend(), equalTo("increasing"));
582+
});
583+
}
584+
577585
void testChangeType(double[] bucketValues, Consumer<ChangeType> changeTypeAssertions) throws IOException {
578586
FilterAggregationBuilder dummy = AggregationBuilders.filter("dummy", new MatchAllQueryBuilder())
579587
.subAggregation(

0 commit comments

Comments
 (0)