Skip to content

Commit c2e6f2c

Browse files
committed
Public entrypoint for change point detection
1 parent a2ac799 commit c2e6f2c

File tree

5 files changed

+181
-148
lines changed

5 files changed

+181
-148
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,19 @@ public class ChangeDetector {
3636

3737
private static final Logger logger = LogManager.getLogger(ChangeDetector.class);
3838

39+
private final MlAggsHelper.DoubleBucketValues bucketValues;
3940
private final double[] values;
4041

41-
ChangeDetector(double[] values) {
42-
this.values = values;
42+
ChangeDetector(MlAggsHelper.DoubleBucketValues bucketValues) {
43+
this.bucketValues = bucketValues;
44+
this.values = bucketValues.getValues();
4345
}
4446

45-
public ChangeType detect(double pValueThreshold, MlAggsHelper.DoubleBucketValues bucketValues) {
47+
ChangeType detect(double pValueThreshold) {
4648
return testForChange(pValueThreshold).changeType(bucketValues, slope(values));
4749
}
4850

49-
// visible for testing
50-
TestStats testForChange(double pValueThreshold) {
51+
private TestStats testForChange(double pValueThreshold) {
5152

5253
int[] candidateChangePoints = computeCandidateChangePoints(values);
5354
logger.trace("candidatePoints: [{}]", Arrays.toString(candidateChangePoints));
@@ -415,8 +416,7 @@ private static double fDistribSf(double numeratorDegreesOfFreedom, double denomi
415416
);
416417
}
417418

418-
// visible for testing
419-
enum Type {
419+
private enum Type {
420420
STATIONARY,
421421
NON_STATIONARY,
422422
STEP_CHANGE,
@@ -442,8 +442,7 @@ public String toString() {
442442
}
443443
}
444444

445-
// visible for testing
446-
record TestStats(Type type, double pValue, double var, double nParams, int changePoint, DataStats dataStats) {
445+
private record TestStats(Type type, double pValue, double var, double nParams, int changePoint, DataStats dataStats) {
447446
TestStats(Type type, double pValue, int changePoint, DataStats dataStats) {
448447
this(type, pValue, 0.0, 0.0, changePoint, dataStats);
449448
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java

Lines changed: 24 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -58,30 +58,8 @@ public InternalAggregation doReduce(InternalAggregations aggregations, Aggregati
5858
);
5959
}
6060
MlAggsHelper.DoubleBucketValues bucketValues = maybeBucketValues.get();
61-
if (bucketValues.getValues().length < (2 * MINIMUM_BUCKETS) + 2) {
62-
return new InternalChangePointAggregation(
63-
name(),
64-
metadata(),
65-
null,
66-
new ChangeType.Indeterminable(
67-
"not enough buckets to calculate change_point. Requires at least ["
68-
+ ((2 * MINIMUM_BUCKETS) + 2)
69-
+ "]; found ["
70-
+ bucketValues.getValues().length
71-
+ "]"
72-
)
73-
);
74-
}
7561

76-
ChangeType spikeOrDip = testForSpikeOrDip(bucketValues, P_VALUE_THRESHOLD);
77-
78-
// Test for change step, trend and distribution changes.
79-
ChangeType change = testForChange(bucketValues, changePValueThreshold(bucketValues.getValues().length));
80-
logger.trace("change p-value: [{}]", change.pValue());
81-
82-
if (spikeOrDip.pValue() < change.pValue()) {
83-
change = spikeOrDip;
84-
}
62+
ChangeType change = getChangeType(bucketValues);
8563

8664
ChangePointBucket changePointBucket = null;
8765
if (change.changePoint() >= 0) {
@@ -93,20 +71,35 @@ public InternalAggregation doReduce(InternalAggregations aggregations, Aggregati
9371
return new InternalChangePointAggregation(name(), metadata(), changePointBucket, change);
9472
}
9573

74+
static ChangeType getChangeType(MlAggsHelper.DoubleBucketValues bucketValues) {
75+
if (bucketValues.getValues().length < (2 * MINIMUM_BUCKETS) + 2) {
76+
return new ChangeType.Indeterminable(
77+
"not enough buckets to calculate change_point. Requires at least ["
78+
+ ((2 * MINIMUM_BUCKETS) + 2)
79+
+ "]; found ["
80+
+ bucketValues.getValues().length
81+
+ "]"
82+
);
83+
}
84+
85+
ChangeType spikeOrDip = testForSpikeOrDip(bucketValues, P_VALUE_THRESHOLD);
86+
ChangeType change = new ChangeDetector(bucketValues).detect(changePValueThreshold(bucketValues.getValues().length));
87+
logger.trace("change p-value: [{}]", change.pValue());
88+
if (spikeOrDip.pValue() < change.pValue()) {
89+
change = spikeOrDip;
90+
}
91+
return change;
92+
}
93+
9694
static ChangeType testForSpikeOrDip(MlAggsHelper.DoubleBucketValues bucketValues, double pValueThreshold) {
9795
try {
98-
SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues.getValues());
99-
ChangeType result = detect.detect(pValueThreshold, bucketValues);
96+
SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues);
97+
ChangeType result = detect.detect(pValueThreshold);
10098
logger.trace("spike or dip p-value: [{}]", result.pValue());
10199
return result;
102100
} catch (NotStrictlyPositiveException nspe) {
103101
logger.debug("failure testing for dips and spikes", nspe);
102+
return new Indeterminable("failure testing for dips and spikes");
104103
}
105-
return new Indeterminable("failure testing for dips and spikes");
106-
}
107-
108-
static ChangeType testForChange(MlAggsHelper.DoubleBucketValues bucketValues, double pValueThreshold) {
109-
double[] timeWindow = bucketValues.getValues();
110-
return new ChangeDetector(timeWindow).detect(pValueThreshold, bucketValues);
111104
}
112105
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ private double[] removeIf(ExcludedPredicate should, double[] values) {
9292
return newValues;
9393
}
9494

95+
private final MlAggsHelper.DoubleBucketValues bucketValues;
9596
private final int numValues;
9697
private final int dipIndex;
9798
private final int spikeIndex;
@@ -100,7 +101,10 @@ private double[] removeIf(ExcludedPredicate should, double[] values) {
100101
private final KDE spikeTestKDE;
101102
private final KDE dipTestKDE;
102103

103-
SpikeAndDipDetector(double[] values) {
104+
SpikeAndDipDetector(MlAggsHelper.DoubleBucketValues bucketValues) {
105+
this.bucketValues = bucketValues;
106+
double[] values = bucketValues.getValues();
107+
104108
numValues = values.length;
105109

106110
if (values.length < 4) {
@@ -134,7 +138,7 @@ private double[] removeIf(ExcludedPredicate should, double[] values) {
134138
spikeTestKDE = new KDE(spikeKDEValues, 1.36);
135139
}
136140

137-
ChangeType detect(double pValueThreshold, MlAggsHelper.DoubleBucketValues bucketValues) {
141+
ChangeType detect(double pValueThreshold) {
138142
if (dipIndex == -1 || spikeIndex == -1) {
139143
return new ChangeType.Indeterminable(
140144
"not enough buckets to check for dip or spike. Requires at least [3]; found [" + numValues + "]"

0 commit comments

Comments
 (0)