/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.xpack.ml.aggs.changepoint;

import org.apache.commons.math3.distribution.UniformRealDistribution;
import org.apache.commons.math3.random.RandomGeneratorFactory;
import org.apache.commons.math3.special.Beta;
import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest;
import org.apache.commons.math3.stat.regression.SimpleRegression;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;

import java.util.Arrays;
import java.util.Random;
import java.util.Set;
import java.util.function.IntToDoubleFunction;
import java.util.stream.IntStream;

/**
 * Detects whether a time series is stationary or changing
 * (either continuously or at a specific change point).
 * <p>
 * The detector fits a sequence of nested models to the (outlier down-weighted)
 * bucket values: a constant, a linear trend, a step change and a trend change.
 * Models are compared with F-tests on their residual variances. If none of them
 * explains the data with high confidence, a two-sample Kolmogorov-Smirnov test
 * is used to look for a change in the distribution of the values.
 */
public class ChangeDetector {

    private static final int MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST = 500;
    private static final int MAXIMUM_CANDIDATE_CHANGE_POINTS = 1000;

    private static final KolmogorovSmirnovTest KOLMOGOROV_SMIRNOV_TEST = new KolmogorovSmirnovTest();

    private static final Logger logger = LogManager.getLogger(ChangeDetector.class);

    private final MlAggsHelper.DoubleBucketValues bucketValues;
    private final double[] values;

    ChangeDetector(MlAggsHelper.DoubleBucketValues bucketValues) {
        this.bucketValues = bucketValues;
        this.values = bucketValues.getValues();
    }

    /**
     * Runs the change detection on the bucket values supplied at construction.
     *
     * @param minBucketsPValue the p-value threshold to use for a series which has the
     *                         minimum supported number of values; the threshold is
     *                         tightened exponentially as the series gets longer.
     * @return the type of change which best describes the series.
     */
    ChangeType detect(double minBucketsPValue) {
        // This was obtained by simulating the test power for a fixed size effect as a
        // function of the bucket value count.
        double pValueThreshold = minBucketsPValue * Math.exp(-0.04 * (values.length - 2 * (ChangePointDetector.MINIMUM_BUCKETS + 1)));
        return testForChange(pValueThreshold).changeType(bucketValues, slope(values));
    }

    /**
     * Selects the best model of the series among stationary, trend, step change,
     * trend change and distribution change.
     *
     * @param pValueThreshold the significance level a model must beat to be accepted.
     * @return the statistics of the selected model.
     */
    private TestStats testForChange(double pValueThreshold) {

        int[] candidateChangePoints = computeCandidateChangePoints(values);
        logger.trace("candidatePoints: [{}]", Arrays.toString(candidateChangePoints));

        double[] valuesWeights = outlierWeights(values);
        logger.trace("values: [{}]", Arrays.toString(values));
        logger.trace("valuesWeights: [{}]", Arrays.toString(valuesWeights));
        RunningStats dataRunningStats = RunningStats.from(values, i -> valuesWeights[i]);
        DataStats dataStats = new DataStats(
            dataRunningStats.count(),
            dataRunningStats.mean(),
            dataRunningStats.variance(),
            candidateChangePoints.length
        );
        logger.trace("dataStats: [{}]", dataStats);
        TestStats stationary = new TestStats(Type.STATIONARY, 1.0, dataStats.var(), 1.0, dataStats);

        if (dataStats.varianceZeroToWorkingPrecision()) {
            return stationary;
        }

        TestStats trendVsStationary = testTrendVs(stationary, values, valuesWeights);
        logger.trace("trend vs stationary: [{}]", trendVsStationary);

        TestStats best = stationary;
        // Every change point proposed by one of the parametric tests is remembered so
        // that the distribution change test can evaluate it as well.
        Set<Integer> discoveredChangePoints = Sets.newHashSetWithExpectedSize(4);
        if (trendVsStationary.accept(pValueThreshold)) {
            // Check if there is a change in the trend.
            TestStats trendChangeVsTrend = testTrendChangeVs(trendVsStationary, values, valuesWeights, candidateChangePoints);
            discoveredChangePoints.add(trendChangeVsTrend.changePoint());
            logger.trace("trend change vs trend: [{}]", trendChangeVsTrend);

            if (trendChangeVsTrend.accept(pValueThreshold)) {
                // Check if modeling a trend change adds much over modeling a step change.
                best = testVsStepChange(trendChangeVsTrend, values, valuesWeights, candidateChangePoints, pValueThreshold);
            } else {
                best = trendVsStationary;
            }

        } else {
            // Check if there is a step change.
            TestStats stepChangeVsStationary = testStepChangeVs(stationary, values, valuesWeights, candidateChangePoints);
            discoveredChangePoints.add(stepChangeVsStationary.changePoint());
            logger.trace("step change vs stationary: [{}]", stepChangeVsStationary);

            if (stepChangeVsStationary.accept(pValueThreshold)) {
                // Check if modeling a trend change adds much over modeling a step change.
                TestStats trendChangeVsStepChange = testTrendChangeVs(stepChangeVsStationary, values, valuesWeights, candidateChangePoints);
                // Record the change point found by the trend change test: the step change
                // point is already in the set so adding it again would be a no-op.
                discoveredChangePoints.add(trendChangeVsStepChange.changePoint());
                logger.trace("trend change vs step change: [{}]", trendChangeVsStepChange);
                if (trendChangeVsStepChange.accept(pValueThreshold)) {
                    best = trendChangeVsStepChange;
                } else {
                    best = stepChangeVsStationary;
                }

            } else {
                // Check if there is a trend change.
                TestStats trendChangeVsStationary = testTrendChangeVs(stationary, values, valuesWeights, candidateChangePoints);
                // As above, record the trend change point rather than the step change point.
                discoveredChangePoints.add(trendChangeVsStationary.changePoint());
                logger.trace("trend change vs stationary: [{}]", trendChangeVsStationary);
                if (trendChangeVsStationary.accept(pValueThreshold)) {
                    best = trendChangeVsStationary;
                }
            }
        }

        logger.trace("best: [{}]", best.pValueVsStationary());

        // We're not very confident in the change point, so check if a distribution change
        // fits the data better.
        if (best.pValueVsStationary() > 1e-5) {
            TestStats distChange = testDistributionChange(dataStats, values, valuesWeights, candidateChangePoints, discoveredChangePoints);
            logger.trace("distribution change: [{}]", distChange);
            if (distChange.pValue() < Math.min(pValueThreshold, 0.1 * best.pValueVsStationary())) {
                best = distChange;
            }
        }

        return best;
    }

    /**
     * Computes the indices at which a change is tested for. At least
     * max(10% of the values, {@code ChangePointDetector.MINIMUM_BUCKETS}) values are
     * kept on either side of every candidate. If this leaves more than
     * {@link #MAXIMUM_CANDIDATE_CHANGE_POINTS} candidates only the indices which are
     * a multiple of a fixed step are retained.
     */
    private static int[] computeCandidateChangePoints(double[] values) {
        int minValues = Math.max((int) (0.1 * values.length + 0.5), ChangePointDetector.MINIMUM_BUCKETS);
        if (values.length - 2 * minValues <= MAXIMUM_CANDIDATE_CHANGE_POINTS) {
            return IntStream.range(minValues, values.length - minValues).toArray();
        } else {
            int step = (int) Math.ceil((double) (values.length - 2 * minValues) / MAXIMUM_CANDIDATE_CHANGE_POINTS);
            return IntStream.range(minValues, values.length - minValues).filter(i -> i % step == 0).toArray();
        }
    }

    /**
     * Computes a weight for each value: 1.0 for inliers and 0.01 for outliers, where
     * the inlier range is bounded by the sorted values at positions
     * {@code ceil(0.025 * n)} and {@code n - ceil(0.025 * n) - 1}.
     */
    private static double[] outlierWeights(double[] values) {
        int i = (int) Math.ceil(0.025 * values.length);
        // The array first holds a sorted copy of the values, from which the inlier
        // bounds are read, and is then overwritten in place with the weights.
        double[] weights = Arrays.copyOf(values, values.length);
        Arrays.sort(weights);
        // We have to be careful here if we have a lot of duplicate values. To avoid marking
        // runs of duplicates as outliers we define outliers to be the smallest (largest)
        // value strictly less (greater) than the value at i (values.length - i - 1). This
        // means if i lands in a run of duplicates the entire run will be marked as inliers.
        double a = weights[i];
        double b = weights[values.length - i - 1];
        for (int j = 0; j < values.length; j++) {
            if (values[j] <= b && values[j] >= a) {
                weights[j] = 1.0;
            } else {
                weights[j] = 0.01;
            }
        }
        return weights;
    }

    /**
     * Computes the slope of the (unweighted) ordinary least squares line through
     * the points {@code (i, values[i])}.
     */
    private static double slope(double[] values) {
        SimpleRegression regression = new SimpleRegression();
        for (int i = 0; i < values.length; i++) {
            regression.addData(i, values[i]);
        }
        return regression.getSlope();
    }

    /**
     * Corrects a p-value for having performed {@code nTrials} independent trials,
     * i.e. computes {@code 1 - (1 - pValue)^nTrials}. For p-values of at most 1e-10
     * the first order approximation {@code nTrials * pValue} is used instead.
     */
    private static double independentTrialsPValue(double pValue, int nTrials) {
        return pValue > 1e-10 ? 1.0 - Math.pow(1.0 - pValue, nTrials) : nTrials * pValue;
    }

    /**
     * Tests a linear trend (modelled with 3 parameters) against the null hypothesis
     * {@code H0}.
     */
    private static TestStats testTrendVs(TestStats H0, double[] values, double[] weights) {
        LeastSquaresOnlineRegression allLeastSquares = new LeastSquaresOnlineRegression(2);
        for (int i = 0; i < values.length; i++) {
            allLeastSquares.add(i, values[i], weights[i]);
        }
        double vTrend = H0.dataStats().var() * (1.0 - allLeastSquares.rSquared());
        double pValue = fTestNestedPValue(H0.dataStats().nValues(), H0.var(), H0.nParams(), vTrend, 3.0);
        return new TestStats(Type.NON_STATIONARY, pValue, vTrend, 3.0, H0.dataStats());
    }

    /**
     * Tests a piecewise constant model with a single step (modelled with 2 parameters)
     * against the null hypothesis {@code H0}. The step is placed at the candidate
     * change point which minimises the pooled residual variance.
     */
    private static TestStats testStepChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) {

        double vStep = Double.MAX_VALUE;
        int changePoint = ChangeType.NO_CHANGE_POINT;

        // Initialize running stats so that they are only missing the individual changepoint values
        RunningStats lowerRange = new RunningStats();
        RunningStats upperRange = new RunningStats();
        upperRange.addValues(values, i -> weights[i], candidateChangePoints[0], values.length);
        lowerRange.addValues(values, i -> weights[i], 0, candidateChangePoints[0]);
        int last = candidateChangePoints[0];
        for (int cp : candidateChangePoints) {
            // Move the values in [last, cp) from the upper to the lower range.
            lowerRange.addValues(values, i -> weights[i], last, cp);
            upperRange.removeValues(values, i -> weights[i], last, cp);
            last = cp;
            double nl = lowerRange.count();
            double nu = upperRange.count();
            double vl = lowerRange.variance();
            double vu = upperRange.variance();
            double v = (nl * vl + nu * vu) / (nl + nu);
            if (v < vStep) {
                vStep = v;
                changePoint = cp;
            }
        }

        double pValue = independentTrialsPValue(
            fTestNestedPValue(H0.dataStats().nValues(), H0.var(), H0.nParams(), vStep, 2.0),
            candidateChangePoints.length
        );

        return new TestStats(Type.STEP_CHANGE, pValue, vStep, 2.0, changePoint, H0.dataStats());
    }

    /**
     * Tests a piecewise linear model with a single break (modelled with 6 parameters)
     * against the null hypothesis {@code H0}. The break is placed at the candidate
     * change point which minimises the pooled residual variance of the two fits.
     */
    private static TestStats testTrendChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) {

        double vChange = Double.MAX_VALUE;
        int changePoint = ChangeType.NO_CHANGE_POINT;

        // Initialize running stats so that they are only missing the individual changepoint values
        RunningStats lowerRange = new RunningStats();
        RunningStats upperRange = new RunningStats();
        lowerRange.addValues(values, i -> weights[i], 0, candidateChangePoints[0]);
        upperRange.addValues(values, i -> weights[i], candidateChangePoints[0], values.length);
        LeastSquaresOnlineRegression lowerLeastSquares = new LeastSquaresOnlineRegression(2);
        LeastSquaresOnlineRegression upperLeastSquares = new LeastSquaresOnlineRegression(2);
        // The upper regression uses abscissas relative to the first candidate change point.
        int first = candidateChangePoints[0];
        int last = candidateChangePoints[0];
        for (int i = 0; i < candidateChangePoints[0]; i++) {
            lowerLeastSquares.add(i, values[i], weights[i]);
        }
        for (int i = candidateChangePoints[0]; i < values.length; i++) {
            upperLeastSquares.add(i - first, values[i], weights[i]);
        }
        for (int cp : candidateChangePoints) {
            // Move the values in [last, cp) from the upper to the lower statistics.
            for (int i = last; i < cp; i++) {
                lowerRange.addValue(values[i], weights[i]);
                upperRange.removeValue(values[i], weights[i]);
                lowerLeastSquares.add(i, values[i], weights[i]);
                upperLeastSquares.remove(i - first, values[i], weights[i]);
            }
            last = cp;
            double nl = lowerRange.count();
            double nu = upperRange.count();
            double rl = lowerLeastSquares.rSquared();
            double ru = upperLeastSquares.rSquared();
            double vl = lowerRange.variance() * (1.0 - rl);
            double vu = upperRange.variance() * (1.0 - ru);
            double v = (nl * vl + nu * vu) / (nl + nu);
            if (v < vChange) {
                vChange = v;
                changePoint = cp;
            }
        }

        double pValue = independentTrialsPValue(
            fTestNestedPValue(H0.dataStats().nValues(), H0.var(), H0.nParams(), vChange, 6.0),
            candidateChangePoints.length
        );

        return new TestStats(Type.TREND_CHANGE, pValue, vChange, 6.0, changePoint, H0.dataStats());
    }

    /**
     * Decides between an already accepted trend change and a step change: the trend
     * change is kept only if it is a significant improvement over the best step
     * change, otherwise the step change is returned.
     */
    private static TestStats testVsStepChange(
        TestStats trendChange,
        double[] values,
        double[] weights,
        int[] candidateChangePoints,
        double pValueThreshold
    ) {
        DataStats dataStats = trendChange.dataStats();
        TestStats stationary = new TestStats(Type.STATIONARY, 1.0, dataStats.var(), 1.0, dataStats);
        TestStats stepChange = testStepChangeVs(stationary, values, weights, candidateChangePoints);
        double n = dataStats.nValues();
        double pValue = fTestNestedPValue(n, stepChange.var(), 2.0, trendChange.var(), 6.0);
        return pValue < pValueThreshold ? trendChange : stepChange;
    }

    /**
     * Computes the p-value of an F-test comparing two nested models.
     *
     * @param n     the (weighted) number of values.
     * @param vNull the residual variance of the null model.
     * @param pNull the number of parameters of the null model.
     * @param vAlt  the residual variance of the alternative model.
     * @param pAlt  the number of parameters of the alternative model.
     * @return twice the survival function of the F statistic, capped at 1.
     */
    private static double fTestNestedPValue(double n, double vNull, double pNull, double vAlt, double pAlt) {
        if (vAlt == vNull) {
            return 1.0;
        }
        if (vAlt == 0.0) {
            return 0.0;
        }
        double F = (vNull - vAlt) / (pAlt - pNull) * (n - pAlt) / vAlt;
        double sf = fDistribSf(pAlt - pNull, n - pAlt, F);
        return Math.min(2 * sf, 1.0);
    }

    /**
     * Returns the index of {@code xs} in the sorted range {@code [start, end)} of
     * {@code x} if it is present, otherwise the index at which it would be inserted.
     */
    private static int lowerBound(int[] x, int start, int end, int xs) {
        int retVal = Arrays.binarySearch(x, start, end, xs);
        if (retVal < 0) {
            retVal = -1 - retVal;
        }
        return retVal;
    }

    /**
     * Randomly (but repeatably) down-samples the values to at most
     * {@link #MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST} values, preserving their order.
     * The change points are returned both as positions in the sample and as the
     * original indices, in matching order.
     */
    private static SampleData sample(double[] values, double[] weights, Set<Integer> changePoints) {
        int[] originalChangePoints = changePoints.stream().mapToInt(Integer::intValue).toArray();
        if (values.length <= MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST) {
            // No down-sampling: sample positions and original indices coincide.
            return new SampleData(values, weights, originalChangePoints, originalChangePoints);
        }

        // Just want repeatable random numbers.
        Random rng = new Random(126832678);
        UniformRealDistribution uniform = new UniformRealDistribution(RandomGeneratorFactory.createRandomGenerator(rng), 0.0, 0.99999);

        // Fisher–Yates shuffle (why isn't this in Arrays?).
        int[] choice = IntStream.range(0, values.length).toArray();
        for (int i = 0; i < MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST; ++i) {
            int index = i + (int) Math.floor(uniform.sample() * (values.length - i));
            int tmp = choice[i];
            choice[i] = choice[index];
            choice[index] = tmp;
        }

        double[] sample = new double[MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST];
        double[] sampleWeights = new double[MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST];
        // Sorting the chosen indices keeps the sample in time order.
        Arrays.sort(choice, 0, MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST);
        for (int i = 0; i < MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST; ++i) {
            sample[i] = values[choice[i]];
            sampleWeights[i] = weights[choice[i]];
        }
        int[] sampleChangePoints = new int[originalChangePoints.length];
        for (int i = 0; i < originalChangePoints.length; ++i) {
            sampleChangePoints[i] = lowerBound(choice, 0, MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST, originalChangePoints[i]);
        }

        return new SampleData(sample, sampleWeights, sampleChangePoints, originalChangePoints);
    }

    /**
     * Tests for a change in the distribution of the values using a two-sample
     * Kolmogorov-Smirnov test at each discovered change point.
     * <p>
     * Note that the candidate at which the scaled difference in mean and standard
     * deviation is largest is added to {@code discoveredChangePoints}.
     *
     * @return the test statistics; the change point is an index into {@code values}.
     */
    private static TestStats testDistributionChange(
        DataStats stats,
        double[] values,
        double[] weights,
        int[] candidateChangePoints,
        Set<Integer> discoveredChangePoints
    ) {

        double maxDiff = 0.0;
        int changePoint = ChangeType.NO_CHANGE_POINT;

        // Initialize running stats so that they are only missing the individual changepoint values
        RunningStats lowerRange = new RunningStats();
        RunningStats upperRange = new RunningStats();
        upperRange.addValues(values, i -> weights[i], candidateChangePoints[0], values.length);
        lowerRange.addValues(values, i -> weights[i], 0, candidateChangePoints[0]);
        int last = candidateChangePoints[0];
        for (int cp : candidateChangePoints) {
            lowerRange.addValues(values, i -> weights[i], last, cp);
            upperRange.removeValues(values, i -> weights[i], last, cp);
            last = cp;
            double scale = Math.min(cp, values.length - cp);
            double meanDiff = Math.abs(lowerRange.mean() - upperRange.mean());
            double stdDiff = Math.abs(lowerRange.std() - upperRange.std());
            double diff = scale * (meanDiff + stdDiff);
            if (diff >= maxDiff) {
                maxDiff = diff;
                changePoint = cp;
            }
        }
        discoveredChangePoints.add(changePoint);

        // Note that statistical tests become increasingly powerful as the number of samples
        // increases. We are not interested in detecting visually small distribution changes
        // in splits of long windows so we randomly downsample the data if it is too large
        // before we run the tests.
        SampleData sampleData = sample(values, weights, discoveredChangePoints);
        final double[] sampleValues = sampleData.values();
        final int[] sampleChangePoints = sampleData.changePoints();
        final int[] originalChangePoints = sampleData.originalChangePoints();

        double pValue = 1;
        for (int i = 0; i < sampleChangePoints.length; i++) {
            // The sentinel has to be checked on the original index: down-sampling maps
            // it to a valid looking sample position.
            int originalChangePoint = originalChangePoints[i];
            if (originalChangePoint == ChangeType.NO_CHANGE_POINT) {
                continue;
            }
            int cp = sampleChangePoints[i];
            double[] x = Arrays.copyOfRange(sampleValues, 0, cp);
            double[] y = Arrays.copyOfRange(sampleValues, cp, sampleValues.length);
            double statistic = KOLMOGOROV_SMIRNOV_TEST.kolmogorovSmirnovStatistic(x, y);
            double ksTestPValue = KOLMOGOROV_SMIRNOV_TEST.exactP(statistic, x.length, y.length, false);
            if (ksTestPValue < pValue) {
                // Report the index into the full series, not the position in the sample,
                // since it is later translated to a bucket index.
                changePoint = originalChangePoint;
                pValue = ksTestPValue;
            }
        }

        // We start to get false positives if we have too many candidate change points. This
        // is the classic p-value hacking problem. However, the Sidak style correction we use
        // elsewhere is too conservative because test statistics for different split positions
        // are strongly correlated. We assume that we have some effective number of independent
        // trials equal to f * n for f < 1. Simulation shows the f = 1/50 yields low Type I
        // error rates.
        pValue = independentTrialsPValue(pValue, (sampleValues.length + 49) / 50);
        logger.trace("distribution change p-value: [{}]", pValue);

        return new TestStats(Type.DISTRIBUTION_CHANGE, pValue, changePoint, stats);
    }

    /**
     * Computes the survival function, i.e. 1 - CDF, of the F-distribution with the
     * supplied degrees of freedom at {@code x} using the regularized incomplete beta
     * function. Returns 1 for non-positive {@code x} and 0 for infinite or NaN {@code x}.
     */
    private static double fDistribSf(double numeratorDegreesOfFreedom, double denominatorDegreesOfFreedom, double x) {
        if (x <= 0) {
            return 1;
        }
        if (Double.isInfinite(x) || Double.isNaN(x)) {
            return 0;
        }

        return Beta.regularizedBeta(
            denominatorDegreesOfFreedom / (denominatorDegreesOfFreedom + numeratorDegreesOfFreedom * x),
            0.5 * denominatorDegreesOfFreedom,
            0.5 * numeratorDegreesOfFreedom
        );
    }

    /** The models of the series which are compared. */
    private enum Type {
        STATIONARY,
        NON_STATIONARY,
        STEP_CHANGE,
        TREND_CHANGE,
        DISTRIBUTION_CHANGE
    }

    /**
     * A (possibly down-sampled) copy of the series.
     *
     * @param values               the sampled values in their original order.
     * @param weights              the weights of the sampled values.
     * @param changePoints         the positions of the change points in {@code values}.
     * @param originalChangePoints the indices of the same change points in the full series.
     */
    private record SampleData(double[] values, double[] weights, int[] changePoints, int[] originalChangePoints) {}

    /** Summary statistics of the weighted series. */
    private record DataStats(double nValues, double mean, double var, int nCandidateChangePoints) {
        boolean varianceZeroToWorkingPrecision() {
            // Our variance calculation is only accurate to ulp(length * mean)^(1/2),
            // i.e. we compute it using the difference of squares method and don't use
            // the Kahan correction. We treat anything as zero to working precision as
            // zero. We should at some point switch to a more numerically stable approach
            // for computing data statistics.
            return var < Math.sqrt(Math.ulp(2.0 * nValues * mean));
        }

        @Override
        public String toString() {
            return "DataStats{nValues=" + nValues + ", mean=" + mean + ", var=" + var + ", nCandidates=" + nCandidateChangePoints + "}";
        }
    }

    /**
     * The outcome of testing one model of the series.
     *
     * @param type        the model which was tested.
     * @param pValue      the p-value of the test against its null hypothesis.
     * @param var         the residual variance of the model.
     * @param nParams     the number of parameters of the model.
     * @param changePoint the index into the values at which the change occurs or
     *                    {@code ChangeType.NO_CHANGE_POINT}.
     * @param dataStats   the summary statistics of the series.
     */
    private record TestStats(Type type, double pValue, double var, double nParams, int changePoint, DataStats dataStats) {
        TestStats(Type type, double pValue, int changePoint, DataStats dataStats) {
            this(type, pValue, 0.0, 0.0, changePoint, dataStats);
        }

        TestStats(Type type, double pValue, double var, double nParams, DataStats dataStats) {
            this(type, pValue, var, nParams, ChangeType.NO_CHANGE_POINT, dataStats);
        }

        boolean accept(double pValueThreshold) {
            // Check the change is:
            // 1. Statistically significant.
            // 2. That we explain enough of the data variance overall.
            return pValue < pValueThreshold && rSquared() >= 0.5;
        }

        /** The fraction of the data variance which is explained by the model. */
        double rSquared() {
            return 1.0 - var / dataStats.var();
        }

        /**
         * The p-value of this model versus a stationary series, corrected for the
         * number of candidate change points.
         */
        double pValueVsStationary() {
            return independentTrialsPValue(
                fTestNestedPValue(dataStats.nValues(), dataStats.var(), 1.0, var, nParams),
                dataStats.nCandidateChangePoints()
            );
        }

        /**
         * Converts these statistics to the change type which is reported.
         *
         * @param bucketValues used to translate the change point to a bucket index.
         * @param slope        the slope of the series; its sign selects the direction
         *                     reported for a non-stationary series.
         */
        ChangeType changeType(MlAggsHelper.DoubleBucketValues bucketValues, double slope) {
            // The switch is exhaustive over Type so no fallback is required.
            return switch (type) {
                case STATIONARY -> new ChangeType.Stationary();
                case NON_STATIONARY -> new ChangeType.NonStationary(
                    pValueVsStationary(),
                    rSquared(),
                    slope < 0.0 ? "decreasing" : "increasing"
                );
                case STEP_CHANGE -> new ChangeType.StepChange(pValueVsStationary(), bucketValues.getBucketIndex(changePoint));
                case TREND_CHANGE -> new ChangeType.TrendChange(
                    pValueVsStationary(),
                    rSquared(),
                    bucketValues.getBucketIndex(changePoint)
                );
                case DISTRIBUTION_CHANGE -> new ChangeType.DistributionChange(pValue, bucketValues.getBucketIndex(changePoint));
            };
        }

        @Override
        public String toString() {
            return "TestStats{"
                + ("type=" + type)
                + (", dataStats=" + dataStats)
                + (", var=" + var)
                + (", rSquared=" + rSquared())
                + (", pValue=" + pValue)
                + (", nParams=" + nParams)
                + (", changePoint=" + changePoint)
                + '}';
        }
    }

    /**
     * Accumulates the weighted count, sum and sum of squares of a collection of
     * values, from which the weighted mean and variance are computed. Values can
     * be removed again as well as added.
     */
    private static class RunningStats {
        private double sumOfSqrs;
        private double sum;
        private double count;

        static RunningStats from(double[] values, IntToDoubleFunction weightFunction) {
            return new RunningStats().addValues(values, weightFunction, 0, values.length);
        }

        RunningStats() {}

        double count() {
            return count;
        }

        double mean() {
            return sum / count;
        }

        double variance() {
            // Clamp at zero since the difference of squares can be slightly negative.
            return Math.max((sumOfSqrs - ((sum * sum) / count)) / count, 0.0);
        }

        double std() {
            return Math.sqrt(variance());
        }

        /** Adds the values with index in {@code [start, min(end, value.length))}. */
        RunningStats addValues(double[] value, IntToDoubleFunction weightFunction, int start, int end) {
            for (int i = start; i < value.length && i < end; i++) {
                addValue(value[i], weightFunction.applyAsDouble(i));
            }
            return this;
        }

        RunningStats addValue(double value, double weight) {
            sumOfSqrs += (value * value * weight);
            count += weight;
            sum += (value * weight);
            return this;
        }

        RunningStats removeValue(double value, double weight) {
            // The sum of squares and the count can't be negative, so clamp them.
            sumOfSqrs = Math.max(sumOfSqrs - value * value * weight, 0);
            count = Math.max(count - weight, 0);
            sum -= (value * weight);
            return this;
        }

        /** Removes the values with index in {@code [start, min(end, value.length))}. */
        RunningStats removeValues(double[] value, IntToDoubleFunction weightFunction, int start, int end) {
            for (int i = start; i < value.length && i < end; i++) {
                removeValue(value[i], weightFunction.applyAsDouble(i));
            }
            return this;
        }
    }
}
static org.elasticsearch.xpack.ml.aggs.MlAggsHelper.extractDoubleBucketedValues; public class ChangePointAggregator extends SiblingPipelineAggregator { - private static final Logger logger = LogManager.getLogger(ChangePointAggregator.class); - - static final double P_VALUE_THRESHOLD = 0.01; - private static final int MINIMUM_BUCKETS = 10; - private static final int MAXIMUM_CANDIDATE_CHANGE_POINTS = 1000; - private static final int MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST = 500; - private static final KolmogorovSmirnovTest KOLMOGOROV_SMIRNOV_TEST = new KolmogorovSmirnovTest(); - - private static double changePValueThreshold(int nValues) { - // This was obtained by simulating the test power for a fixed size effect as a - // function of the bucket value count. - return P_VALUE_THRESHOLD * Math.exp(-0.04 * (double) (nValues - 2 * (MINIMUM_BUCKETS + 1))); - } - - private static int lowerBound(int[] x, int start, int end, int xs) { - int retVal = Arrays.binarySearch(x, start, end, xs); - if (retVal < 0) { - retVal = -1 - retVal; - } - return retVal; - } - - private record SampleData(double[] values, double[] weights, Integer[] changePoints) {} - - private record DataStats(double nValues, double mean, double var, int nCandidateChangePoints) { - boolean varianceZeroToWorkingPrecision() { - // Our variance calculation is only accurate to ulp(length * mean)^(1/2), - // i.e. we compute it using the difference of squares method and don't use - // the Kahan correction. We treat anything as zero to working precision as - // zero. We should at some point switch to a more numerically stable approach - // for computing data statistics. 
- return var < Math.sqrt(Math.ulp(2.0 * nValues * mean)); - } - - @Override - public String toString() { - return "DataStats{nValues=" + nValues + ", mean=" + mean + ", var=" + var + ", nCandidates=" + nCandidateChangePoints + "}"; - } - } - - static int[] computeCandidateChangePoints(double[] values) { - int minValues = Math.max((int) (0.1 * values.length + 0.5), MINIMUM_BUCKETS); - if (values.length - 2 * minValues <= MAXIMUM_CANDIDATE_CHANGE_POINTS) { - return IntStream.range(minValues, values.length - minValues).toArray(); - } else { - int step = (int) Math.ceil((double) (values.length - 2 * minValues) / MAXIMUM_CANDIDATE_CHANGE_POINTS); - return IntStream.range(minValues, values.length - minValues).filter(i -> i % step == 0).toArray(); - } - } - public ChangePointAggregator(String name, String bucketsPath, Map metadata) { super(name, new String[] { bucketsPath }, metadata); } @@ -108,33 +43,11 @@ public InternalAggregation doReduce(InternalAggregations aggregations, Aggregati ); } MlAggsHelper.DoubleBucketValues bucketValues = maybeBucketValues.get(); - if (bucketValues.getValues().length < (2 * MINIMUM_BUCKETS) + 2) { - return new InternalChangePointAggregation( - name(), - metadata(), - null, - new ChangeType.Indeterminable( - "not enough buckets to calculate change_point. Requires at least [" - + ((2 * MINIMUM_BUCKETS) + 2) - + "]; found [" - + bucketValues.getValues().length - + "]" - ) - ); - } - - ChangeType spikeOrDip = testForSpikeOrDip(bucketValues, P_VALUE_THRESHOLD); - - // Test for change step, trend and distribution changes. 
- ChangeType change = testForChange(bucketValues, changePValueThreshold(bucketValues.getValues().length)); - logger.trace("change p-value: [{}]", change.pValue()); - if (spikeOrDip.pValue() < change.pValue()) { - change = spikeOrDip; - } + ChangeType change = ChangePointDetector.getChangeType(bucketValues); ChangePointBucket changePointBucket = null; - if (change.changePoint() >= 0) { + if (change.changePoint() != ChangeType.NO_CHANGE_POINT) { changePointBucket = extractBucket(bucketsPaths()[0], aggregations, change.changePoint()).map( b -> new ChangePointBucket(b.getKey(), b.getDocCount(), b.getAggregations()) ).orElse(null); @@ -142,503 +55,4 @@ public InternalAggregation doReduce(InternalAggregations aggregations, Aggregati return new InternalChangePointAggregation(name(), metadata(), changePointBucket, change); } - - static ChangeType testForSpikeOrDip(MlAggsHelper.DoubleBucketValues bucketValues, double pValueThreshold) { - try { - SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues.getValues()); - ChangeType result = detect.at(pValueThreshold, bucketValues); - logger.trace("spike or dip p-value: [{}]", result.pValue()); - return result; - } catch (NotStrictlyPositiveException nspe) { - logger.debug("failure testing for dips and spikes", nspe); - } - return new Indeterminable("failure testing for dips and spikes"); - } - - static ChangeType testForChange(MlAggsHelper.DoubleBucketValues bucketValues, double pValueThreshold) { - double[] timeWindow = bucketValues.getValues(); - return testForChange(timeWindow, pValueThreshold).changeType(bucketValues, slope(timeWindow)); - } - - static TestStats testForChange(double[] timeWindow, double pValueThreshold) { - - int[] candidateChangePoints = computeCandidateChangePoints(timeWindow); - logger.trace("candidatePoints: [{}]", Arrays.toString(candidateChangePoints)); - - double[] timeWindowWeights = outlierWeights(timeWindow); - logger.trace("timeWindow: [{}]", Arrays.toString(timeWindow)); - 
logger.trace("timeWindowWeights: [{}]", Arrays.toString(timeWindowWeights)); - RunningStats dataRunningStats = RunningStats.from(timeWindow, i -> timeWindowWeights[i]); - DataStats dataStats = new DataStats( - dataRunningStats.count(), - dataRunningStats.mean(), - dataRunningStats.variance(), - candidateChangePoints.length - ); - logger.trace("dataStats: [{}]", dataStats); - TestStats stationary = new TestStats(Type.STATIONARY, 1.0, dataStats.var(), 1.0, dataStats); - - if (dataStats.varianceZeroToWorkingPrecision()) { - return stationary; - } - - TestStats trendVsStationary = testTrendVs(stationary, timeWindow, timeWindowWeights); - logger.trace("trend vs stationary: [{}]", trendVsStationary); - - TestStats best = stationary; - Set discoveredChangePoints = Sets.newHashSetWithExpectedSize(4); - if (trendVsStationary.accept(pValueThreshold)) { - // Check if there is a change in the trend. - TestStats trendChangeVsTrend = testTrendChangeVs(trendVsStationary, timeWindow, timeWindowWeights, candidateChangePoints); - discoveredChangePoints.add(trendChangeVsTrend.changePoint()); - logger.trace("trend change vs trend: [{}]", trendChangeVsTrend); - - if (trendChangeVsTrend.accept(pValueThreshold)) { - // Check if modeling a trend change adds much over modeling a step change. - best = testVsStepChange(trendChangeVsTrend, timeWindow, timeWindowWeights, candidateChangePoints, pValueThreshold); - } else { - best = trendVsStationary; - } - - } else { - // Check if there is a step change. - TestStats stepChangeVsStationary = testStepChangeVs(stationary, timeWindow, timeWindowWeights, candidateChangePoints); - discoveredChangePoints.add(stepChangeVsStationary.changePoint()); - logger.trace("step change vs stationary: [{}]", stepChangeVsStationary); - - if (stepChangeVsStationary.accept(pValueThreshold)) { - // Check if modeling a trend change adds much over modeling a step change. 
- TestStats trendChangeVsStepChange = testTrendChangeVs( - stepChangeVsStationary, - timeWindow, - timeWindowWeights, - candidateChangePoints - ); - discoveredChangePoints.add(stepChangeVsStationary.changePoint()); - logger.trace("trend change vs step change: [{}]", trendChangeVsStepChange); - if (trendChangeVsStepChange.accept(pValueThreshold)) { - best = trendChangeVsStepChange; - } else { - best = stepChangeVsStationary; - } - - } else { - // Check if there is a trend change. - TestStats trendChangeVsStationary = testTrendChangeVs(stationary, timeWindow, timeWindowWeights, candidateChangePoints); - discoveredChangePoints.add(stepChangeVsStationary.changePoint()); - logger.trace("trend change vs stationary: [{}]", trendChangeVsStationary); - if (trendChangeVsStationary.accept(pValueThreshold)) { - best = trendChangeVsStationary; - } - } - } - - logger.trace("best: [{}]", best.pValueVsStationary()); - - // We're not very confident in the change point, so check if a distribution change - // fits the data better. - if (best.pValueVsStationary() > 1e-5) { - TestStats distChange = testDistributionChange( - dataStats, - timeWindow, - timeWindowWeights, - candidateChangePoints, - discoveredChangePoints - ); - logger.trace("distribution change: [{}]", distChange); - if (distChange.pValue() < Math.min(pValueThreshold, 0.1 * best.pValueVsStationary())) { - best = distChange; - } - } - - return best; - } - - static double[] outlierWeights(double[] values) { - int i = (int) Math.ceil(0.025 * values.length); - double[] weights = Arrays.copyOf(values, values.length); - Arrays.sort(weights); - // We have to be careful here if we have a lot of duplicate values. To avoid marking - // runs of duplicates as outliers we define outliers to be the smallest (largest) - // value strictly less (greater) than the value at i (values.length - i - 1). This - // means if i lands in a run of duplicates the entire run will be marked as inliers. 
- double a = weights[i]; - double b = weights[values.length - i - 1]; - for (int j = 0; j < values.length; j++) { - if (values[j] <= b && values[j] >= a) { - weights[j] = 1.0; - } else { - weights[j] = 0.01; - } - } - return weights; - } - - static double slope(double[] values) { - SimpleRegression regression = new SimpleRegression(); - for (int i = 0; i < values.length; i++) { - regression.addData(i, values[i]); - } - return regression.getSlope(); - } - - static double independentTrialsPValue(double pValue, int nTrials) { - return pValue > 1e-10 ? 1.0 - Math.pow(1.0 - pValue, nTrials) : nTrials * pValue; - } - - static TestStats testTrendVs(TestStats H0, double[] values, double[] weights) { - LeastSquaresOnlineRegression allLeastSquares = new LeastSquaresOnlineRegression(2); - for (int i = 0; i < values.length; i++) { - allLeastSquares.add(i, values[i], weights[i]); - } - double vTrend = H0.dataStats().var() * (1.0 - allLeastSquares.rSquared()); - double pValue = fTestNestedPValue(H0.dataStats().nValues(), H0.var(), H0.nParams(), vTrend, 3.0); - return new TestStats(Type.NON_STATIONARY, pValue, vTrend, 3.0, H0.dataStats()); - } - - static TestStats testStepChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) { - - double vStep = Double.MAX_VALUE; - int changePoint = -1; - - // Initialize running stats so that they are only missing the individual changepoint values - RunningStats lowerRange = new RunningStats(); - RunningStats upperRange = new RunningStats(); - upperRange.addValues(values, i -> weights[i], candidateChangePoints[0], values.length); - lowerRange.addValues(values, i -> weights[i], 0, candidateChangePoints[0]); - double mean = H0.dataStats().mean(); - int last = candidateChangePoints[0]; - for (int cp : candidateChangePoints) { - lowerRange.addValues(values, i -> weights[i], last, cp); - upperRange.removeValues(values, i -> weights[i], last, cp); - last = cp; - double nl = lowerRange.count(); - double nu = 
upperRange.count(); - double ml = lowerRange.mean(); - double mu = upperRange.mean(); - double vl = lowerRange.variance(); - double vu = upperRange.variance(); - double v = (nl * vl + nu * vu) / (nl + nu); - if (v < vStep) { - vStep = v; - changePoint = cp; - } - } - - double pValue = independentTrialsPValue( - fTestNestedPValue(H0.dataStats().nValues(), H0.var(), H0.nParams(), vStep, 2.0), - candidateChangePoints.length - ); - - return new TestStats(Type.STEP_CHANGE, pValue, vStep, 2.0, changePoint, H0.dataStats()); - } - - static TestStats testTrendChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) { - - double vChange = Double.MAX_VALUE; - int changePoint = -1; - - // Initialize running stats so that they are only missing the individual changepoint values - RunningStats lowerRange = new RunningStats(); - RunningStats upperRange = new RunningStats(); - lowerRange.addValues(values, i -> weights[i], 0, candidateChangePoints[0]); - upperRange.addValues(values, i -> weights[i], candidateChangePoints[0], values.length); - LeastSquaresOnlineRegression lowerLeastSquares = new LeastSquaresOnlineRegression(2); - LeastSquaresOnlineRegression upperLeastSquares = new LeastSquaresOnlineRegression(2); - int first = candidateChangePoints[0]; - int last = candidateChangePoints[0]; - for (int i = 0; i < candidateChangePoints[0]; i++) { - lowerLeastSquares.add(i, values[i], weights[i]); - } - for (int i = candidateChangePoints[0]; i < values.length; i++) { - upperLeastSquares.add(i - first, values[i], weights[i]); - } - for (int cp : candidateChangePoints) { - for (int i = last; i < cp; i++) { - lowerRange.addValue(values[i], weights[i]); - upperRange.removeValue(values[i], weights[i]); - lowerLeastSquares.add(i, values[i], weights[i]); - upperLeastSquares.remove(i - first, values[i], weights[i]); - } - last = cp; - double nl = lowerRange.count(); - double nu = upperRange.count(); - double rl = lowerLeastSquares.rSquared(); - double ru = 
upperLeastSquares.rSquared(); - double vl = lowerRange.variance() * (1.0 - rl); - double vu = upperRange.variance() * (1.0 - ru); - double v = (nl * vl + nu * vu) / (nl + nu); - if (v < vChange) { - vChange = v; - changePoint = cp; - } - } - - double pValue = independentTrialsPValue( - fTestNestedPValue(H0.dataStats().nValues(), H0.var(), H0.nParams(), vChange, 6.0), - candidateChangePoints.length - ); - - return new TestStats(Type.TREND_CHANGE, pValue, vChange, 6.0, changePoint, H0.dataStats()); - } - - static TestStats testVsStepChange( - TestStats trendChange, - double[] values, - double[] weights, - int[] candidateChangePoints, - double pValueThreshold - ) { - DataStats dataStats = trendChange.dataStats(); - TestStats stationary = new TestStats(Type.STATIONARY, 1.0, dataStats.var(), 1.0, dataStats); - TestStats stepChange = testStepChangeVs(stationary, values, weights, candidateChangePoints); - double n = dataStats.nValues(); - double pValue = fTestNestedPValue(n, stepChange.var(), 2.0, trendChange.var(), 6.0); - return pValue < pValueThreshold ? trendChange : stepChange; - } - - static double fTestNestedPValue(double n, double vNull, double pNull, double vAlt, double pAlt) { - if (vAlt == vNull) { - return 1.0; - } - if (vAlt == 0.0) { - return 0.0; - } - double F = (vNull - vAlt) / (pAlt - pNull) * (n - pAlt) / vAlt; - double sf = fDistribSf(pAlt - pNull, n - pAlt, F); - return Math.min(2 * sf, 1.0); - } - - static SampleData sample(double[] values, double[] weights, Set changePoints) { - Integer[] adjChangePoints = changePoints.toArray(new Integer[changePoints.size()]); - if (values.length <= MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST) { - return new SampleData(values, weights, adjChangePoints); - } - - // Just want repeatable random numbers. - Random rng = new Random(126832678); - UniformRealDistribution uniform = new UniformRealDistribution(RandomGeneratorFactory.createRandomGenerator(rng), 0.0, 0.99999); - - // Fisher–Yates shuffle (why isn't this in Arrays?). 
- int[] choice = IntStream.range(0, values.length).toArray(); - for (int i = 0; i < MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST; ++i) { - int index = i + (int) Math.floor(uniform.sample() * (values.length - i)); - int tmp = choice[i]; - choice[i] = choice[index]; - choice[index] = tmp; - } - - double[] sample = new double[MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST]; - double[] sampleWeights = new double[MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST]; - Arrays.sort(choice, 0, MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST); - for (int i = 0; i < MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST; ++i) { - sample[i] = values[choice[i]]; - sampleWeights[i] = weights[choice[i]]; - } - for (int i = 0; i < adjChangePoints.length; ++i) { - adjChangePoints[i] = lowerBound(choice, 0, MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST, adjChangePoints[i].intValue()); - } - - return new SampleData(sample, sampleWeights, adjChangePoints); - } - - static TestStats testDistributionChange( - DataStats stats, - double[] values, - double[] weights, - int[] candidateChangePoints, - Set discoveredChangePoints - ) { - - double maxDiff = 0.0; - int changePoint = -1; - - // Initialize running stats so that they are only missing the individual changepoint values - RunningStats lowerRange = new RunningStats(); - RunningStats upperRange = new RunningStats(); - upperRange.addValues(values, i -> weights[i], candidateChangePoints[0], values.length); - lowerRange.addValues(values, i -> weights[i], 0, candidateChangePoints[0]); - int last = candidateChangePoints[0]; - for (int cp : candidateChangePoints) { - lowerRange.addValues(values, i -> weights[i], last, cp); - upperRange.removeValues(values, i -> weights[i], last, cp); - last = cp; - double scale = Math.min(cp, values.length - cp); - double meanDiff = Math.abs(lowerRange.mean() - upperRange.mean()); - double stdDiff = Math.abs(lowerRange.std() - upperRange.std()); - double diff = scale * (meanDiff + stdDiff); - if (diff >= maxDiff) { - maxDiff = diff; - changePoint = cp; - } - } - discoveredChangePoints.add(changePoint); - - 
// Note that statistical tests become increasingly powerful as the number of samples - // increases. We are not interested in detecting visually small distribution changes - // in splits of long windows so we randomly downsample the data if it is too large - // before we run the tests. - SampleData sampleData = sample(values, weights, discoveredChangePoints); - final double[] sampleValues = sampleData.values(); - final double[] sampleWeights = sampleData.weights(); - - double pValue = 1; - for (int cp : sampleData.changePoints()) { - double[] x = Arrays.copyOfRange(sampleValues, 0, cp); - double[] y = Arrays.copyOfRange(sampleValues, cp, sampleValues.length); - double statistic = KOLMOGOROV_SMIRNOV_TEST.kolmogorovSmirnovStatistic(x, y); - double ksTestPValue = KOLMOGOROV_SMIRNOV_TEST.exactP(statistic, x.length, y.length, false); - if (ksTestPValue < pValue) { - changePoint = cp; - pValue = ksTestPValue; - } - } - - // We start to get false positives if we have too many candidate change points. This - // is the classic p-value hacking problem. However, the Sidak style correction we use - // elsewhere is too conservative because test statistics for different split positions - // are strongly correlated. We assume that we have some effective number of independent - // trials equal to f * n for f < 1. Simulation shows the f = 1/50 yields low Type I - // error rates. 
- pValue = independentTrialsPValue(pValue, (sampleValues.length + 49) / 50); - logger.trace("distribution change p-value: [{}]", pValue); - - return new TestStats(Type.DISTRIBUTION_CHANGE, pValue, changePoint, stats); - } - - enum Type { - STATIONARY, - NON_STATIONARY, - STEP_CHANGE, - TREND_CHANGE, - DISTRIBUTION_CHANGE - } - - record TestStats(Type type, double pValue, double var, double nParams, int changePoint, DataStats dataStats) { - TestStats(Type type, double pValue, int changePoint, DataStats dataStats) { - this(type, pValue, 0.0, 0.0, changePoint, dataStats); - } - - TestStats(Type type, double pValue, double var, double nParams, DataStats dataStats) { - this(type, pValue, var, nParams, -1, dataStats); - } - - boolean accept(double pValueThreshold) { - // Check the change is: - // 1. Statistically significant. - // 2. That we explain enough of the data variance overall. - return pValue < pValueThreshold && rSquared() >= 0.5; - } - - double rSquared() { - return 1.0 - var / dataStats.var(); - } - - double pValueVsStationary() { - return independentTrialsPValue( - fTestNestedPValue(dataStats.nValues(), dataStats.var(), 1.0, var, nParams), - dataStats.nCandidateChangePoints() - ); - } - - ChangeType changeType(MlAggsHelper.DoubleBucketValues bucketValues, double slope) { - switch (type) { - case STATIONARY: - return new ChangeType.Stationary(); - case NON_STATIONARY: - return new ChangeType.NonStationary(pValueVsStationary(), rSquared(), slope < 0.0 ? 
"decreasing" : "increasing"); - case STEP_CHANGE: - return new ChangeType.StepChange(pValueVsStationary(), bucketValues.getBucketIndex(changePoint)); - case TREND_CHANGE: - return new ChangeType.TrendChange(pValueVsStationary(), rSquared(), bucketValues.getBucketIndex(changePoint)); - case DISTRIBUTION_CHANGE: - return new ChangeType.DistributionChange(pValue, bucketValues.getBucketIndex(changePoint)); - } - throw new RuntimeException("Unknown change type [" + type + "]."); - } - - @Override - public String toString() { - return "TestStats{" - + ("type=" + type) - + (", dataStats=" + dataStats) - + (", var=" + var) - + (", rSquared=" + rSquared()) - + (", pValue=" + pValue) - + (", nParams=" + nParams) - + (", changePoint=" + changePoint) - + '}'; - } - } - - static class RunningStats { - double sumOfSqrs; - double sum; - double count; - - static RunningStats from(double[] values, IntToDoubleFunction weightFunction) { - return new RunningStats().addValues(values, weightFunction, 0, values.length); - } - - RunningStats() {} - - double count() { - return count; - } - - double mean() { - return sum / count; - } - - double variance() { - return Math.max((sumOfSqrs - ((sum * sum) / count)) / count, 0.0); - } - - double std() { - return Math.sqrt(variance()); - } - - RunningStats addValues(double[] value, IntToDoubleFunction weightFunction, int start, int end) { - for (int i = start; i < value.length && i < end; i++) { - addValue(value[i], weightFunction.applyAsDouble(i)); - } - return this; - } - - RunningStats addValue(double value, double weight) { - sumOfSqrs += (value * value * weight); - count += weight; - sum += (value * weight); - return this; - } - - RunningStats removeValue(double value, double weight) { - sumOfSqrs = Math.max(sumOfSqrs - value * value * weight, 0); - count = Math.max(count - weight, 0); - sum -= (value * weight); - return this; - } - - RunningStats removeValues(double[] value, IntToDoubleFunction weightFunction, int start, int end) { - for 
(int i = start; i < value.length && i < end; i++) { - removeValue(value[i], weightFunction.applyAsDouble(i)); - } - return this; - } - } - - static double fDistribSf(double numeratorDegreesOfFreedom, double denominatorDegreesOfFreedom, double x) { - if (x <= 0) { - return 1; - } - if (Double.isInfinite(x) || Double.isNaN(x)) { - return 0; - } - - return Beta.regularizedBeta( - denominatorDegreesOfFreedom / (denominatorDegreesOfFreedom + numeratorDegreesOfFreedom * x), - 0.5 * denominatorDegreesOfFreedom, - 0.5 * numeratorDegreesOfFreedom - ); - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointDetector.java new file mode 100644 index 0000000000000..d7708420994bb --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointDetector.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.aggs.changepoint; + +import org.apache.commons.math3.exception.NotStrictlyPositiveException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.xpack.ml.aggs.MlAggsHelper; + +/** + * Detects whether a series of values has a change point, by running both + * ChangeDetector and SpikeAndDipDetector on it. This is the main entrypoint + * of change point detection. + */ +public class ChangePointDetector { + + private static final Logger logger = LogManager.getLogger(ChangePointDetector.class); + + static final double P_VALUE_THRESHOLD = 0.01; + static final int MINIMUM_BUCKETS = 10; + + /** + * Returns the ChangeType of a series of values. 
+ */ + public static ChangeType getChangeType(MlAggsHelper.DoubleBucketValues bucketValues) { + if (bucketValues.getValues().length < (2 * MINIMUM_BUCKETS) + 2) { + return new ChangeType.Indeterminable( + "not enough buckets to calculate change_point. Requires at least [" + + ((2 * MINIMUM_BUCKETS) + 2) + + "]; found [" + + bucketValues.getValues().length + + "]" + ); + } + + ChangeType spikeOrDip; + try { + SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues); + spikeOrDip = detect.detect(P_VALUE_THRESHOLD); + logger.trace("spike or dip p-value: [{}]", spikeOrDip.pValue()); + } catch (NotStrictlyPositiveException nspe) { + logger.debug("failure testing for dips and spikes", nspe); + spikeOrDip = new ChangeType.Indeterminable("failure testing for dips and spikes"); + } + + ChangeType change = new ChangeDetector(bucketValues).detect(P_VALUE_THRESHOLD); + logger.trace("change p-value: [{}]", change.pValue()); + + if (spikeOrDip.pValue() < change.pValue()) { + change = spikeOrDip; + } + return change; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeType.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeType.java index c62355dc47451..7df542b59107b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeType.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeType.java @@ -21,8 +21,10 @@ */ public interface ChangeType extends NamedWriteable, NamedXContentObject { + int NO_CHANGE_POINT = -1; + default int changePoint() { - return -1; + return NO_CHANGE_POINT; } default double pValue() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java index b628ea3324cf1..365ebe8562d6a 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java @@ -92,6 +92,7 @@ private double[] removeIf(ExcludedPredicate should, double[] values) { return newValues; } + private final MlAggsHelper.DoubleBucketValues bucketValues; private final int numValues; private final int dipIndex; private final int spikeIndex; @@ -100,7 +101,9 @@ private double[] removeIf(ExcludedPredicate should, double[] values) { private final KDE spikeTestKDE; private final KDE dipTestKDE; - SpikeAndDipDetector(double[] values) { + SpikeAndDipDetector(MlAggsHelper.DoubleBucketValues bucketValues) { + this.bucketValues = bucketValues; + double[] values = bucketValues.getValues(); numValues = values.length; @@ -135,7 +138,7 @@ private double[] removeIf(ExcludedPredicate should, double[] values) { spikeTestKDE = new KDE(spikeKDEValues, 1.36); } - ChangeType at(double pValueThreshold, MlAggsHelper.DoubleBucketValues bucketValues) { + ChangeType detect(double pValueThreshold) { if (dipIndex == -1 || spikeIndex == -1) { return new ChangeType.Indeterminable( "not enough buckets to check for dip or spike. Requires at least [3]; found [" + numValues + "]" diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetectorTests.java new file mode 100644 index 0000000000000..36076bbb0ec25 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetectorTests.java @@ -0,0 +1,257 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.ml.aggs.changepoint; + +import org.apache.commons.math3.distribution.GammaDistribution; +import org.apache.commons.math3.distribution.NormalDistribution; +import org.apache.commons.math3.random.RandomGeneratorFactory; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.xpack.ml.aggs.MlAggsHelper; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.DoubleStream; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThan; + +public class ChangeDetectorTests extends AggregatorTestCase { + + public void testStationaryFalsePositiveRate() { + NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); + int fp = 0; + for (int i = 0; i < 100; i++) { + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.generate(() -> 10 + normal.sample()).limit(40).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(1e-4); + fp += type instanceof ChangeType.Stationary ? 0 : 1; + } + assertThat(fp, lessThan(10)); + + fp = 0; + GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); + for (int i = 0; i < 100; i++) { + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.generate(gamma::sample).limit(40).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(1e-4); + fp += type instanceof ChangeType.Stationary ? 
0 : 1; + } + assertThat(fp, lessThan(10)); + } + + public void testSampledDistributionTestFalsePositiveRate() { + NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0); + int fp = 0; + for (int i = 0; i < 100; i++) { + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.generate(() -> 10 + normal.sample()).limit(5000).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(1e-4); + fp += type instanceof ChangeType.Stationary ? 0 : 1; + } + assertThat(fp, lessThan(10)); + } + + public void testNonStationaryFalsePositiveRate() { + NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); + int fp = 0; + for (int i = 0; i < 100; i++) { + AtomicInteger j = new AtomicInteger(); + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(40).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(1e-4); + fp += type instanceof ChangeType.NonStationary ? 0 : 1; + } + assertThat(fp, lessThan(10)); + + fp = 0; + GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); + for (int i = 0; i < 100; i++) { + AtomicInteger j = new AtomicInteger(); + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(40).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(1e-4); + fp += type instanceof ChangeType.NonStationary ? 
0 : 1; + } + assertThat(fp, lessThan(10)); + } + + public void testStepChangePower() { + NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); + int tp = 0; + for (int i = 0; i < 100; i++) { + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.concat( + DoubleStream.generate(() -> normal.sample()).limit(20), + DoubleStream.generate(() -> 10 + normal.sample()).limit(20) + ).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.05); + tp += type instanceof ChangeType.StepChange ? 1 : 0; + } + assertThat(tp, greaterThan(80)); + + tp = 0; + GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); + for (int i = 0; i < 100; i++) { + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.concat( + DoubleStream.generate(() -> gamma.sample()).limit(20), + DoubleStream.generate(() -> 10 + gamma.sample()).limit(20) + ).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.05); + tp += type instanceof ChangeType.StepChange ? 1 : 0; + } + assertThat(tp, greaterThan(80)); + } + + public void testTrendChangePower() { + NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); + int tp = 0; + for (int i = 0; i < 100; i++) { + AtomicInteger j = new AtomicInteger(); + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.concat( + DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(20), + DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + normal.sample()).limit(20) + ).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.05); + tp += type instanceof ChangeType.TrendChange ? 
1 : 0; + } + assertThat(tp, greaterThan(80)); + + tp = 0; + GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); + for (int i = 0; i < 100; i++) { + AtomicInteger j = new AtomicInteger(); + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.concat( + DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(20), + DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + gamma.sample()).limit(20) + ).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.05); + tp += type instanceof ChangeType.TrendChange ? 1 : 0; + } + assertThat(tp, greaterThan(80)); + } + + public void testDistributionChangeTestPower() { + NormalDistribution normal1 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0); + NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 10.0); + int tp = 0; + for (int i = 0; i < 100; i++) { + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.concat( + DoubleStream.generate(() -> 10 + normal1.sample()).limit(50), + DoubleStream.generate(() -> 10 + normal2.sample()).limit(50) + ).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.05); + tp += type instanceof ChangeType.DistributionChange ? 
1 : 0; + } + assertThat(tp, greaterThan(90)); + } + + public void testMultipleChanges() { + NormalDistribution normal1 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 78.0, 3.0); + NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 40.0, 6.0); + NormalDistribution normal3 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1.0, 0.3); + int tp = 0; + for (int i = 0; i < 100; i++) { + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.concat( + DoubleStream.concat(DoubleStream.generate(normal1::sample).limit(7), DoubleStream.generate(normal2::sample).limit(6)), + DoubleStream.generate(normal3::sample).limit(23) + ).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.05); + tp += type instanceof ChangeType.TrendChange ? 1 : 0; + } + assertThat(tp, greaterThan(90)); + } + + public void testProblemDistributionChange() { + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + new double[] { + 546.3651753325270, + 550.872738079514, + 551.1312487618040, + 550.3323904749380, + 549.2652495378930, + 548.9761274963630, + 549.3433969743010, + 549.0935313531350, + 551.1762550747600, + 551.3772184469220, + 548.6163495094490, + 548.5866591594080, + 546.9364791288570, + 548.1167839989470, + 549.3484016149320, + 550.4242803917040, + 551.2316023050940, + 548.4713993534340, + 546.0254901960780, + 548.4376996805110, + 561.1920529801320, + 557.3930041152260, + 565.8497217068650, + 566.787072243346, + 546.6094890510950, + 530.5905797101450, + 556.7340823970040, + 557.3857677902620, + 543.0754716981130, + 574.3297101449280, + 559.2962962962960, + 549.5202952029520, + 531.7217741935480, + 551.4333333333330, + 557.637168141593, + 545.1880733944950, + 564.6893203883500, + 543.0204081632650, + 571.820809248555, + 541.2589928057550, + 
520.4387755102040 } + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.05); + assertThat(type, instanceOf(ChangeType.DistributionChange.class)); + } + + public void testUncertainNonStationary() { + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + new double[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 700, 735, 715 } + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.01); + assertThat(type, instanceOf(ChangeType.NonStationary.class)); + assertThat(((ChangeType.NonStationary) type).getTrend(), equalTo("increasing")); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java index 73131efbbcf4b..5cb66aaa5a58c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.aggs.changepoint; -import org.apache.commons.math3.distribution.GammaDistribution; import org.apache.commons.math3.distribution.NormalDistribution; import org.apache.commons.math3.random.RandomGeneratorFactory; import org.apache.logging.log4j.LogManager; @@ -37,10 +36,7 @@ import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.lessThan; -import static org.junit.Assert.assertThat; public class ChangePointAggregatorTests extends AggregatorTestCase { @@ -55,194 +51,6 @@ protected List getSearchPlugins() { private static final String NUMERIC_FIELD_NAME = "value"; private static final String TIME_FIELD_NAME = "timestamp"; - public void 
testStationaryFalsePositiveRate() throws IOException { - NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); - int fp = 0; - for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.generate(() -> 10 + normal.sample()).limit(40).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 1e-4); - fp += test.type() == ChangePointAggregator.Type.STATIONARY ? 0 : 1; - } - assertThat(fp, lessThan(10)); - - fp = 0; - GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); - for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.generate(() -> gamma.sample()).limit(40).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 1e-4); - fp += test.type() == ChangePointAggregator.Type.STATIONARY ? 0 : 1; - } - assertThat(fp, lessThan(10)); - } - - public void testSampledDistributionTestFalsePositiveRate() throws IOException { - NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0); - int fp = 0; - for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.generate(() -> 10 + normal.sample()).limit(5000).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 1e-4); - fp += test.type() == ChangePointAggregator.Type.STATIONARY ? 
0 : 1; - } - assertThat(fp, lessThan(10)); - } - - public void testNonStationaryFalsePositiveRate() throws IOException { - NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); - int fp = 0; - for (int i = 0; i < 100; i++) { - AtomicInteger j = new AtomicInteger(); - double[] bucketValues = DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(40).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 1e-4); - fp += test.type() == ChangePointAggregator.Type.NON_STATIONARY ? 0 : 1; - } - assertThat(fp, lessThan(10)); - - fp = 0; - GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); - for (int i = 0; i < 100; i++) { - AtomicInteger j = new AtomicInteger(); - double[] bucketValues = DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(40).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 1e-4); - fp += test.type() == ChangePointAggregator.Type.NON_STATIONARY ? 0 : 1; - } - assertThat(fp, lessThan(10)); - } - - public void testStepChangePower() throws IOException { - NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); - int tp = 0; - for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> normal.sample()).limit(20), - DoubleStream.generate(() -> 10 + normal.sample()).limit(20) - ).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 0.05); - tp += test.type() == ChangePointAggregator.Type.STEP_CHANGE ? 
1 : 0; - } - assertThat(tp, greaterThan(80)); - - tp = 0; - GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); - for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> gamma.sample()).limit(20), - DoubleStream.generate(() -> 10 + gamma.sample()).limit(20) - ).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 0.05); - tp += test.type() == ChangePointAggregator.Type.STEP_CHANGE ? 1 : 0; - } - assertThat(tp, greaterThan(80)); - } - - public void testTrendChangePower() throws IOException { - NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); - int tp = 0; - for (int i = 0; i < 100; i++) { - AtomicInteger j = new AtomicInteger(); - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(20), - DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + normal.sample()).limit(20) - ).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 0.05); - tp += test.type() == ChangePointAggregator.Type.TREND_CHANGE ? 1 : 0; - } - assertThat(tp, greaterThan(80)); - - tp = 0; - GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); - for (int i = 0; i < 100; i++) { - AtomicInteger j = new AtomicInteger(); - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(20), - DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + gamma.sample()).limit(20) - ).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 0.05); - tp += test.type() == ChangePointAggregator.Type.TREND_CHANGE ? 
1 : 0; - } - assertThat(tp, greaterThan(80)); - } - - public void testDistributionChangeTestPower() throws IOException { - NormalDistribution normal1 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0); - NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 10.0); - int tp = 0; - for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> 10 + normal1.sample()).limit(50), - DoubleStream.generate(() -> 10 + normal2.sample()).limit(50) - ).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 0.05); - tp += test.type() == ChangePointAggregator.Type.DISTRIBUTION_CHANGE ? 1 : 0; - } - assertThat(tp, greaterThan(90)); - } - - public void testMultipleChanges() throws IOException { - NormalDistribution normal1 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 78.0, 3.0); - NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 40.0, 6.0); - NormalDistribution normal3 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1.0, 0.3); - int tp = 0; - for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.concat( - DoubleStream.concat( - DoubleStream.generate(() -> normal1.sample()).limit(7), - DoubleStream.generate(() -> normal2.sample()).limit(6) - ), - DoubleStream.generate(() -> normal3.sample()).limit(23) - ).toArray(); - ChangePointAggregator.TestStats result = ChangePointAggregator.testForChange(bucketValues, 0.05); - tp += result.type() == ChangePointAggregator.Type.TREND_CHANGE ? 
1 : 0; - } - assertThat(tp, greaterThan(90)); - } - - public void testProblemDistributionChange() throws IOException { - double[] bucketValues = new double[] { - 546.3651753325270, - 550.872738079514, - 551.1312487618040, - 550.3323904749380, - 549.2652495378930, - 548.9761274963630, - 549.3433969743010, - 549.0935313531350, - 551.1762550747600, - 551.3772184469220, - 548.6163495094490, - 548.5866591594080, - 546.9364791288570, - 548.1167839989470, - 549.3484016149320, - 550.4242803917040, - 551.2316023050940, - 548.4713993534340, - 546.0254901960780, - 548.4376996805110, - 561.1920529801320, - 557.3930041152260, - 565.8497217068650, - 566.787072243346, - 546.6094890510950, - 530.5905797101450, - 556.7340823970040, - 557.3857677902620, - 543.0754716981130, - 574.3297101449280, - 559.2962962962960, - 549.5202952029520, - 531.7217741935480, - 551.4333333333330, - 557.637168141593, - 545.1880733944950, - 564.6893203883500, - 543.0204081632650, - 571.820809248555, - 541.2589928057550, - 520.4387755102040 }; - ChangePointAggregator.TestStats result = ChangePointAggregator.testForChange(bucketValues, 0.05); - assertThat(result.type(), equalTo(ChangePointAggregator.Type.DISTRIBUTION_CHANGE)); - } - public void testConstant() throws IOException { double[] bucketValues = DoubleStream.generate(() -> 10).limit(100).toArray(); testChangeType( @@ -262,7 +70,6 @@ public void testSlopeUp() throws IOException { // Handle infrequent false positives. 
assertThat(changeType, instanceOf(ChangeType.TrendChange.class)); } - }); } @@ -600,5 +407,4 @@ private static void writeTestDocs(RandomIndexWriter w, double[] bucketValues) th epoch_timestamp += INTERVAL.estimateMillis(); } } - } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetectorTests.java index fe91aa3e6a600..b21a7c4625e83 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetectorTests.java @@ -25,14 +25,14 @@ public void testTooLittleData() { Arrays.fill(docCounts, 1); Arrays.fill(values, 1.0); MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(docCounts, values); - SpikeAndDipDetector detect = new SpikeAndDipDetector(values); - assertThat(detect.at(0.01, bucketValues), instanceOf(ChangeType.Indeterminable.class)); + SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues); + assertThat(detect.detect(0.01), instanceOf(ChangeType.Indeterminable.class)); } } public void testSpikeAndDipValues() { double[] values = new double[] { 2.0, 1.0, 3.0, 5.0, 4.0 }; - SpikeAndDipDetector detector = new SpikeAndDipDetector(values); + SpikeAndDipDetector detector = new SpikeAndDipDetector(new MlAggsHelper.DoubleBucketValues(null, values)); assertThat(detector.spikeValue(), equalTo(5.0)); assertThat(detector.dipValue(), equalTo(1.0)); } @@ -133,7 +133,7 @@ public void testExludedValues() { Arrays.sort(expectedSpikeKDEValues); Arrays.sort(expectedDipKDEValues); - SpikeAndDipDetector detector = new SpikeAndDipDetector(values); + SpikeAndDipDetector detector = new SpikeAndDipDetector(new MlAggsHelper.DoubleBucketValues(null, values)); assertThat(detector.spikeValue(), equalTo(10.0)); assertThat(detector.dipValue(), 
equalTo(-2.0)); @@ -150,9 +150,9 @@ public void testDetection() { double[] values = new double[] { 0.1, 3.1, 1.2, 1.7, 0.9, 2.3, -0.8, 3.2, 1.2, 1.3, 1.1, 1.0, 8.5, 0.5, 2.6, 0.7 }; MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(docCounts, values); - SpikeAndDipDetector detect = new SpikeAndDipDetector(values); + SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues); - ChangeType change = detect.at(0.05, bucketValues); + ChangeType change = detect.detect(0.05); assertThat(change, instanceOf(ChangeType.Spike.class)); assertThat(change.pValue(), closeTo(3.0465e-12, 1e-15)); @@ -162,9 +162,9 @@ public void testDetection() { double[] values = new double[] { 0.1, 3.1, 1.2, 1.7, 0.9, 2.3, -4.2, 3.2, 1.2, 1.3, 1.1, 1.0, 3.5, 0.5, 2.6, 0.7 }; MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(docCounts, values); - SpikeAndDipDetector detect = new SpikeAndDipDetector(values); + SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues); - ChangeType change = detect.at(0.05, bucketValues); + ChangeType change = detect.detect(0.05); assertThat(change, instanceOf(ChangeType.Dip.class)); assertThat(change.pValue(), closeTo(1.2589e-08, 1e-11)); @@ -177,9 +177,9 @@ public void testMissingBuckets() { int[] buckets = new int[] { 0, 2, 3, 6, 7, 8, 9, 10, 11, 12, 13, 15, 17, 18, 19, 20 }; MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(docCounts, values, buckets); - SpikeAndDipDetector detect = new SpikeAndDipDetector(values); + SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues); - ChangeType change = detect.at(0.01, bucketValues); + ChangeType change = detect.detect(0.01); assertThat(change, instanceOf(ChangeType.Spike.class)); assertThat(change.changePoint(), equalTo(10));