From 4fa580c3978e9702d505eecd3453733b2b1d7e10 Mon Sep 17 00:00:00 2001
From: Jan Kuipers
Date: Fri, 4 Oct 2024 11:43:27 +0200
Subject: [PATCH 1/6] Move change detection code to separate class

---
 .../ml/aggs/changepoint/ChangeDetector.java | 540 ++++++++++++++++++
 .../changepoint/ChangePointAggregator.java | 522 +----------------
 .../ChangePointAggregatorTests.java | 71 ++-
 3 files changed, 575 insertions(+), 558 deletions(-)
 create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java

diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java
new file mode 100644
index 0000000000000..86f0b408d2340
--- /dev/null
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.aggs.changepoint;
+
+import org.apache.commons.math3.distribution.UniformRealDistribution;
+import org.apache.commons.math3.random.RandomGeneratorFactory;
+import org.apache.commons.math3.special.Beta;
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest;
+import org.apache.commons.math3.stat.regression.SimpleRegression;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.IntToDoubleFunction;
+import java.util.stream.IntStream;
+
+/**
+ * Statistical tests which classify a window of values as stationary, trending, or as
+ * containing a step change, a trend change or a distribution change.
+ */
+public class ChangeDetector {
+
+    private static final int MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST = 500;
+    private static final KolmogorovSmirnovTest KOLMOGOROV_SMIRNOV_TEST = new KolmogorovSmirnovTest();
+
+    private static final Logger logger = LogManager.getLogger(ChangeDetector.class);
+
+    /**
+     * Selects the model which best explains {@code timeWindow}.
+     * <p>
+     * A linear trend is first tested against the stationary model. If it is accepted, a trend
+     * change is tested against the trend and, when that is accepted too, compared with a step
+     * change. Otherwise a step change is tested against the stationary model; if it is accepted
+     * a trend change is tested against it, and if it is rejected a trend change is tested
+     * against the stationary model. Finally, when the selected model's p-value versus
+     * stationary exceeds 1e-5, a distribution change is tested and replaces the selection if
+     * its p-value is below {@code min(pValueThreshold, 0.1 * best p-value)}.
+     *
+     * @param timeWindow the values to test; the array index is used as the time coordinate
+     * @param pValueThreshold the p-value a test has to beat to be accepted
+     * @return the statistics of the selected model; the stationary model if the weighted
+     *         variance of the values is zero to working precision
+     */
+    static TestStats testForChange(double[] timeWindow, double pValueThreshold) {
+
+        int[] candidateChangePoints = ChangePointAggregator.computeCandidateChangePoints(timeWindow);
+        logger.trace("candidatePoints: [{}]", Arrays.toString(candidateChangePoints));
+
+        double[] timeWindowWeights = outlierWeights(timeWindow);
+        logger.trace("timeWindow: [{}]", Arrays.toString(timeWindow));
+        logger.trace("timeWindowWeights: [{}]", Arrays.toString(timeWindowWeights));
+        RunningStats dataRunningStats = RunningStats.from(timeWindow, i -> timeWindowWeights[i]);
+        DataStats dataStats = new DataStats(
+            dataRunningStats.count(),
+            dataRunningStats.mean(),
+            dataRunningStats.variance(),
+            candidateChangePoints.length
+        );
+        logger.trace("dataStats: [{}]", dataStats);
+        TestStats stationary = new TestStats(Type.STATIONARY, 1.0, dataStats.var(), 1.0, dataStats);
+
+        if (dataStats.varianceZeroToWorkingPrecision()) {
+            return stationary;
+        }
+
+        TestStats trendVsStationary = testTrendVs(stationary, timeWindow, timeWindowWeights);
+        logger.trace("trend vs stationary: [{}]", trendVsStationary);
+
+        TestStats best = stationary;
+        Set<Integer> discoveredChangePoints = Sets.newHashSetWithExpectedSize(4);
+        if (trendVsStationary.accept(pValueThreshold)) {
+            // Check if there is a change in the trend.
+            TestStats trendChangeVsTrend = testTrendChangeVs(trendVsStationary, timeWindow, timeWindowWeights, candidateChangePoints);
+            recordChangePoint(discoveredChangePoints, trendChangeVsTrend.changePoint());
+            logger.trace("trend change vs trend: [{}]", trendChangeVsTrend);
+
+            if (trendChangeVsTrend.accept(pValueThreshold)) {
+                // Check if modeling a trend change adds much over modeling a step change.
+                best = testVsStepChange(trendChangeVsTrend, timeWindow, timeWindowWeights, candidateChangePoints, pValueThreshold);
+            } else {
+                best = trendVsStationary;
+            }
+
+        } else {
+            // Check if there is a step change.
+            TestStats stepChangeVsStationary = testStepChangeVs(stationary, timeWindow, timeWindowWeights, candidateChangePoints);
+            recordChangePoint(discoveredChangePoints, stepChangeVsStationary.changePoint());
+            logger.trace("step change vs stationary: [{}]", stepChangeVsStationary);
+
+            if (stepChangeVsStationary.accept(pValueThreshold)) {
+                // Check if modeling a trend change adds much over modeling a step change.
+                TestStats trendChangeVsStepChange = testTrendChangeVs(
+                    stepChangeVsStationary,
+                    timeWindow,
+                    timeWindowWeights,
+                    candidateChangePoints
+                );
+                // Record the change point of the test we just ran; the step change point
+                // has already been recorded above.
+                recordChangePoint(discoveredChangePoints, trendChangeVsStepChange.changePoint());
+                logger.trace("trend change vs step change: [{}]", trendChangeVsStepChange);
+                if (trendChangeVsStepChange.accept(pValueThreshold)) {
+                    best = trendChangeVsStepChange;
+                } else {
+                    best = stepChangeVsStationary;
+                }
+
+            } else {
+                // Check if there is a trend change.
+                TestStats trendChangeVsStationary = testTrendChangeVs(stationary, timeWindow, timeWindowWeights, candidateChangePoints);
+                // Record the change point of the test we just ran; the step change point
+                // has already been recorded above.
+                recordChangePoint(discoveredChangePoints, trendChangeVsStationary.changePoint());
+                logger.trace("trend change vs stationary: [{}]", trendChangeVsStationary);
+                if (trendChangeVsStationary.accept(pValueThreshold)) {
+                    best = trendChangeVsStationary;
+                }
+            }
+        }
+
+        logger.trace("best: [{}]", best.pValueVsStationary());
+
+        // We're not very confident in the change point, so check if a distribution change
+        // fits the data better.
+        if (best.pValueVsStationary() > 1e-5) {
+            TestStats distChange = testDistributionChange(
+                dataStats,
+                timeWindow,
+                timeWindowWeights,
+                candidateChangePoints,
+                discoveredChangePoints
+            );
+            logger.trace("distribution change: [{}]", distChange);
+            if (distChange.pValue() < Math.min(pValueThreshold, 0.1 * best.pValueVsStationary())) {
+                best = distChange;
+            }
+        }
+
+        return best;
+    }
+
+    /**
+     * Adds {@code changePoint} to {@code changePoints} unless it is negative, i.e. unless it
+     * is the -1 marker the tests use when no candidate was selected. A negative index must
+     * not reach the split in {@link #testDistributionChange}.
+     */
+    private static void recordChangePoint(Set<Integer> changePoints, int changePoint) {
+        if (changePoint >= 0) {
+            changePoints.add(changePoint);
+        }
+    }
+
+    /**
+     * Computes a weight for each value which down-weights the extreme values.
+     *
+     * @param values the values to weight
+     * @return an array of the same length as {@code values} holding 1.0 for every value in
+     *         {@code [a, b]} and 0.01 otherwise, where {@code a} and {@code b} are the sorted
+     *         values at positions {@code i} and {@code length - i - 1} for
+     *         {@code i = ceil(0.025 * length)}; an empty array if {@code values} is empty
+     */
+    static double[] outlierWeights(double[] values) {
+        if (values.length == 0) {
+            return new double[0];
+        }
+        int i = (int) Math.ceil(0.025 * values.length);
+        double[] weights = Arrays.copyOf(values, values.length);
+        Arrays.sort(weights);
+        // We have to be careful here if we have a lot of duplicate values. To avoid marking
+        // runs of duplicates as outliers we define outliers to be the smallest (largest)
+        // value strictly less (greater) than the value at i (values.length - i - 1). This
+        // means if i lands in a run of duplicates the entire run will be marked as inliers.
+        double a = weights[i];
+        double b = weights[values.length - i - 1];
+        // The sorted copy is no longer needed once a and b are read so it is reused for the result.
+        for (int j = 0; j < values.length; j++) {
+            if (values[j] <= b && values[j] >= a) {
+                weights[j] = 1.0;
+            } else {
+                weights[j] = 0.01;
+            }
+        }
+        return weights;
+    }
+
+    /**
+     * Returns the slope of the simple linear regression of {@code values} on their index.
+     */
+    static double slope(double[] values) {
+        SimpleRegression regression = new SimpleRegression();
+        for (int i = 0; i < values.length; i++) {
+            regression.addData(i, values[i]);
+        }
+        return regression.getSlope();
+    }
+
+    /**
+     * Corrects a p-value for having run {@code nTrials} tests, treating them as independent.
+     *
+     * @return {@code 1 - (1 - pValue)^nTrials}, or {@code nTrials * pValue} when
+     *         {@code pValue <= 1e-10} (presumably to avoid the loss of precision in
+     *         {@code 1 - pValue} for very small p-values)
+     */
+    static double independentTrialsPValue(double pValue, int nTrials) {
+        return pValue > 1e-10 ? 1.0 - Math.pow(1.0 - pValue, nTrials) : nTrials * pValue;
+    }
+
+    /**
+     * Tests a weighted least squares trend fitted to all {@code values} against {@code H0}.
+     *
+     * @param H0 the null hypothesis model
+     * @param values the values to fit; the array index is used as the regression abscissa
+     * @param weights the weight of each value
+     * @return the statistics of the trend model, which is assigned three parameters
+     */
+    static TestStats testTrendVs(TestStats H0, double[] values, double[] weights) {
+        LeastSquaresOnlineRegression allLeastSquares = new LeastSquaresOnlineRegression(2);
+        for (int i = 0; i < values.length; i++) {
+            allLeastSquares.add(i, values[i], weights[i]);
+        }
+        double vTrend = H0.dataStats().var() * (1.0 - allLeastSquares.rSquared());
+        double pValue = fTestNestedPValue(H0.dataStats().nValues(), H0.var(), H0.nParams(), vTrend, 3.0);
+        return new TestStats(Type.NON_STATIONARY, pValue, vTrend, 3.0, H0.dataStats());
+    }
+
+    /**
+     * Tests the best step change, i.e. a different constant either side of a change point,
+     * against {@code H0}.
+     * <p>
+     * The change point is the candidate which minimizes the count weighted average of the
+     * variances either side of it. The p-value is corrected for the number of candidates.
+     *
+     * @param H0 the null hypothesis model
+     * @param values the values to test
+     * @param weights the weight of each value
+     * @param candidateChangePoints the indices to try as the change point; must not be empty
+     *        and is iterated on the assumption it is in increasing order
+     * @return the statistics of the step change model, which is assigned two parameters
+     */
+    static TestStats testStepChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) {
+
+        double vStep = Double.MAX_VALUE;
+        int changePoint = -1;
+
+        // Initialize running stats so that they are only missing the individual changepoint values
+        RunningStats lowerRange = new RunningStats();
+        RunningStats upperRange = new RunningStats();
+        upperRange.addValues(values, i -> weights[i], candidateChangePoints[0], values.length);
+        lowerRange.addValues(values, i -> weights[i], 0, candidateChangePoints[0]);
+        int last = candidateChangePoints[0];
+        for (int cp : candidateChangePoints) {
+            // Move the values in [last, cp) from the upper to the lower range.
+            lowerRange.addValues(values, i -> weights[i], last, cp);
+            upperRange.removeValues(values, i -> weights[i], last, cp);
+            last = cp;
+            double nl = lowerRange.count();
+            double nu = upperRange.count();
+            double vl = lowerRange.variance();
+            double vu = upperRange.variance();
+            double v = (nl * vl + nu * vu) / (nl + nu);
+            if (v < vStep) {
+                vStep = v;
+                changePoint = cp;
+            }
+        }
+
+        double pValue = independentTrialsPValue(
+            fTestNestedPValue(H0.dataStats().nValues(), H0.var(), H0.nParams(), vStep, 2.0),
+            candidateChangePoints.length
+        );
+
+        return new TestStats(Type.STEP_CHANGE, pValue, vStep, 2.0, changePoint, H0.dataStats());
+    }
+
+    /**
+     * Tests the best trend change, i.e. a different linear trend either side of a change
+     * point, against {@code H0}.
+     * <p>
+     * The change point is the candidate which minimizes the count weighted average of the
+     * residual variances of the two fits. The p-value is corrected for the number of
+     * candidates.
+     *
+     * @param H0 the null hypothesis model
+     * @param values the values to test
+     * @param weights the weight of each value
+     * @param candidateChangePoints the indices to try as the change point; must not be empty
+     *        and is iterated on the assumption it is in increasing order
+     * @return the statistics of the trend change model, which is assigned six parameters
+     */
+    static TestStats testTrendChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) {
+
+        double vChange = Double.MAX_VALUE;
+        int changePoint = -1;
+
+        // Initialize running stats so that they are only missing the individual changepoint values
+        RunningStats lowerRange = new RunningStats();
+        RunningStats upperRange = new RunningStats();
+        lowerRange.addValues(values, i -> weights[i], 0, candidateChangePoints[0]);
+        upperRange.addValues(values, i -> weights[i], candidateChangePoints[0], values.length);
+        LeastSquaresOnlineRegression lowerLeastSquares = new LeastSquaresOnlineRegression(2);
+        LeastSquaresOnlineRegression upperLeastSquares = new LeastSquaresOnlineRegression(2);
+        int first = candidateChangePoints[0];
+        int last = candidateChangePoints[0];
+        for (int i = 0; i < candidateChangePoints[0]; i++) {
+            lowerLeastSquares.add(i, values[i], weights[i]);
+        }
+        // The upper regression uses abscissas relative to the first candidate change point.
+        for (int i = candidateChangePoints[0]; i < values.length; i++) {
+            upperLeastSquares.add(i - first, values[i], weights[i]);
+        }
+        for (int cp : candidateChangePoints) {
+            // Move the values in [last, cp) from the upper to the lower statistics.
+            for (int i = last; i < cp; i++) {
+                lowerRange.addValue(values[i], weights[i]);
+                upperRange.removeValue(values[i], weights[i]);
+                lowerLeastSquares.add(i, values[i], weights[i]);
+                upperLeastSquares.remove(i - first, values[i], weights[i]);
+            }
+            last = cp;
+            double nl = lowerRange.count();
+            double nu = upperRange.count();
+            double rl = lowerLeastSquares.rSquared();
+            double ru = upperLeastSquares.rSquared();
+            double vl = lowerRange.variance() * (1.0 - rl);
+            double vu = upperRange.variance() * (1.0 - ru);
+            double v = (nl * vl + nu * vu) / (nl + nu);
+            if (v < vChange) {
+                vChange = v;
+                changePoint = cp;
+            }
+        }
+
+        double pValue = independentTrialsPValue(
+            fTestNestedPValue(H0.dataStats().nValues(), H0.var(), H0.nParams(), vChange, 6.0),
+            candidateChangePoints.length
+        );
+
+        return new TestStats(Type.TREND_CHANGE, pValue, vChange, 6.0, changePoint, H0.dataStats());
+    }
+
+    /**
+     * Chooses between an already accepted trend change and the best step change.
+     *
+     * @param trendChange the trend change model
+     * @param values the values to test
+     * @param weights the weight of each value
+     * @param candidateChangePoints the indices to try as the step change point
+     * @param pValueThreshold the p-value the nested F-test of the trend change (six
+     *        parameters) against the step change (two parameters) has to beat
+     * @return {@code trendChange} if the F-test p-value is below {@code pValueThreshold},
+     *         otherwise the step change tested against the stationary model
+     */
+    static TestStats testVsStepChange(
+        TestStats trendChange,
+        double[] values,
+        double[] weights,
+        int[] candidateChangePoints,
+        double pValueThreshold
+    ) {
+        DataStats dataStats = trendChange.dataStats();
+        TestStats stationary = new TestStats(Type.STATIONARY, 1.0, dataStats.var(), 1.0, dataStats);
+        TestStats stepChange = testStepChangeVs(stationary, values, weights, candidateChangePoints);
+        double n = dataStats.nValues();
+        double pValue = fTestNestedPValue(n, stepChange.var(), 2.0, trendChange.var(), 6.0);
+        return pValue < pValueThreshold ? trendChange : stepChange;
+    }
+
+    /**
+     * Computes the p-value of an F-test of an alternative model against a nested null model.
+     *
+     * @param n the number of values
+     * @param vNull the residual variance of the null model
+     * @param pNull the number of parameters of the null model
+     * @param vAlt the residual variance of the alternative model
+     * @param pAlt the number of parameters of the alternative model
+     * @return 1 if the variances are equal, 0 if the alternative's variance is zero, and
+     *         otherwise twice the survival function of the F statistic capped at 1
+     */
+    static double fTestNestedPValue(double n, double vNull, double pNull, double vAlt, double pAlt) {
+        if (vAlt == vNull) {
+            return 1.0;
+        }
+        if (vAlt == 0.0) {
+            return 0.0;
+        }
+        double F = (vNull - vAlt) / (pAlt - pNull) * (n - pAlt) / vAlt;
+        double sf = fDistribSf(pAlt - pNull, n - pAlt, F);
+        return Math.min(2 * sf, 1.0);
+    }
+
+    /**
+     * Returns the index of {@code xs} in the sorted range {@code [start, end)} of {@code x}
+     * or, if it is not present, the index at which it would be inserted.
+     */
+    private static int lowerBound(int[] x, int start, int end, int xs) {
+        int retVal = Arrays.binarySearch(x, start, end, xs);
+        if (retVal < 0) {
+            // Arrays.binarySearch encodes the insertion point as -(insertion point) - 1.
+            retVal = -1 - retVal;
+        }
+        return retVal;
+    }
+
+    /**
+     * Down-samples the data to at most {@code MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST} values.
+     * <p>
+     * The sample keeps the original order of the values and is repeatable because the
+     * random number generator uses a fixed seed.
+     *
+     * @param values the values to sample
+     * @param weights the weight of each value
+     * @param changePoints indices into {@code values}
+     * @return the inputs unchanged if there are no more values than the limit, otherwise the
+     *         sampled values and weights together with the change points mapped to the
+     *         corresponding positions in the sample
+     */
+    static SampleData sample(double[] values, double[] weights, Set<Integer> changePoints) {
+        Integer[] adjChangePoints = changePoints.toArray(new Integer[changePoints.size()]);
+        if (values.length <= MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST) {
+            return new SampleData(values, weights, adjChangePoints);
+        }
+
+        // Just want repeatable random numbers.
+        Random rng = new Random(126832678);
+        UniformRealDistribution uniform = new UniformRealDistribution(RandomGeneratorFactory.createRandomGenerator(rng), 0.0, 0.99999);
+
+        // Fisher–Yates shuffle (why isn't this in Arrays?).
+        int[] choice = IntStream.range(0, values.length).toArray();
+        for (int i = 0; i < MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST; ++i) {
+            int index = i + (int) Math.floor(uniform.sample() * (values.length - i));
+            int tmp = choice[i];
+            choice[i] = choice[index];
+            choice[index] = tmp;
+        }
+
+        double[] sample = new double[MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST];
+        double[] sampleWeights = new double[MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST];
+        // Sort the chosen indices so the sample preserves the order of the values.
+        Arrays.sort(choice, 0, MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST);
+        for (int i = 0; i < MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST; ++i) {
+            sample[i] = values[choice[i]];
+            sampleWeights[i] = weights[choice[i]];
+        }
+        for (int i = 0; i < adjChangePoints.length; ++i) {
+            adjChangePoints[i] = lowerBound(choice, 0, MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST, adjChangePoints[i].intValue());
+        }
+
+        return new SampleData(sample, sampleWeights, adjChangePoints);
+    }
+
+    /**
+     * Tests whether the values either side of some change point come from different
+     * distributions using the two sample Kolmogorov-Smirnov test.
+     * <p>
+     * The candidate which maximizes the scaled difference in mean and standard deviation
+     * either side of it is added to {@code discoveredChangePoints}. The data are then
+     * down-sampled and the Kolmogorov-Smirnov test is run for each discovered change point.
+     *
+     * @param stats the statistics of all the values
+     * @param values the values to test
+     * @param weights the weight of each value
+     * @param candidateChangePoints the indices to try as the change point; must not be empty
+     *        and is iterated on the assumption it is in increasing order
+     * @param discoveredChangePoints the change points found by the other tests; this set is
+     *        modified
+     * @return the statistics of the distribution change with the smallest p-value
+     */
+    static TestStats testDistributionChange(
+        DataStats stats,
+        double[] values,
+        double[] weights,
+        int[] candidateChangePoints,
+        Set<Integer> discoveredChangePoints
+    ) {
+
+        double maxDiff = 0.0;
+        int changePoint = -1;
+
+        // Initialize running stats so that they are only missing the individual changepoint values
+        RunningStats lowerRange = new RunningStats();
+        RunningStats upperRange = new RunningStats();
+        upperRange.addValues(values, i -> weights[i], candidateChangePoints[0], values.length);
+        lowerRange.addValues(values, i -> weights[i], 0, candidateChangePoints[0]);
+        int last = candidateChangePoints[0];
+        for (int cp : candidateChangePoints) {
+            lowerRange.addValues(values, i -> weights[i], last, cp);
+            upperRange.removeValues(values, i -> weights[i], last, cp);
+            last = cp;
+            double scale = Math.min(cp, values.length - cp);
+            double meanDiff = Math.abs(lowerRange.mean() - upperRange.mean());
+            double stdDiff = Math.abs(lowerRange.std() - upperRange.std());
+            double diff = scale * (meanDiff + stdDiff);
+            if (diff >= maxDiff) {
+                maxDiff = diff;
+                changePoint = cp;
+            }
+        }
+        recordChangePoint(discoveredChangePoints, changePoint);
+
+        // Note that statistical tests become increasingly powerful as the number of samples
+        // increases. We are not interested in detecting visually small distribution changes
+        // in splits of long windows so we randomly downsample the data if it is too large
+        // before we run the tests.
+        SampleData sampleData = sample(values, weights, discoveredChangePoints);
+        final double[] sampleValues = sampleData.values();
+
+        double pValue = 1;
+        for (int cp : sampleData.changePoints()) {
+            double[] x = Arrays.copyOfRange(sampleValues, 0, cp);
+            double[] y = Arrays.copyOfRange(sampleValues, cp, sampleValues.length);
+            double statistic = KOLMOGOROV_SMIRNOV_TEST.kolmogorovSmirnovStatistic(x, y);
+            double ksTestPValue = KOLMOGOROV_SMIRNOV_TEST.exactP(statistic, x.length, y.length, false);
+            if (ksTestPValue < pValue) {
+                changePoint = cp;
+                pValue = ksTestPValue;
+            }
+        }
+
+        // We start to get false positives if we have too many candidate change points. This
+        // is the classic p-value hacking problem. However, the Sidak style correction we use
+        // elsewhere is too conservative because test statistics for different split positions
+        // are strongly correlated. We assume that we have some effective number of independent
+        // trials equal to f * n for f < 1. Simulation shows the f = 1/50 yields low Type I
+        // error rates.
+        pValue = independentTrialsPValue(pValue, (sampleValues.length + 49) / 50);
+        logger.trace("distribution change p-value: [{}]", pValue);
+
+        return new TestStats(Type.DISTRIBUTION_CHANGE, pValue, changePoint, stats);
+    }
+
+    /**
+     * Computes the survival function of the F distribution, i.e. the probability that an F
+     * distributed random variable exceeds {@code x}.
+     *
+     * @param numeratorDegreesOfFreedom the numerator degrees of freedom
+     * @param denominatorDegreesOfFreedom the denominator degrees of freedom
+     * @param x the value of the F statistic
+     * @return 1 if {@code x <= 0}, 0 if {@code x} is infinite or NaN, otherwise the
+     *         regularized incomplete beta function form of the survival function
+     */
+    static double fDistribSf(double numeratorDegreesOfFreedom, double denominatorDegreesOfFreedom, double x) {
+        if (x <= 0) {
+            return 1;
+        }
+        if (Double.isInfinite(x) || Double.isNaN(x)) {
+            return 0;
+        }
+
+        return Beta.regularizedBeta(
+            denominatorDegreesOfFreedom / (denominatorDegreesOfFreedom + numeratorDegreesOfFreedom * x),
+            0.5 * denominatorDegreesOfFreedom,
+            0.5 * numeratorDegreesOfFreedom
+        );
+    }
+
+    /** The models the tests choose between. */
+    enum Type {
+        STATIONARY,
+        NON_STATIONARY,
+        STEP_CHANGE,
+        TREND_CHANGE,
+        DISTRIBUTION_CHANGE
+    }
+
+    /** The possibly down-sampled values and weights and the change points as indices into them. */
+    private record SampleData(double[] values, double[] weights, Integer[] changePoints) {}
+
+    /**
+     * Summary statistics of the whole window: the weighted count, mean and variance of the
+     * values and the number of candidate change points.
+     */
+    private record DataStats(double nValues, double mean, double var, int nCandidateChangePoints) {
+        boolean varianceZeroToWorkingPrecision() {
+            // Our variance calculation is only accurate to ulp(length * mean)^(1/2),
+            // i.e. we compute it using the difference of squares method and don't use
+            // the Kahan correction. We treat anything as zero to working precision as
+            // zero. We should at some point switch to a more numerically stable approach
+            // for computing data statistics.
+            return var < Math.sqrt(Math.ulp(2.0 * nValues * mean));
+        }
+
+        @Override
+        public String toString() {
+            return "DataStats{nValues=" + nValues + ", mean=" + mean + ", var=" + var + ", nCandidates=" + nCandidateChangePoints + "}";
+        }
+    }
+
+    /**
+     * The outcome of testing one model.
+     *
+     * @param type the model which was tested
+     * @param pValue the p-value of the test
+     * @param var the residual variance of the model
+     * @param nParams the number of parameters assigned to the model
+     * @param changePoint the index of the change point or -1 if the model has none
+     * @param dataStats the statistics of the whole window
+     */
+    record TestStats(Type type, double pValue, double var, double nParams, int changePoint, DataStats dataStats) {
+        /** Creates statistics with zero residual variance and zero parameters. */
+        TestStats(Type type, double pValue, int changePoint, DataStats dataStats) {
+            this(type, pValue, 0.0, 0.0, changePoint, dataStats);
+        }
+
+        /** Creates statistics for a model without a change point. */
+        TestStats(Type type, double pValue, double var, double nParams, DataStats dataStats) {
+            this(type, pValue, var, nParams, -1, dataStats);
+        }
+
+        boolean accept(double pValueThreshold) {
+            // Check the change is:
+            // 1. Statistically significant.
+            // 2. That we explain enough of the data variance overall.
+            return pValue < pValueThreshold && rSquared() >= 0.5;
+        }
+
+        /** Returns the fraction of the variance of the whole window this model explains. */
+        double rSquared() {
+            return 1.0 - var / dataStats.var();
+        }
+
+        /**
+         * Returns the p-value of this model tested against the stationary model, corrected
+         * for the number of candidate change points.
+         */
+        double pValueVsStationary() {
+            return independentTrialsPValue(
+                fTestNestedPValue(dataStats.nValues(), dataStats.var(), 1.0, var, nParams),
+                dataStats.nCandidateChangePoints()
+            );
+        }
+
+        /**
+         * Converts these statistics to the change type reported to the caller.
+         *
+         * @param bucketValues used to map the change point to its bucket index
+         * @param slope the slope of the values; a trend is reported as "decreasing" if it is
+         *        negative and as "increasing" otherwise
+         * @return the change type matching {@link #type()}
+         */
+        ChangeType changeType(MlAggsHelper.DoubleBucketValues bucketValues, double slope) {
+            switch (type) {
+                case STATIONARY:
+                    return new ChangeType.Stationary();
+                case NON_STATIONARY:
+                    return new ChangeType.NonStationary(pValueVsStationary(), rSquared(), slope < 0.0 ? "decreasing" : "increasing");
+                case STEP_CHANGE:
+                    return new ChangeType.StepChange(pValueVsStationary(), bucketValues.getBucketIndex(changePoint));
+                case TREND_CHANGE:
+                    return new ChangeType.TrendChange(pValueVsStationary(), rSquared(), bucketValues.getBucketIndex(changePoint));
+                case DISTRIBUTION_CHANGE:
+                    return new ChangeType.DistributionChange(pValue, bucketValues.getBucketIndex(changePoint));
+            }
+            throw new RuntimeException("Unknown change type [" + type + "].");
+        }
+
+        @Override
+        public String toString() {
+            return "TestStats{"
+                + ("type=" + type)
+                + (", dataStats=" + dataStats)
+                + (", var=" + var)
+                + (", rSquared=" + rSquared())
+                + (", pValue=" + pValue)
+                + (", nParams=" + nParams)
+                + (", changePoint=" + changePoint)
+                + '}';
+        }
+    }
+
+    /**
+     * Accumulates the weighted count, sum and sum of squares of a collection of values,
+     * from which the weighted mean and variance are computed. Values can be both added
+     * and removed.
+     */
+    static class RunningStats {
+        double sumOfSqrs;
+        double sum;
+        double count;
+
+        /** Creates the statistics of all {@code values} with weights given by {@code weightFunction}. */
+        static RunningStats from(double[] values, IntToDoubleFunction weightFunction) {
+            return new RunningStats().addValues(values, weightFunction, 0, values.length);
+        }
+
+        RunningStats() {}
+
+        /** Returns the sum of the weights of the values. */
+        double count() {
+            return count;
+        }
+
+        /** Returns the weighted mean; this is NaN if nothing has been added. */
+        double mean() {
+            return sum / count;
+        }
+
+        /** Returns the weighted variance computed by difference of squares and clamped at zero. */
+        double variance() {
+            return Math.max((sumOfSqrs - ((sum * sum) / count)) / count, 0.0);
+        }
+
+        double std() {
+            return Math.sqrt(variance());
+        }
+
+        /**
+         * Adds the values with indices in {@code [start, min(end, value.length))}.
+         *
+         * @param weightFunction maps the index of a value to its weight
+         */
+        RunningStats addValues(double[] value, IntToDoubleFunction weightFunction, int start, int end) {
+            for (int i = start; i < value.length && i < end; i++) {
+                addValue(value[i], weightFunction.applyAsDouble(i));
+            }
+            return this;
+        }
+
+        RunningStats addValue(double value, double weight) {
+            sumOfSqrs += (value * value * weight);
+            count += weight;
+            sum += (value * weight);
+            return this;
+        }
+
+        /** Removes a value; the sum of squares and the count are clamped at zero. */
+        RunningStats removeValue(double value, double weight) {
+            sumOfSqrs = Math.max(sumOfSqrs - value * value * weight, 0);
+            count = Math.max(count - weight, 0);
+            sum -= (value * weight);
+            return this;
+        }
+
+        /**
+         * Removes the values with indices in {@code [start, min(end, value.length))}.
+         *
+         * @param weightFunction maps the index of a value to its weight
+         */
+        RunningStats removeValues(double[] value, IntToDoubleFunction weightFunction, int start, int end) {
+            for (int i = start; i < value.length && i < end; i++) {
+                removeValue(value[i], weightFunction.applyAsDouble(i));
+            }
+            return this;
+        }
+    }
+}
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java
index faef29ff65070..e8f44140a4120 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java
@@ -7,15 +7,9 @@
 
 package org.elasticsearch.xpack.ml.aggs.changepoint;
 
-import org.apache.commons.math3.distribution.UniformRealDistribution;
 import org.apache.commons.math3.exception.NotStrictlyPositiveException;
-import org.apache.commons.math3.random.RandomGeneratorFactory;
-import org.apache.commons.math3.special.Beta;
-import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest;
-import org.apache.commons.math3.stat.regression.SimpleRegression;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
@@ -24,12 +18,8 @@
 import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;
 import org.elasticsearch.xpack.ml.aggs.changepoint.ChangeType.Indeterminable;
 
-import java.util.Arrays;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Random;
-import java.util.Set;
-import java.util.function.IntToDoubleFunction;
 import java.util.stream.IntStream;
 
 import static org.elasticsearch.xpack.ml.aggs.MlAggsHelper.extractBucket;
@@ -42,8 +32,6 @@ public class ChangePointAggregator extends SiblingPipelineAggregator {
     static final double P_VALUE_THRESHOLD = 0.01;
     private static 
final int MINIMUM_BUCKETS = 10; private static final int MAXIMUM_CANDIDATE_CHANGE_POINTS = 1000; - private static final int MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST = 500; - private static final KolmogorovSmirnovTest KOLMOGOROV_SMIRNOV_TEST = new KolmogorovSmirnovTest(); private static double changePValueThreshold(int nValues) { // This was obtained by simulating the test power for a fixed size effect as a @@ -51,32 +39,6 @@ private static double changePValueThreshold(int nValues) { return P_VALUE_THRESHOLD * Math.exp(-0.04 * (double) (nValues - 2 * (MINIMUM_BUCKETS + 1))); } - private static int lowerBound(int[] x, int start, int end, int xs) { - int retVal = Arrays.binarySearch(x, start, end, xs); - if (retVal < 0) { - retVal = -1 - retVal; - } - return retVal; - } - - private record SampleData(double[] values, double[] weights, Integer[] changePoints) {} - - private record DataStats(double nValues, double mean, double var, int nCandidateChangePoints) { - boolean varianceZeroToWorkingPrecision() { - // Our variance calculation is only accurate to ulp(length * mean)^(1/2), - // i.e. we compute it using the difference of squares method and don't use - // the Kahan correction. We treat anything as zero to working precision as - // zero. We should at some point switch to a more numerically stable approach - // for computing data statistics. 
- return var < Math.sqrt(Math.ulp(2.0 * nValues * mean)); - } - - @Override - public String toString() { - return "DataStats{nValues=" + nValues + ", mean=" + mean + ", var=" + var + ", nCandidates=" + nCandidateChangePoints + "}"; - } - } - static int[] computeCandidateChangePoints(double[] values) { int minValues = Math.max((int) (0.1 * values.length + 0.5), MINIMUM_BUCKETS); if (values.length - 2 * minValues <= MAXIMUM_CANDIDATE_CHANGE_POINTS) { @@ -157,488 +119,6 @@ static ChangeType testForSpikeOrDip(MlAggsHelper.DoubleBucketValues bucketValues static ChangeType testForChange(MlAggsHelper.DoubleBucketValues bucketValues, double pValueThreshold) { double[] timeWindow = bucketValues.getValues(); - return testForChange(timeWindow, pValueThreshold).changeType(bucketValues, slope(timeWindow)); - } - - static TestStats testForChange(double[] timeWindow, double pValueThreshold) { - - int[] candidateChangePoints = computeCandidateChangePoints(timeWindow); - logger.trace("candidatePoints: [{}]", Arrays.toString(candidateChangePoints)); - - double[] timeWindowWeights = outlierWeights(timeWindow); - logger.trace("timeWindow: [{}]", Arrays.toString(timeWindow)); - logger.trace("timeWindowWeights: [{}]", Arrays.toString(timeWindowWeights)); - RunningStats dataRunningStats = RunningStats.from(timeWindow, i -> timeWindowWeights[i]); - DataStats dataStats = new DataStats( - dataRunningStats.count(), - dataRunningStats.mean(), - dataRunningStats.variance(), - candidateChangePoints.length - ); - logger.trace("dataStats: [{}]", dataStats); - TestStats stationary = new TestStats(Type.STATIONARY, 1.0, dataStats.var(), 1.0, dataStats); - - if (dataStats.varianceZeroToWorkingPrecision()) { - return stationary; - } - - TestStats trendVsStationary = testTrendVs(stationary, timeWindow, timeWindowWeights); - logger.trace("trend vs stationary: [{}]", trendVsStationary); - - TestStats best = stationary; - Set discoveredChangePoints = Sets.newHashSetWithExpectedSize(4); - if 
(trendVsStationary.accept(pValueThreshold)) { - // Check if there is a change in the trend. - TestStats trendChangeVsTrend = testTrendChangeVs(trendVsStationary, timeWindow, timeWindowWeights, candidateChangePoints); - discoveredChangePoints.add(trendChangeVsTrend.changePoint()); - logger.trace("trend change vs trend: [{}]", trendChangeVsTrend); - - if (trendChangeVsTrend.accept(pValueThreshold)) { - // Check if modeling a trend change adds much over modeling a step change. - best = testVsStepChange(trendChangeVsTrend, timeWindow, timeWindowWeights, candidateChangePoints, pValueThreshold); - } else { - best = trendVsStationary; - } - - } else { - // Check if there is a step change. - TestStats stepChangeVsStationary = testStepChangeVs(stationary, timeWindow, timeWindowWeights, candidateChangePoints); - discoveredChangePoints.add(stepChangeVsStationary.changePoint()); - logger.trace("step change vs stationary: [{}]", stepChangeVsStationary); - - if (stepChangeVsStationary.accept(pValueThreshold)) { - // Check if modeling a trend change adds much over modeling a step change. - TestStats trendChangeVsStepChange = testTrendChangeVs( - stepChangeVsStationary, - timeWindow, - timeWindowWeights, - candidateChangePoints - ); - discoveredChangePoints.add(stepChangeVsStationary.changePoint()); - logger.trace("trend change vs step change: [{}]", trendChangeVsStepChange); - if (trendChangeVsStepChange.accept(pValueThreshold)) { - best = trendChangeVsStepChange; - } else { - best = stepChangeVsStationary; - } - - } else { - // Check if there is a trend change. 
- TestStats trendChangeVsStationary = testTrendChangeVs(stationary, timeWindow, timeWindowWeights, candidateChangePoints); - discoveredChangePoints.add(stepChangeVsStationary.changePoint()); - logger.trace("trend change vs stationary: [{}]", trendChangeVsStationary); - if (trendChangeVsStationary.accept(pValueThreshold)) { - best = trendChangeVsStationary; - } - } - } - - logger.trace("best: [{}]", best.pValueVsStationary()); - - // We're not very confident in the change point, so check if a distribution change - // fits the data better. - if (best.pValueVsStationary() > 1e-5) { - TestStats distChange = testDistributionChange( - dataStats, - timeWindow, - timeWindowWeights, - candidateChangePoints, - discoveredChangePoints - ); - logger.trace("distribution change: [{}]", distChange); - if (distChange.pValue() < Math.min(pValueThreshold, 0.1 * best.pValueVsStationary())) { - best = distChange; - } - } - - return best; - } - - static double[] outlierWeights(double[] values) { - int i = (int) Math.ceil(0.025 * values.length); - double[] weights = Arrays.copyOf(values, values.length); - Arrays.sort(weights); - // We have to be careful here if we have a lot of duplicate values. To avoid marking - // runs of duplicates as outliers we define outliers to be the smallest (largest) - // value strictly less (greater) than the value at i (values.length - i - 1). This - // means if i lands in a run of duplicates the entire run will be marked as inliers. 
- double a = weights[i]; - double b = weights[values.length - i - 1]; - for (int j = 0; j < values.length; j++) { - if (values[j] <= b && values[j] >= a) { - weights[j] = 1.0; - } else { - weights[j] = 0.01; - } - } - return weights; - } - - static double slope(double[] values) { - SimpleRegression regression = new SimpleRegression(); - for (int i = 0; i < values.length; i++) { - regression.addData(i, values[i]); - } - return regression.getSlope(); - } - - static double independentTrialsPValue(double pValue, int nTrials) { - return pValue > 1e-10 ? 1.0 - Math.pow(1.0 - pValue, nTrials) : nTrials * pValue; - } - - static TestStats testTrendVs(TestStats H0, double[] values, double[] weights) { - LeastSquaresOnlineRegression allLeastSquares = new LeastSquaresOnlineRegression(2); - for (int i = 0; i < values.length; i++) { - allLeastSquares.add(i, values[i], weights[i]); - } - double vTrend = H0.dataStats().var() * (1.0 - allLeastSquares.rSquared()); - double pValue = fTestNestedPValue(H0.dataStats().nValues(), H0.var(), H0.nParams(), vTrend, 3.0); - return new TestStats(Type.NON_STATIONARY, pValue, vTrend, 3.0, H0.dataStats()); - } - - static TestStats testStepChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) { - - double vStep = Double.MAX_VALUE; - int changePoint = -1; - - // Initialize running stats so that they are only missing the individual changepoint values - RunningStats lowerRange = new RunningStats(); - RunningStats upperRange = new RunningStats(); - upperRange.addValues(values, i -> weights[i], candidateChangePoints[0], values.length); - lowerRange.addValues(values, i -> weights[i], 0, candidateChangePoints[0]); - double mean = H0.dataStats().mean(); - int last = candidateChangePoints[0]; - for (int cp : candidateChangePoints) { - lowerRange.addValues(values, i -> weights[i], last, cp); - upperRange.removeValues(values, i -> weights[i], last, cp); - last = cp; - double nl = lowerRange.count(); - double nu = 
upperRange.count(); - double ml = lowerRange.mean(); - double mu = upperRange.mean(); - double vl = lowerRange.variance(); - double vu = upperRange.variance(); - double v = (nl * vl + nu * vu) / (nl + nu); - if (v < vStep) { - vStep = v; - changePoint = cp; - } - } - - double pValue = independentTrialsPValue( - fTestNestedPValue(H0.dataStats().nValues(), H0.var(), H0.nParams(), vStep, 2.0), - candidateChangePoints.length - ); - - return new TestStats(Type.STEP_CHANGE, pValue, vStep, 2.0, changePoint, H0.dataStats()); - } - - static TestStats testTrendChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) { - - double vChange = Double.MAX_VALUE; - int changePoint = -1; - - // Initialize running stats so that they are only missing the individual changepoint values - RunningStats lowerRange = new RunningStats(); - RunningStats upperRange = new RunningStats(); - lowerRange.addValues(values, i -> weights[i], 0, candidateChangePoints[0]); - upperRange.addValues(values, i -> weights[i], candidateChangePoints[0], values.length); - LeastSquaresOnlineRegression lowerLeastSquares = new LeastSquaresOnlineRegression(2); - LeastSquaresOnlineRegression upperLeastSquares = new LeastSquaresOnlineRegression(2); - int first = candidateChangePoints[0]; - int last = candidateChangePoints[0]; - for (int i = 0; i < candidateChangePoints[0]; i++) { - lowerLeastSquares.add(i, values[i], weights[i]); - } - for (int i = candidateChangePoints[0]; i < values.length; i++) { - upperLeastSquares.add(i - first, values[i], weights[i]); - } - for (int cp : candidateChangePoints) { - for (int i = last; i < cp; i++) { - lowerRange.addValue(values[i], weights[i]); - upperRange.removeValue(values[i], weights[i]); - lowerLeastSquares.add(i, values[i], weights[i]); - upperLeastSquares.remove(i - first, values[i], weights[i]); - } - last = cp; - double nl = lowerRange.count(); - double nu = upperRange.count(); - double rl = lowerLeastSquares.rSquared(); - double ru = 
upperLeastSquares.rSquared(); - double vl = lowerRange.variance() * (1.0 - rl); - double vu = upperRange.variance() * (1.0 - ru); - double v = (nl * vl + nu * vu) / (nl + nu); - if (v < vChange) { - vChange = v; - changePoint = cp; - } - } - - double pValue = independentTrialsPValue( - fTestNestedPValue(H0.dataStats().nValues(), H0.var(), H0.nParams(), vChange, 6.0), - candidateChangePoints.length - ); - - return new TestStats(Type.TREND_CHANGE, pValue, vChange, 6.0, changePoint, H0.dataStats()); - } - - static TestStats testVsStepChange( - TestStats trendChange, - double[] values, - double[] weights, - int[] candidateChangePoints, - double pValueThreshold - ) { - DataStats dataStats = trendChange.dataStats(); - TestStats stationary = new TestStats(Type.STATIONARY, 1.0, dataStats.var(), 1.0, dataStats); - TestStats stepChange = testStepChangeVs(stationary, values, weights, candidateChangePoints); - double n = dataStats.nValues(); - double pValue = fTestNestedPValue(n, stepChange.var(), 2.0, trendChange.var(), 6.0); - return pValue < pValueThreshold ? trendChange : stepChange; - } - - static double fTestNestedPValue(double n, double vNull, double pNull, double vAlt, double pAlt) { - if (vAlt == vNull) { - return 1.0; - } - if (vAlt == 0.0) { - return 0.0; - } - double F = (vNull - vAlt) / (pAlt - pNull) * (n - pAlt) / vAlt; - double sf = fDistribSf(pAlt - pNull, n - pAlt, F); - return Math.min(2 * sf, 1.0); - } - - static SampleData sample(double[] values, double[] weights, Set changePoints) { - Integer[] adjChangePoints = changePoints.toArray(new Integer[changePoints.size()]); - if (values.length <= MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST) { - return new SampleData(values, weights, adjChangePoints); - } - - // Just want repeatable random numbers. - Random rng = new Random(126832678); - UniformRealDistribution uniform = new UniformRealDistribution(RandomGeneratorFactory.createRandomGenerator(rng), 0.0, 0.99999); - - // Fisher–Yates shuffle (why isn't this in Arrays?). 
- int[] choice = IntStream.range(0, values.length).toArray(); - for (int i = 0; i < MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST; ++i) { - int index = i + (int) Math.floor(uniform.sample() * (values.length - i)); - int tmp = choice[i]; - choice[i] = choice[index]; - choice[index] = tmp; - } - - double[] sample = new double[MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST]; - double[] sampleWeights = new double[MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST]; - Arrays.sort(choice, 0, MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST); - for (int i = 0; i < MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST; ++i) { - sample[i] = values[choice[i]]; - sampleWeights[i] = weights[choice[i]]; - } - for (int i = 0; i < adjChangePoints.length; ++i) { - adjChangePoints[i] = lowerBound(choice, 0, MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST, adjChangePoints[i].intValue()); - } - - return new SampleData(sample, sampleWeights, adjChangePoints); - } - - static TestStats testDistributionChange( - DataStats stats, - double[] values, - double[] weights, - int[] candidateChangePoints, - Set discoveredChangePoints - ) { - - double maxDiff = 0.0; - int changePoint = -1; - - // Initialize running stats so that they are only missing the individual changepoint values - RunningStats lowerRange = new RunningStats(); - RunningStats upperRange = new RunningStats(); - upperRange.addValues(values, i -> weights[i], candidateChangePoints[0], values.length); - lowerRange.addValues(values, i -> weights[i], 0, candidateChangePoints[0]); - int last = candidateChangePoints[0]; - for (int cp : candidateChangePoints) { - lowerRange.addValues(values, i -> weights[i], last, cp); - upperRange.removeValues(values, i -> weights[i], last, cp); - last = cp; - double scale = Math.min(cp, values.length - cp); - double meanDiff = Math.abs(lowerRange.mean() - upperRange.mean()); - double stdDiff = Math.abs(lowerRange.std() - upperRange.std()); - double diff = scale * (meanDiff + stdDiff); - if (diff >= maxDiff) { - maxDiff = diff; - changePoint = cp; - } - } - discoveredChangePoints.add(changePoint); - - 
// Note that statistical tests become increasingly powerful as the number of samples - // increases. We are not interested in detecting visually small distribution changes - // in splits of long windows so we randomly downsample the data if it is too large - // before we run the tests. - SampleData sampleData = sample(values, weights, discoveredChangePoints); - final double[] sampleValues = sampleData.values(); - final double[] sampleWeights = sampleData.weights(); - - double pValue = 1; - for (int cp : sampleData.changePoints()) { - double[] x = Arrays.copyOfRange(sampleValues, 0, cp); - double[] y = Arrays.copyOfRange(sampleValues, cp, sampleValues.length); - double statistic = KOLMOGOROV_SMIRNOV_TEST.kolmogorovSmirnovStatistic(x, y); - double ksTestPValue = KOLMOGOROV_SMIRNOV_TEST.exactP(statistic, x.length, y.length, false); - if (ksTestPValue < pValue) { - changePoint = cp; - pValue = ksTestPValue; - } - } - - // We start to get false positives if we have too many candidate change points. This - // is the classic p-value hacking problem. However, the Sidak style correction we use - // elsewhere is too conservative because test statistics for different split positions - // are strongly correlated. We assume that we have some effective number of independent - // trials equal to f * n for f < 1. Simulation shows the f = 1/50 yields low Type I - // error rates. 
- pValue = independentTrialsPValue(pValue, (sampleValues.length + 49) / 50); - logger.trace("distribution change p-value: [{}]", pValue); - - return new TestStats(Type.DISTRIBUTION_CHANGE, pValue, changePoint, stats); - } - - enum Type { - STATIONARY, - NON_STATIONARY, - STEP_CHANGE, - TREND_CHANGE, - DISTRIBUTION_CHANGE - } - - record TestStats(Type type, double pValue, double var, double nParams, int changePoint, DataStats dataStats) { - TestStats(Type type, double pValue, int changePoint, DataStats dataStats) { - this(type, pValue, 0.0, 0.0, changePoint, dataStats); - } - - TestStats(Type type, double pValue, double var, double nParams, DataStats dataStats) { - this(type, pValue, var, nParams, -1, dataStats); - } - - boolean accept(double pValueThreshold) { - // Check the change is: - // 1. Statistically significant. - // 2. That we explain enough of the data variance overall. - return pValue < pValueThreshold && rSquared() >= 0.5; - } - - double rSquared() { - return 1.0 - var / dataStats.var(); - } - - double pValueVsStationary() { - return independentTrialsPValue( - fTestNestedPValue(dataStats.nValues(), dataStats.var(), 1.0, var, nParams), - dataStats.nCandidateChangePoints() - ); - } - - ChangeType changeType(MlAggsHelper.DoubleBucketValues bucketValues, double slope) { - switch (type) { - case STATIONARY: - return new ChangeType.Stationary(); - case NON_STATIONARY: - return new ChangeType.NonStationary(pValueVsStationary(), rSquared(), slope < 0.0 ? 
"decreasing" : "increasing"); - case STEP_CHANGE: - return new ChangeType.StepChange(pValueVsStationary(), bucketValues.getBucketIndex(changePoint)); - case TREND_CHANGE: - return new ChangeType.TrendChange(pValueVsStationary(), rSquared(), bucketValues.getBucketIndex(changePoint)); - case DISTRIBUTION_CHANGE: - return new ChangeType.DistributionChange(pValue, bucketValues.getBucketIndex(changePoint)); - } - throw new RuntimeException("Unknown change type [" + type + "]."); - } - - @Override - public String toString() { - return "TestStats{" - + ("type=" + type) - + (", dataStats=" + dataStats) - + (", var=" + var) - + (", rSquared=" + rSquared()) - + (", pValue=" + pValue) - + (", nParams=" + nParams) - + (", changePoint=" + changePoint) - + '}'; - } - } - - static class RunningStats { - double sumOfSqrs; - double sum; - double count; - - static RunningStats from(double[] values, IntToDoubleFunction weightFunction) { - return new RunningStats().addValues(values, weightFunction, 0, values.length); - } - - RunningStats() {} - - double count() { - return count; - } - - double mean() { - return sum / count; - } - - double variance() { - return Math.max((sumOfSqrs - ((sum * sum) / count)) / count, 0.0); - } - - double std() { - return Math.sqrt(variance()); - } - - RunningStats addValues(double[] value, IntToDoubleFunction weightFunction, int start, int end) { - for (int i = start; i < value.length && i < end; i++) { - addValue(value[i], weightFunction.applyAsDouble(i)); - } - return this; - } - - RunningStats addValue(double value, double weight) { - sumOfSqrs += (value * value * weight); - count += weight; - sum += (value * weight); - return this; - } - - RunningStats removeValue(double value, double weight) { - sumOfSqrs = Math.max(sumOfSqrs - value * value * weight, 0); - count = Math.max(count - weight, 0); - sum -= (value * weight); - return this; - } - - RunningStats removeValues(double[] value, IntToDoubleFunction weightFunction, int start, int end) { - for 
(int i = start; i < value.length && i < end; i++) { - removeValue(value[i], weightFunction.applyAsDouble(i)); - } - return this; - } - } - - static double fDistribSf(double numeratorDegreesOfFreedom, double denominatorDegreesOfFreedom, double x) { - if (x <= 0) { - return 1; - } - if (Double.isInfinite(x) || Double.isNaN(x)) { - return 0; - } - - return Beta.regularizedBeta( - denominatorDegreesOfFreedom / (denominatorDegreesOfFreedom + numeratorDegreesOfFreedom * x), - 0.5 * denominatorDegreesOfFreedom, - 0.5 * numeratorDegreesOfFreedom - ); + return ChangeDetector.testForChange(timeWindow, pValueThreshold).changeType(bucketValues, ChangeDetector.slope(timeWindow)); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java index 73131efbbcf4b..9cc93f374e0eb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java @@ -55,13 +55,13 @@ protected List getSearchPlugins() { private static final String NUMERIC_FIELD_NAME = "value"; private static final String TIME_FIELD_NAME = "timestamp"; - public void testStationaryFalsePositiveRate() throws IOException { + public void testStationaryFalsePositiveRate() { NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); int fp = 0; for (int i = 0; i < 100; i++) { double[] bucketValues = DoubleStream.generate(() -> 10 + normal.sample()).limit(40).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 1e-4); - fp += test.type() == ChangePointAggregator.Type.STATIONARY ? 
0 : 1; + ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 1e-4); + fp += test.type() == ChangeDetector.Type.STATIONARY ? 0 : 1; } assertThat(fp, lessThan(10)); @@ -69,31 +69,31 @@ public void testStationaryFalsePositiveRate() throws IOException { GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); for (int i = 0; i < 100; i++) { double[] bucketValues = DoubleStream.generate(() -> gamma.sample()).limit(40).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 1e-4); - fp += test.type() == ChangePointAggregator.Type.STATIONARY ? 0 : 1; + ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 1e-4); + fp += test.type() == ChangeDetector.Type.STATIONARY ? 0 : 1; } assertThat(fp, lessThan(10)); } - public void testSampledDistributionTestFalsePositiveRate() throws IOException { + public void testSampledDistributionTestFalsePositiveRate() { NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0); int fp = 0; for (int i = 0; i < 100; i++) { double[] bucketValues = DoubleStream.generate(() -> 10 + normal.sample()).limit(5000).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 1e-4); - fp += test.type() == ChangePointAggregator.Type.STATIONARY ? 0 : 1; + ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 1e-4); + fp += test.type() == ChangeDetector.Type.STATIONARY ? 
0 : 1; } assertThat(fp, lessThan(10)); } - public void testNonStationaryFalsePositiveRate() throws IOException { + public void testNonStationaryFalsePositiveRate() { NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); int fp = 0; for (int i = 0; i < 100; i++) { AtomicInteger j = new AtomicInteger(); double[] bucketValues = DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(40).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 1e-4); - fp += test.type() == ChangePointAggregator.Type.NON_STATIONARY ? 0 : 1; + ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 1e-4); + fp += test.type() == ChangeDetector.Type.NON_STATIONARY ? 0 : 1; } assertThat(fp, lessThan(10)); @@ -102,13 +102,13 @@ public void testNonStationaryFalsePositiveRate() throws IOException { for (int i = 0; i < 100; i++) { AtomicInteger j = new AtomicInteger(); double[] bucketValues = DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(40).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 1e-4); - fp += test.type() == ChangePointAggregator.Type.NON_STATIONARY ? 0 : 1; + ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 1e-4); + fp += test.type() == ChangeDetector.Type.NON_STATIONARY ? 
0 : 1; } assertThat(fp, lessThan(10)); } - public void testStepChangePower() throws IOException { + public void testStepChangePower() { NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); int tp = 0; for (int i = 0; i < 100; i++) { @@ -116,8 +116,8 @@ public void testStepChangePower() throws IOException { DoubleStream.generate(() -> normal.sample()).limit(20), DoubleStream.generate(() -> 10 + normal.sample()).limit(20) ).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 0.05); - tp += test.type() == ChangePointAggregator.Type.STEP_CHANGE ? 1 : 0; + ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 0.05); + tp += test.type() == ChangeDetector.Type.STEP_CHANGE ? 1 : 0; } assertThat(tp, greaterThan(80)); @@ -128,13 +128,13 @@ public void testStepChangePower() throws IOException { DoubleStream.generate(() -> gamma.sample()).limit(20), DoubleStream.generate(() -> 10 + gamma.sample()).limit(20) ).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 0.05); - tp += test.type() == ChangePointAggregator.Type.STEP_CHANGE ? 1 : 0; + ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 0.05); + tp += test.type() == ChangeDetector.Type.STEP_CHANGE ? 
1 : 0; } assertThat(tp, greaterThan(80)); } - public void testTrendChangePower() throws IOException { + public void testTrendChangePower() { NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); int tp = 0; for (int i = 0; i < 100; i++) { @@ -143,8 +143,8 @@ public void testTrendChangePower() throws IOException { DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(20), DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + normal.sample()).limit(20) ).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 0.05); - tp += test.type() == ChangePointAggregator.Type.TREND_CHANGE ? 1 : 0; + ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 0.05); + tp += test.type() == ChangeDetector.Type.TREND_CHANGE ? 1 : 0; } assertThat(tp, greaterThan(80)); @@ -156,13 +156,13 @@ public void testTrendChangePower() throws IOException { DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(20), DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + gamma.sample()).limit(20) ).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 0.05); - tp += test.type() == ChangePointAggregator.Type.TREND_CHANGE ? 1 : 0; + ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 0.05); + tp += test.type() == ChangeDetector.Type.TREND_CHANGE ? 
1 : 0; } assertThat(tp, greaterThan(80)); } - public void testDistributionChangeTestPower() throws IOException { + public void testDistributionChangeTestPower() { NormalDistribution normal1 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0); NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 10.0); int tp = 0; @@ -171,32 +171,29 @@ public void testDistributionChangeTestPower() throws IOException { DoubleStream.generate(() -> 10 + normal1.sample()).limit(50), DoubleStream.generate(() -> 10 + normal2.sample()).limit(50) ).toArray(); - ChangePointAggregator.TestStats test = ChangePointAggregator.testForChange(bucketValues, 0.05); - tp += test.type() == ChangePointAggregator.Type.DISTRIBUTION_CHANGE ? 1 : 0; + ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 0.05); + tp += test.type() == ChangeDetector.Type.DISTRIBUTION_CHANGE ? 1 : 0; } assertThat(tp, greaterThan(90)); } - public void testMultipleChanges() throws IOException { + public void testMultipleChanges() { NormalDistribution normal1 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 78.0, 3.0); NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 40.0, 6.0); NormalDistribution normal3 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1.0, 0.3); int tp = 0; for (int i = 0; i < 100; i++) { double[] bucketValues = DoubleStream.concat( - DoubleStream.concat( - DoubleStream.generate(() -> normal1.sample()).limit(7), - DoubleStream.generate(() -> normal2.sample()).limit(6) - ), - DoubleStream.generate(() -> normal3.sample()).limit(23) + DoubleStream.concat(DoubleStream.generate(normal1::sample).limit(7), DoubleStream.generate(normal2::sample).limit(6)), + DoubleStream.generate(normal3::sample).limit(23) ).toArray(); - 
ChangePointAggregator.TestStats result = ChangePointAggregator.testForChange(bucketValues, 0.05); - tp += result.type() == ChangePointAggregator.Type.TREND_CHANGE ? 1 : 0; + ChangeDetector.TestStats result = ChangeDetector.testForChange(bucketValues, 0.05); + tp += result.type() == ChangeDetector.Type.TREND_CHANGE ? 1 : 0; } assertThat(tp, greaterThan(90)); } - public void testProblemDistributionChange() throws IOException { + public void testProblemDistributionChange() { double[] bucketValues = new double[] { 546.3651753325270, 550.872738079514, @@ -239,8 +236,8 @@ public void testProblemDistributionChange() throws IOException { 571.820809248555, 541.2589928057550, 520.4387755102040 }; - ChangePointAggregator.TestStats result = ChangePointAggregator.testForChange(bucketValues, 0.05); - assertThat(result.type(), equalTo(ChangePointAggregator.Type.DISTRIBUTION_CHANGE)); + ChangeDetector.TestStats result = ChangeDetector.testForChange(bucketValues, 0.05); + assertThat(result.type(), equalTo(ChangeDetector.Type.DISTRIBUTION_CHANGE)); } public void testConstant() throws IOException { From 1c360dab995d11599730ef2205f51b358582fa3c Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Mon, 7 Oct 2024 15:07:47 +0200 Subject: [PATCH 2/6] Uniformize ChangeDetector and SpikeAndDipDetector --- .../ml/aggs/changepoint/ChangeDetector.java | 90 +++++++++++-------- .../changepoint/ChangePointAggregator.java | 18 +--- .../aggs/changepoint/SpikeAndDipDetector.java | 3 +- .../ChangePointAggregatorTests.java | 24 ++--- .../changepoint/SpikeAndDipDetectorTests.java | 8 +- 5 files changed, 74 insertions(+), 69 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java index 86f0b408d2340..cd29e93869360 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java @@ -23,22 +23,39 @@ import java.util.function.IntToDoubleFunction; import java.util.stream.IntStream; +/** + * Detects whether a time series is stationary or changing + * (either continuously or at a specific change point). + */ public class ChangeDetector { private static final int MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST = 500; + private static final int MAXIMUM_CANDIDATE_CHANGE_POINTS = 1000; + private static final KolmogorovSmirnovTest KOLMOGOROV_SMIRNOV_TEST = new KolmogorovSmirnovTest(); private static final Logger logger = LogManager.getLogger(ChangeDetector.class); - static TestStats testForChange(double[] timeWindow, double pValueThreshold) { + private final double[] values; + + ChangeDetector(double[] values) { + this.values = values; + } + + public ChangeType detect(double pValueThreshold, MlAggsHelper.DoubleBucketValues bucketValues) { + return testForChange(pValueThreshold).changeType(bucketValues, slope(values)); + } + + // visible for testing + TestStats testForChange(double pValueThreshold) { - int[] candidateChangePoints = ChangePointAggregator.computeCandidateChangePoints(timeWindow); + int[] candidateChangePoints = computeCandidateChangePoints(values); logger.trace("candidatePoints: [{}]", Arrays.toString(candidateChangePoints)); - double[] timeWindowWeights = outlierWeights(timeWindow); - logger.trace("timeWindow: [{}]", Arrays.toString(timeWindow)); - logger.trace("timeWindowWeights: [{}]", Arrays.toString(timeWindowWeights)); - RunningStats dataRunningStats = RunningStats.from(timeWindow, i -> timeWindowWeights[i]); + double[] valuesWeights = outlierWeights(values); + logger.trace("values: [{}]", Arrays.toString(values)); + logger.trace("valuesWeights: [{}]", Arrays.toString(valuesWeights)); + RunningStats dataRunningStats = RunningStats.from(values, i -> valuesWeights[i]); DataStats dataStats = new DataStats( dataRunningStats.count(), 
dataRunningStats.mean(), @@ -52,38 +69,33 @@ static TestStats testForChange(double[] timeWindow, double pValueThreshold) { return stationary; } - TestStats trendVsStationary = testTrendVs(stationary, timeWindow, timeWindowWeights); + TestStats trendVsStationary = testTrendVs(stationary, values, valuesWeights); logger.trace("trend vs stationary: [{}]", trendVsStationary); TestStats best = stationary; Set discoveredChangePoints = Sets.newHashSetWithExpectedSize(4); if (trendVsStationary.accept(pValueThreshold)) { // Check if there is a change in the trend. - TestStats trendChangeVsTrend = testTrendChangeVs(trendVsStationary, timeWindow, timeWindowWeights, candidateChangePoints); + TestStats trendChangeVsTrend = testTrendChangeVs(trendVsStationary, values, valuesWeights, candidateChangePoints); discoveredChangePoints.add(trendChangeVsTrend.changePoint()); logger.trace("trend change vs trend: [{}]", trendChangeVsTrend); if (trendChangeVsTrend.accept(pValueThreshold)) { // Check if modeling a trend change adds much over modeling a step change. - best = testVsStepChange(trendChangeVsTrend, timeWindow, timeWindowWeights, candidateChangePoints, pValueThreshold); + best = testVsStepChange(trendChangeVsTrend, values, valuesWeights, candidateChangePoints, pValueThreshold); } else { best = trendVsStationary; } } else { // Check if there is a step change. - TestStats stepChangeVsStationary = testStepChangeVs(stationary, timeWindow, timeWindowWeights, candidateChangePoints); + TestStats stepChangeVsStationary = testStepChangeVs(stationary, values, valuesWeights, candidateChangePoints); discoveredChangePoints.add(stepChangeVsStationary.changePoint()); logger.trace("step change vs stationary: [{}]", stepChangeVsStationary); if (stepChangeVsStationary.accept(pValueThreshold)) { // Check if modeling a trend change adds much over modeling a step change. 
- TestStats trendChangeVsStepChange = testTrendChangeVs( - stepChangeVsStationary, - timeWindow, - timeWindowWeights, - candidateChangePoints - ); + TestStats trendChangeVsStepChange = testTrendChangeVs(stepChangeVsStationary, values, valuesWeights, candidateChangePoints); discoveredChangePoints.add(stepChangeVsStationary.changePoint()); logger.trace("trend change vs step change: [{}]", trendChangeVsStepChange); if (trendChangeVsStepChange.accept(pValueThreshold)) { @@ -94,7 +106,7 @@ static TestStats testForChange(double[] timeWindow, double pValueThreshold) { } else { // Check if there is a trend change. - TestStats trendChangeVsStationary = testTrendChangeVs(stationary, timeWindow, timeWindowWeights, candidateChangePoints); + TestStats trendChangeVsStationary = testTrendChangeVs(stationary, values, valuesWeights, candidateChangePoints); discoveredChangePoints.add(stepChangeVsStationary.changePoint()); logger.trace("trend change vs stationary: [{}]", trendChangeVsStationary); if (trendChangeVsStationary.accept(pValueThreshold)) { @@ -108,13 +120,7 @@ static TestStats testForChange(double[] timeWindow, double pValueThreshold) { // We're not very confident in the change point, so check if a distribution change // fits the data better. 
if (best.pValueVsStationary() > 1e-5) { - TestStats distChange = testDistributionChange( - dataStats, - timeWindow, - timeWindowWeights, - candidateChangePoints, - discoveredChangePoints - ); + TestStats distChange = testDistributionChange(dataStats, values, valuesWeights, candidateChangePoints, discoveredChangePoints); logger.trace("distribution change: [{}]", distChange); if (distChange.pValue() < Math.min(pValueThreshold, 0.1 * best.pValueVsStationary())) { best = distChange; @@ -124,7 +130,17 @@ static TestStats testForChange(double[] timeWindow, double pValueThreshold) { return best; } - static double[] outlierWeights(double[] values) { + private int[] computeCandidateChangePoints(double[] values) { + int minValues = Math.max((int) (0.1 * values.length + 0.5), ChangePointAggregator.MINIMUM_BUCKETS); + if (values.length - 2 * minValues <= MAXIMUM_CANDIDATE_CHANGE_POINTS) { + return IntStream.range(minValues, values.length - minValues).toArray(); + } else { + int step = (int) Math.ceil((double) (values.length - 2 * minValues) / MAXIMUM_CANDIDATE_CHANGE_POINTS); + return IntStream.range(minValues, values.length - minValues).filter(i -> i % step == 0).toArray(); + } + } + + private double[] outlierWeights(double[] values) { int i = (int) Math.ceil(0.025 * values.length); double[] weights = Arrays.copyOf(values, values.length); Arrays.sort(weights); @@ -144,7 +160,7 @@ static double[] outlierWeights(double[] values) { return weights; } - static double slope(double[] values) { + private double slope(double[] values) { SimpleRegression regression = new SimpleRegression(); for (int i = 0; i < values.length; i++) { regression.addData(i, values[i]); @@ -152,11 +168,11 @@ static double slope(double[] values) { return regression.getSlope(); } - static double independentTrialsPValue(double pValue, int nTrials) { + private static double independentTrialsPValue(double pValue, int nTrials) { return pValue > 1e-10 ? 
1.0 - Math.pow(1.0 - pValue, nTrials) : nTrials * pValue; } - static TestStats testTrendVs(TestStats H0, double[] values, double[] weights) { + private TestStats testTrendVs(TestStats H0, double[] values, double[] weights) { LeastSquaresOnlineRegression allLeastSquares = new LeastSquaresOnlineRegression(2); for (int i = 0; i < values.length; i++) { allLeastSquares.add(i, values[i], weights[i]); @@ -166,7 +182,7 @@ static TestStats testTrendVs(TestStats H0, double[] values, double[] weights) { return new TestStats(Type.NON_STATIONARY, pValue, vTrend, 3.0, H0.dataStats()); } - static TestStats testStepChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) { + private TestStats testStepChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) { double vStep = Double.MAX_VALUE; int changePoint = -1; @@ -203,7 +219,7 @@ static TestStats testStepChangeVs(TestStats H0, double[] values, double[] weight return new TestStats(Type.STEP_CHANGE, pValue, vStep, 2.0, changePoint, H0.dataStats()); } - static TestStats testTrendChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) { + private TestStats testTrendChangeVs(TestStats H0, double[] values, double[] weights, int[] candidateChangePoints) { double vChange = Double.MAX_VALUE; int changePoint = -1; @@ -252,7 +268,7 @@ static TestStats testTrendChangeVs(TestStats H0, double[] values, double[] weigh return new TestStats(Type.TREND_CHANGE, pValue, vChange, 6.0, changePoint, H0.dataStats()); } - static TestStats testVsStepChange( + private TestStats testVsStepChange( TestStats trendChange, double[] values, double[] weights, @@ -267,7 +283,7 @@ static TestStats testVsStepChange( return pValue < pValueThreshold ? 
trendChange : stepChange; } - static double fTestNestedPValue(double n, double vNull, double pNull, double vAlt, double pAlt) { + private static double fTestNestedPValue(double n, double vNull, double pNull, double vAlt, double pAlt) { if (vAlt == vNull) { return 1.0; } @@ -287,7 +303,7 @@ private static int lowerBound(int[] x, int start, int end, int xs) { return retVal; } - static SampleData sample(double[] values, double[] weights, Set changePoints) { + private SampleData sample(double[] values, double[] weights, Set changePoints) { Integer[] adjChangePoints = changePoints.toArray(new Integer[changePoints.size()]); if (values.length <= MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST) { return new SampleData(values, weights, adjChangePoints); @@ -320,7 +336,7 @@ static SampleData sample(double[] values, double[] weights, Set changeP return new SampleData(sample, sampleWeights, adjChangePoints); } - static TestStats testDistributionChange( + private TestStats testDistributionChange( DataStats stats, double[] values, double[] weights, @@ -384,7 +400,7 @@ static TestStats testDistributionChange( return new TestStats(Type.DISTRIBUTION_CHANGE, pValue, changePoint, stats); } - static double fDistribSf(double numeratorDegreesOfFreedom, double denominatorDegreesOfFreedom, double x) { + private static double fDistribSf(double numeratorDegreesOfFreedom, double denominatorDegreesOfFreedom, double x) { if (x <= 0) { return 1; } @@ -399,6 +415,7 @@ static double fDistribSf(double numeratorDegreesOfFreedom, double denominatorDeg ); } + // visible for testing enum Type { STATIONARY, NON_STATIONARY, @@ -425,6 +442,7 @@ public String toString() { } } + // visible for testing record TestStats(Type type, double pValue, double var, double nParams, int changePoint, DataStats dataStats) { TestStats(Type type, double pValue, int changePoint, DataStats dataStats) { this(type, pValue, 0.0, 0.0, changePoint, dataStats); @@ -482,7 +500,7 @@ public String toString() { } } - static class RunningStats { + 
private static class RunningStats { double sumOfSqrs; double sum; double count; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java index e8f44140a4120..16d9ace9d7f43 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.Optional; -import java.util.stream.IntStream; import static org.elasticsearch.xpack.ml.aggs.MlAggsHelper.extractBucket; import static org.elasticsearch.xpack.ml.aggs.MlAggsHelper.extractDoubleBucketedValues; @@ -30,8 +29,7 @@ public class ChangePointAggregator extends SiblingPipelineAggregator { private static final Logger logger = LogManager.getLogger(ChangePointAggregator.class); static final double P_VALUE_THRESHOLD = 0.01; - private static final int MINIMUM_BUCKETS = 10; - private static final int MAXIMUM_CANDIDATE_CHANGE_POINTS = 1000; + static final int MINIMUM_BUCKETS = 10; private static double changePValueThreshold(int nValues) { // This was obtained by simulating the test power for a fixed size effect as a @@ -39,16 +37,6 @@ private static double changePValueThreshold(int nValues) { return P_VALUE_THRESHOLD * Math.exp(-0.04 * (double) (nValues - 2 * (MINIMUM_BUCKETS + 1))); } - static int[] computeCandidateChangePoints(double[] values) { - int minValues = Math.max((int) (0.1 * values.length + 0.5), MINIMUM_BUCKETS); - if (values.length - 2 * minValues <= MAXIMUM_CANDIDATE_CHANGE_POINTS) { - return IntStream.range(minValues, values.length - minValues).toArray(); - } else { - int step = (int) Math.ceil((double) (values.length - 2 * minValues) / MAXIMUM_CANDIDATE_CHANGE_POINTS); - return IntStream.range(minValues, values.length - minValues).filter(i -> i % 
step == 0).toArray(); - } - } - public ChangePointAggregator(String name, String bucketsPath, Map metadata) { super(name, new String[] { bucketsPath }, metadata); } @@ -108,7 +96,7 @@ public InternalAggregation doReduce(InternalAggregations aggregations, Aggregati static ChangeType testForSpikeOrDip(MlAggsHelper.DoubleBucketValues bucketValues, double pValueThreshold) { try { SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues.getValues()); - ChangeType result = detect.at(pValueThreshold, bucketValues); + ChangeType result = detect.detect(pValueThreshold, bucketValues); logger.trace("spike or dip p-value: [{}]", result.pValue()); return result; } catch (NotStrictlyPositiveException nspe) { @@ -119,6 +107,6 @@ static ChangeType testForSpikeOrDip(MlAggsHelper.DoubleBucketValues bucketValues static ChangeType testForChange(MlAggsHelper.DoubleBucketValues bucketValues, double pValueThreshold) { double[] timeWindow = bucketValues.getValues(); - return ChangeDetector.testForChange(timeWindow, pValueThreshold).changeType(bucketValues, ChangeDetector.slope(timeWindow)); + return new ChangeDetector(timeWindow).detect(pValueThreshold, bucketValues); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java index b628ea3324cf1..a4c70cf8a0daf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java @@ -101,7 +101,6 @@ private double[] removeIf(ExcludedPredicate should, double[] values) { private final KDE dipTestKDE; SpikeAndDipDetector(double[] values) { - numValues = values.length; if (values.length < 4) { @@ -135,7 +134,7 @@ private double[] removeIf(ExcludedPredicate should, double[] values) { spikeTestKDE = new KDE(spikeKDEValues, 1.36); } - 
ChangeType at(double pValueThreshold, MlAggsHelper.DoubleBucketValues bucketValues) { + ChangeType detect(double pValueThreshold, MlAggsHelper.DoubleBucketValues bucketValues) { if (dipIndex == -1 || spikeIndex == -1) { return new ChangeType.Indeterminable( "not enough buckets to check for dip or spike. Requires at least [3]; found [" + numValues + "]" diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java index 9cc93f374e0eb..1c37fb1c325b2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java @@ -60,7 +60,7 @@ public void testStationaryFalsePositiveRate() { int fp = 0; for (int i = 0; i < 100; i++) { double[] bucketValues = DoubleStream.generate(() -> 10 + normal.sample()).limit(40).toArray(); - ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 1e-4); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); fp += test.type() == ChangeDetector.Type.STATIONARY ? 0 : 1; } assertThat(fp, lessThan(10)); @@ -69,7 +69,7 @@ public void testStationaryFalsePositiveRate() { GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); for (int i = 0; i < 100; i++) { double[] bucketValues = DoubleStream.generate(() -> gamma.sample()).limit(40).toArray(); - ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 1e-4); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); fp += test.type() == ChangeDetector.Type.STATIONARY ? 
0 : 1; } assertThat(fp, lessThan(10)); @@ -80,7 +80,7 @@ public void testSampledDistributionTestFalsePositiveRate() { int fp = 0; for (int i = 0; i < 100; i++) { double[] bucketValues = DoubleStream.generate(() -> 10 + normal.sample()).limit(5000).toArray(); - ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 1e-4); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); fp += test.type() == ChangeDetector.Type.STATIONARY ? 0 : 1; } assertThat(fp, lessThan(10)); @@ -92,7 +92,7 @@ public void testNonStationaryFalsePositiveRate() { for (int i = 0; i < 100; i++) { AtomicInteger j = new AtomicInteger(); double[] bucketValues = DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(40).toArray(); - ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 1e-4); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); fp += test.type() == ChangeDetector.Type.NON_STATIONARY ? 0 : 1; } assertThat(fp, lessThan(10)); @@ -102,7 +102,7 @@ public void testNonStationaryFalsePositiveRate() { for (int i = 0; i < 100; i++) { AtomicInteger j = new AtomicInteger(); double[] bucketValues = DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(40).toArray(); - ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 1e-4); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); fp += test.type() == ChangeDetector.Type.NON_STATIONARY ? 0 : 1; } assertThat(fp, lessThan(10)); @@ -116,7 +116,7 @@ public void testStepChangePower() { DoubleStream.generate(() -> normal.sample()).limit(20), DoubleStream.generate(() -> 10 + normal.sample()).limit(20) ).toArray(); - ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 0.05); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); tp += test.type() == ChangeDetector.Type.STEP_CHANGE ? 
1 : 0; } assertThat(tp, greaterThan(80)); @@ -128,7 +128,7 @@ public void testStepChangePower() { DoubleStream.generate(() -> gamma.sample()).limit(20), DoubleStream.generate(() -> 10 + gamma.sample()).limit(20) ).toArray(); - ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 0.05); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); tp += test.type() == ChangeDetector.Type.STEP_CHANGE ? 1 : 0; } assertThat(tp, greaterThan(80)); @@ -143,7 +143,7 @@ public void testTrendChangePower() { DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(20), DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + normal.sample()).limit(20) ).toArray(); - ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 0.05); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); tp += test.type() == ChangeDetector.Type.TREND_CHANGE ? 1 : 0; } assertThat(tp, greaterThan(80)); @@ -156,7 +156,7 @@ public void testTrendChangePower() { DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(20), DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + gamma.sample()).limit(20) ).toArray(); - ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 0.05); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); tp += test.type() == ChangeDetector.Type.TREND_CHANGE ? 1 : 0; } assertThat(tp, greaterThan(80)); @@ -171,7 +171,7 @@ public void testDistributionChangeTestPower() { DoubleStream.generate(() -> 10 + normal1.sample()).limit(50), DoubleStream.generate(() -> 10 + normal2.sample()).limit(50) ).toArray(); - ChangeDetector.TestStats test = ChangeDetector.testForChange(bucketValues, 0.05); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); tp += test.type() == ChangeDetector.Type.DISTRIBUTION_CHANGE ? 
1 : 0; } assertThat(tp, greaterThan(90)); @@ -187,7 +187,7 @@ public void testMultipleChanges() { DoubleStream.concat(DoubleStream.generate(normal1::sample).limit(7), DoubleStream.generate(normal2::sample).limit(6)), DoubleStream.generate(normal3::sample).limit(23) ).toArray(); - ChangeDetector.TestStats result = ChangeDetector.testForChange(bucketValues, 0.05); + ChangeDetector.TestStats result = new ChangeDetector(bucketValues).testForChange(0.05); tp += result.type() == ChangeDetector.Type.TREND_CHANGE ? 1 : 0; } assertThat(tp, greaterThan(90)); @@ -236,7 +236,7 @@ public void testProblemDistributionChange() { 571.820809248555, 541.2589928057550, 520.4387755102040 }; - ChangeDetector.TestStats result = ChangeDetector.testForChange(bucketValues, 0.05); + ChangeDetector.TestStats result = new ChangeDetector(bucketValues).testForChange(0.05); assertThat(result.type(), equalTo(ChangeDetector.Type.DISTRIBUTION_CHANGE)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetectorTests.java index fe91aa3e6a600..99e0c8ed3508b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetectorTests.java @@ -26,7 +26,7 @@ public void testTooLittleData() { Arrays.fill(values, 1.0); MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(docCounts, values); SpikeAndDipDetector detect = new SpikeAndDipDetector(values); - assertThat(detect.at(0.01, bucketValues), instanceOf(ChangeType.Indeterminable.class)); + assertThat(detect.detect(0.01, bucketValues), instanceOf(ChangeType.Indeterminable.class)); } } @@ -152,7 +152,7 @@ public void testDetection() { SpikeAndDipDetector detect = new SpikeAndDipDetector(values); - ChangeType change = 
detect.at(0.05, bucketValues); + ChangeType change = detect.detect(0.05, bucketValues); assertThat(change, instanceOf(ChangeType.Spike.class)); assertThat(change.pValue(), closeTo(3.0465e-12, 1e-15)); @@ -164,7 +164,7 @@ public void testDetection() { SpikeAndDipDetector detect = new SpikeAndDipDetector(values); - ChangeType change = detect.at(0.05, bucketValues); + ChangeType change = detect.detect(0.05, bucketValues); assertThat(change, instanceOf(ChangeType.Dip.class)); assertThat(change.pValue(), closeTo(1.2589e-08, 1e-11)); @@ -179,7 +179,7 @@ public void testMissingBuckets() { SpikeAndDipDetector detect = new SpikeAndDipDetector(values); - ChangeType change = detect.at(0.01, bucketValues); + ChangeType change = detect.detect(0.01, bucketValues); assertThat(change, instanceOf(ChangeType.Spike.class)); assertThat(change.changePoint(), equalTo(10)); From a2ac7997b233c2b73130291b25e3b9e535bb853f Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Mon, 7 Oct 2024 15:23:29 +0200 Subject: [PATCH 3/6] Separate ChangeDetectorTests and ChangePointAggregatorTests. --- .../aggs/changepoint/ChangeDetectorTests.java | 209 ++++++++++++++++++ .../ChangePointAggregatorTests.java | 191 ---------------- 2 files changed, 209 insertions(+), 191 deletions(-) create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetectorTests.java diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetectorTests.java new file mode 100644 index 0000000000000..1f7a4c07172e0 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetectorTests.java @@ -0,0 +1,209 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.aggs.changepoint; + +import org.apache.commons.math3.distribution.GammaDistribution; +import org.apache.commons.math3.distribution.NormalDistribution; +import org.apache.commons.math3.random.RandomGeneratorFactory; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.search.aggregations.AggregatorTestCase; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.DoubleStream; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; + +public class ChangeDetectorTests extends AggregatorTestCase { + + public void testStationaryFalsePositiveRate() { + NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); + int fp = 0; + for (int i = 0; i < 100; i++) { + double[] bucketValues = DoubleStream.generate(() -> 10 + normal.sample()).limit(40).toArray(); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); + fp += test.type() == ChangeDetector.Type.STATIONARY ? 0 : 1; + } + assertThat(fp, lessThan(10)); + + fp = 0; + GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); + for (int i = 0; i < 100; i++) { + double[] bucketValues = DoubleStream.generate(() -> gamma.sample()).limit(40).toArray(); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); + fp += test.type() == ChangeDetector.Type.STATIONARY ? 
0 : 1; + } + assertThat(fp, lessThan(10)); + } + + public void testSampledDistributionTestFalsePositiveRate() { + NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0); + int fp = 0; + for (int i = 0; i < 100; i++) { + double[] bucketValues = DoubleStream.generate(() -> 10 + normal.sample()).limit(5000).toArray(); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); + fp += test.type() == ChangeDetector.Type.STATIONARY ? 0 : 1; + } + assertThat(fp, lessThan(10)); + } + + public void testNonStationaryFalsePositiveRate() { + NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); + int fp = 0; + for (int i = 0; i < 100; i++) { + AtomicInteger j = new AtomicInteger(); + double[] bucketValues = DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(40).toArray(); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); + fp += test.type() == ChangeDetector.Type.NON_STATIONARY ? 0 : 1; + } + assertThat(fp, lessThan(10)); + + fp = 0; + GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); + for (int i = 0; i < 100; i++) { + AtomicInteger j = new AtomicInteger(); + double[] bucketValues = DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(40).toArray(); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); + fp += test.type() == ChangeDetector.Type.NON_STATIONARY ? 
0 : 1; + } + assertThat(fp, lessThan(10)); + } + + public void testStepChangePower() { + NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); + int tp = 0; + for (int i = 0; i < 100; i++) { + double[] bucketValues = DoubleStream.concat( + DoubleStream.generate(() -> normal.sample()).limit(20), + DoubleStream.generate(() -> 10 + normal.sample()).limit(20) + ).toArray(); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); + tp += test.type() == ChangeDetector.Type.STEP_CHANGE ? 1 : 0; + } + assertThat(tp, greaterThan(80)); + + tp = 0; + GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); + for (int i = 0; i < 100; i++) { + double[] bucketValues = DoubleStream.concat( + DoubleStream.generate(() -> gamma.sample()).limit(20), + DoubleStream.generate(() -> 10 + gamma.sample()).limit(20) + ).toArray(); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); + tp += test.type() == ChangeDetector.Type.STEP_CHANGE ? 1 : 0; + } + assertThat(tp, greaterThan(80)); + } + + public void testTrendChangePower() { + NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); + int tp = 0; + for (int i = 0; i < 100; i++) { + AtomicInteger j = new AtomicInteger(); + double[] bucketValues = DoubleStream.concat( + DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(20), + DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + normal.sample()).limit(20) + ).toArray(); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); + tp += test.type() == ChangeDetector.Type.TREND_CHANGE ? 
1 : 0; + } + assertThat(tp, greaterThan(80)); + + tp = 0; + GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); + for (int i = 0; i < 100; i++) { + AtomicInteger j = new AtomicInteger(); + double[] bucketValues = DoubleStream.concat( + DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(20), + DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + gamma.sample()).limit(20) + ).toArray(); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); + tp += test.type() == ChangeDetector.Type.TREND_CHANGE ? 1 : 0; + } + assertThat(tp, greaterThan(80)); + } + + public void testDistributionChangeTestPower() { + NormalDistribution normal1 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0); + NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 10.0); + int tp = 0; + for (int i = 0; i < 100; i++) { + double[] bucketValues = DoubleStream.concat( + DoubleStream.generate(() -> 10 + normal1.sample()).limit(50), + DoubleStream.generate(() -> 10 + normal2.sample()).limit(50) + ).toArray(); + ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); + tp += test.type() == ChangeDetector.Type.DISTRIBUTION_CHANGE ? 
1 : 0; + } + assertThat(tp, greaterThan(90)); + } + + public void testMultipleChanges() { + NormalDistribution normal1 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 78.0, 3.0); + NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 40.0, 6.0); + NormalDistribution normal3 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1.0, 0.3); + int tp = 0; + for (int i = 0; i < 100; i++) { + double[] bucketValues = DoubleStream.concat( + DoubleStream.concat(DoubleStream.generate(normal1::sample).limit(7), DoubleStream.generate(normal2::sample).limit(6)), + DoubleStream.generate(normal3::sample).limit(23) + ).toArray(); + ChangeDetector.TestStats result = new ChangeDetector(bucketValues).testForChange(0.05); + tp += result.type() == ChangeDetector.Type.TREND_CHANGE ? 1 : 0; + } + assertThat(tp, greaterThan(90)); + } + + public void testProblemDistributionChange() { + double[] bucketValues = new double[] { + 546.3651753325270, + 550.872738079514, + 551.1312487618040, + 550.3323904749380, + 549.2652495378930, + 548.9761274963630, + 549.3433969743010, + 549.0935313531350, + 551.1762550747600, + 551.3772184469220, + 548.6163495094490, + 548.5866591594080, + 546.9364791288570, + 548.1167839989470, + 549.3484016149320, + 550.4242803917040, + 551.2316023050940, + 548.4713993534340, + 546.0254901960780, + 548.4376996805110, + 561.1920529801320, + 557.3930041152260, + 565.8497217068650, + 566.787072243346, + 546.6094890510950, + 530.5905797101450, + 556.7340823970040, + 557.3857677902620, + 543.0754716981130, + 574.3297101449280, + 559.2962962962960, + 549.5202952029520, + 531.7217741935480, + 551.4333333333330, + 557.637168141593, + 545.1880733944950, + 564.6893203883500, + 543.0204081632650, + 571.820809248555, + 541.2589928057550, + 520.4387755102040 }; + ChangeDetector.TestStats result = new 
ChangeDetector(bucketValues).testForChange(0.05); + assertThat(result.type(), equalTo(ChangeDetector.Type.DISTRIBUTION_CHANGE)); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java index 1c37fb1c325b2..5cb66aaa5a58c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.aggs.changepoint; -import org.apache.commons.math3.distribution.GammaDistribution; import org.apache.commons.math3.distribution.NormalDistribution; import org.apache.commons.math3.random.RandomGeneratorFactory; import org.apache.logging.log4j.LogManager; @@ -37,10 +36,7 @@ import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.lessThan; -import static org.junit.Assert.assertThat; public class ChangePointAggregatorTests extends AggregatorTestCase { @@ -55,191 +51,6 @@ protected List getSearchPlugins() { private static final String NUMERIC_FIELD_NAME = "value"; private static final String TIME_FIELD_NAME = "timestamp"; - public void testStationaryFalsePositiveRate() { - NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); - int fp = 0; - for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.generate(() -> 10 + normal.sample()).limit(40).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); - fp += test.type() == ChangeDetector.Type.STATIONARY ? 
0 : 1; - } - assertThat(fp, lessThan(10)); - - fp = 0; - GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); - for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.generate(() -> gamma.sample()).limit(40).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); - fp += test.type() == ChangeDetector.Type.STATIONARY ? 0 : 1; - } - assertThat(fp, lessThan(10)); - } - - public void testSampledDistributionTestFalsePositiveRate() { - NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0); - int fp = 0; - for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.generate(() -> 10 + normal.sample()).limit(5000).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); - fp += test.type() == ChangeDetector.Type.STATIONARY ? 0 : 1; - } - assertThat(fp, lessThan(10)); - } - - public void testNonStationaryFalsePositiveRate() { - NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); - int fp = 0; - for (int i = 0; i < 100; i++) { - AtomicInteger j = new AtomicInteger(); - double[] bucketValues = DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(40).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); - fp += test.type() == ChangeDetector.Type.NON_STATIONARY ? 
0 : 1; - } - assertThat(fp, lessThan(10)); - - fp = 0; - GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); - for (int i = 0; i < 100; i++) { - AtomicInteger j = new AtomicInteger(); - double[] bucketValues = DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(40).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); - fp += test.type() == ChangeDetector.Type.NON_STATIONARY ? 0 : 1; - } - assertThat(fp, lessThan(10)); - } - - public void testStepChangePower() { - NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); - int tp = 0; - for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> normal.sample()).limit(20), - DoubleStream.generate(() -> 10 + normal.sample()).limit(20) - ).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); - tp += test.type() == ChangeDetector.Type.STEP_CHANGE ? 1 : 0; - } - assertThat(tp, greaterThan(80)); - - tp = 0; - GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); - for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> gamma.sample()).limit(20), - DoubleStream.generate(() -> 10 + gamma.sample()).limit(20) - ).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); - tp += test.type() == ChangeDetector.Type.STEP_CHANGE ? 
1 : 0; - } - assertThat(tp, greaterThan(80)); - } - - public void testTrendChangePower() { - NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); - int tp = 0; - for (int i = 0; i < 100; i++) { - AtomicInteger j = new AtomicInteger(); - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(20), - DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + normal.sample()).limit(20) - ).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); - tp += test.type() == ChangeDetector.Type.TREND_CHANGE ? 1 : 0; - } - assertThat(tp, greaterThan(80)); - - tp = 0; - GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); - for (int i = 0; i < 100; i++) { - AtomicInteger j = new AtomicInteger(); - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(20), - DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + gamma.sample()).limit(20) - ).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); - tp += test.type() == ChangeDetector.Type.TREND_CHANGE ? 
1 : 0; - } - assertThat(tp, greaterThan(80)); - } - - public void testDistributionChangeTestPower() { - NormalDistribution normal1 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0); - NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 10.0); - int tp = 0; - for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> 10 + normal1.sample()).limit(50), - DoubleStream.generate(() -> 10 + normal2.sample()).limit(50) - ).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); - tp += test.type() == ChangeDetector.Type.DISTRIBUTION_CHANGE ? 1 : 0; - } - assertThat(tp, greaterThan(90)); - } - - public void testMultipleChanges() { - NormalDistribution normal1 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 78.0, 3.0); - NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 40.0, 6.0); - NormalDistribution normal3 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1.0, 0.3); - int tp = 0; - for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.concat( - DoubleStream.concat(DoubleStream.generate(normal1::sample).limit(7), DoubleStream.generate(normal2::sample).limit(6)), - DoubleStream.generate(normal3::sample).limit(23) - ).toArray(); - ChangeDetector.TestStats result = new ChangeDetector(bucketValues).testForChange(0.05); - tp += result.type() == ChangeDetector.Type.TREND_CHANGE ? 
1 : 0; - } - assertThat(tp, greaterThan(90)); - } - - public void testProblemDistributionChange() { - double[] bucketValues = new double[] { - 546.3651753325270, - 550.872738079514, - 551.1312487618040, - 550.3323904749380, - 549.2652495378930, - 548.9761274963630, - 549.3433969743010, - 549.0935313531350, - 551.1762550747600, - 551.3772184469220, - 548.6163495094490, - 548.5866591594080, - 546.9364791288570, - 548.1167839989470, - 549.3484016149320, - 550.4242803917040, - 551.2316023050940, - 548.4713993534340, - 546.0254901960780, - 548.4376996805110, - 561.1920529801320, - 557.3930041152260, - 565.8497217068650, - 566.787072243346, - 546.6094890510950, - 530.5905797101450, - 556.7340823970040, - 557.3857677902620, - 543.0754716981130, - 574.3297101449280, - 559.2962962962960, - 549.5202952029520, - 531.7217741935480, - 551.4333333333330, - 557.637168141593, - 545.1880733944950, - 564.6893203883500, - 543.0204081632650, - 571.820809248555, - 541.2589928057550, - 520.4387755102040 }; - ChangeDetector.TestStats result = new ChangeDetector(bucketValues).testForChange(0.05); - assertThat(result.type(), equalTo(ChangeDetector.Type.DISTRIBUTION_CHANGE)); - } - public void testConstant() throws IOException { double[] bucketValues = DoubleStream.generate(() -> 10).limit(100).toArray(); testChangeType( @@ -259,7 +70,6 @@ public void testSlopeUp() throws IOException { // Handle infrequent false positives. 
assertThat(changeType, instanceOf(ChangeType.TrendChange.class)); } - }); } @@ -597,5 +407,4 @@ private static void writeTestDocs(RandomIndexWriter w, double[] bucketValues) th epoch_timestamp += INTERVAL.estimateMillis(); } } - } From c2e6f2c3097073f86e4b0217c31f5b66f6606c85 Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Tue, 8 Oct 2024 11:18:05 +0200 Subject: [PATCH 4/6] Public entrypoint for change point detection --- .../ml/aggs/changepoint/ChangeDetector.java | 17 +- .../changepoint/ChangePointAggregator.java | 55 ++--- .../aggs/changepoint/SpikeAndDipDetector.java | 8 +- .../aggs/changepoint/ChangeDetectorTests.java | 229 ++++++++++-------- .../changepoint/SpikeAndDipDetectorTests.java | 20 +- 5 files changed, 181 insertions(+), 148 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java index cd29e93869360..bdb14cdf0fd3c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java @@ -36,18 +36,19 @@ public class ChangeDetector { private static final Logger logger = LogManager.getLogger(ChangeDetector.class); + private final MlAggsHelper.DoubleBucketValues bucketValues; private final double[] values; - ChangeDetector(double[] values) { - this.values = values; + ChangeDetector(MlAggsHelper.DoubleBucketValues bucketValues) { + this.bucketValues = bucketValues; + this.values = bucketValues.getValues(); } - public ChangeType detect(double pValueThreshold, MlAggsHelper.DoubleBucketValues bucketValues) { + ChangeType detect(double pValueThreshold) { return testForChange(pValueThreshold).changeType(bucketValues, slope(values)); } - // visible for testing - TestStats testForChange(double pValueThreshold) { + private TestStats testForChange(double 
pValueThreshold) { int[] candidateChangePoints = computeCandidateChangePoints(values); logger.trace("candidatePoints: [{}]", Arrays.toString(candidateChangePoints)); @@ -415,8 +416,7 @@ private static double fDistribSf(double numeratorDegreesOfFreedom, double denomi ); } - // visible for testing - enum Type { + private enum Type { STATIONARY, NON_STATIONARY, STEP_CHANGE, @@ -442,8 +442,7 @@ public String toString() { } } - // visible for testing - record TestStats(Type type, double pValue, double var, double nParams, int changePoint, DataStats dataStats) { + private record TestStats(Type type, double pValue, double var, double nParams, int changePoint, DataStats dataStats) { TestStats(Type type, double pValue, int changePoint, DataStats dataStats) { this(type, pValue, 0.0, 0.0, changePoint, dataStats); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java index 16d9ace9d7f43..be8ec05f4b740 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java @@ -58,30 +58,8 @@ public InternalAggregation doReduce(InternalAggregations aggregations, Aggregati ); } MlAggsHelper.DoubleBucketValues bucketValues = maybeBucketValues.get(); - if (bucketValues.getValues().length < (2 * MINIMUM_BUCKETS) + 2) { - return new InternalChangePointAggregation( - name(), - metadata(), - null, - new ChangeType.Indeterminable( - "not enough buckets to calculate change_point. Requires at least [" - + ((2 * MINIMUM_BUCKETS) + 2) - + "]; found [" - + bucketValues.getValues().length - + "]" - ) - ); - } - ChangeType spikeOrDip = testForSpikeOrDip(bucketValues, P_VALUE_THRESHOLD); - - // Test for change step, trend and distribution changes. 
- ChangeType change = testForChange(bucketValues, changePValueThreshold(bucketValues.getValues().length)); - logger.trace("change p-value: [{}]", change.pValue()); - - if (spikeOrDip.pValue() < change.pValue()) { - change = spikeOrDip; - } + ChangeType change = getChangeType(bucketValues); ChangePointBucket changePointBucket = null; if (change.changePoint() >= 0) { @@ -93,20 +71,35 @@ public InternalAggregation doReduce(InternalAggregations aggregations, Aggregati return new InternalChangePointAggregation(name(), metadata(), changePointBucket, change); } + static ChangeType getChangeType(MlAggsHelper.DoubleBucketValues bucketValues) { + if (bucketValues.getValues().length < (2 * MINIMUM_BUCKETS) + 2) { + return new ChangeType.Indeterminable( + "not enough buckets to calculate change_point. Requires at least [" + + ((2 * MINIMUM_BUCKETS) + 2) + + "]; found [" + + bucketValues.getValues().length + + "]" + ); + } + + ChangeType spikeOrDip = testForSpikeOrDip(bucketValues, P_VALUE_THRESHOLD); + ChangeType change = new ChangeDetector(bucketValues).detect(changePValueThreshold(bucketValues.getValues().length)); + logger.trace("change p-value: [{}]", change.pValue()); + if (spikeOrDip.pValue() < change.pValue()) { + change = spikeOrDip; + } + return change; + } + static ChangeType testForSpikeOrDip(MlAggsHelper.DoubleBucketValues bucketValues, double pValueThreshold) { try { - SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues.getValues()); - ChangeType result = detect.detect(pValueThreshold, bucketValues); + SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues); + ChangeType result = detect.detect(pValueThreshold); logger.trace("spike or dip p-value: [{}]", result.pValue()); return result; } catch (NotStrictlyPositiveException nspe) { logger.debug("failure testing for dips and spikes", nspe); + return new Indeterminable("failure testing for dips and spikes"); } - return new Indeterminable("failure testing for dips and spikes"); - } - - static 
ChangeType testForChange(MlAggsHelper.DoubleBucketValues bucketValues, double pValueThreshold) { - double[] timeWindow = bucketValues.getValues(); - return new ChangeDetector(timeWindow).detect(pValueThreshold, bucketValues); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java index a4c70cf8a0daf..365ebe8562d6a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java @@ -92,6 +92,7 @@ private double[] removeIf(ExcludedPredicate should, double[] values) { return newValues; } + private final MlAggsHelper.DoubleBucketValues bucketValues; private final int numValues; private final int dipIndex; private final int spikeIndex; @@ -100,7 +101,10 @@ private double[] removeIf(ExcludedPredicate should, double[] values) { private final KDE spikeTestKDE; private final KDE dipTestKDE; - SpikeAndDipDetector(double[] values) { + SpikeAndDipDetector(MlAggsHelper.DoubleBucketValues bucketValues) { + this.bucketValues = bucketValues; + double[] values = bucketValues.getValues(); + numValues = values.length; if (values.length < 4) { @@ -134,7 +138,7 @@ private double[] removeIf(ExcludedPredicate should, double[] values) { spikeTestKDE = new KDE(spikeKDEValues, 1.36); } - ChangeType detect(double pValueThreshold, MlAggsHelper.DoubleBucketValues bucketValues) { + ChangeType detect(double pValueThreshold) { if (dipIndex == -1 || spikeIndex == -1) { return new ChangeType.Indeterminable( "not enough buckets to check for dip or spike. 
Requires at least [3]; found [" + numValues + "]" diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetectorTests.java index 1f7a4c07172e0..75f668a96e77e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetectorTests.java @@ -12,12 +12,13 @@ import org.apache.commons.math3.random.RandomGeneratorFactory; import org.elasticsearch.common.Randomness; import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.xpack.ml.aggs.MlAggsHelper; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.DoubleStream; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; public class ChangeDetectorTests extends AggregatorTestCase { @@ -26,18 +27,24 @@ public void testStationaryFalsePositiveRate() { NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); int fp = 0; for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.generate(() -> 10 + normal.sample()).limit(40).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); - fp += test.type() == ChangeDetector.Type.STATIONARY ? 0 : 1; + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.generate(() -> 10 + normal.sample()).limit(40).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(1e-4); + fp += type instanceof ChangeType.Stationary ? 
0 : 1; } assertThat(fp, lessThan(10)); fp = 0; GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.generate(() -> gamma.sample()).limit(40).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); - fp += test.type() == ChangeDetector.Type.STATIONARY ? 0 : 1; + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.generate(gamma::sample).limit(40).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(1e-4); + fp += type instanceof ChangeType.Stationary ? 0 : 1; } assertThat(fp, lessThan(10)); } @@ -46,9 +53,12 @@ public void testSampledDistributionTestFalsePositiveRate() { NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 1.0); int fp = 0; for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.generate(() -> 10 + normal.sample()).limit(5000).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); - fp += test.type() == ChangeDetector.Type.STATIONARY ? 0 : 1; + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.generate(() -> 10 + normal.sample()).limit(5000).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(1e-4); + fp += type instanceof ChangeType.Stationary ? 0 : 1; } assertThat(fp, lessThan(10)); } @@ -58,9 +68,12 @@ public void testNonStationaryFalsePositiveRate() { int fp = 0; for (int i = 0; i < 100; i++) { AtomicInteger j = new AtomicInteger(); - double[] bucketValues = DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(40).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); - fp += test.type() == ChangeDetector.Type.NON_STATIONARY ? 
0 : 1; + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(40).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(1e-4); + fp += type instanceof ChangeType.NonStationary ? 0 : 1; } assertThat(fp, lessThan(10)); @@ -68,9 +81,12 @@ public void testNonStationaryFalsePositiveRate() { GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); for (int i = 0; i < 100; i++) { AtomicInteger j = new AtomicInteger(); - double[] bucketValues = DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(40).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(1e-4); - fp += test.type() == ChangeDetector.Type.NON_STATIONARY ? 0 : 1; + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(40).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(1e-4); + fp += type instanceof ChangeType.NonStationary ? 0 : 1; } assertThat(fp, lessThan(10)); } @@ -79,24 +95,30 @@ public void testStepChangePower() { NormalDistribution normal = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0, 2); int tp = 0; for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> normal.sample()).limit(20), - DoubleStream.generate(() -> 10 + normal.sample()).limit(20) - ).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); - tp += test.type() == ChangeDetector.Type.STEP_CHANGE ? 
1 : 0; + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.concat( + DoubleStream.generate(() -> normal.sample()).limit(20), + DoubleStream.generate(() -> 10 + normal.sample()).limit(20) + ).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.05); + tp += type instanceof ChangeType.StepChange ? 1 : 0; } assertThat(tp, greaterThan(80)); tp = 0; GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> gamma.sample()).limit(20), - DoubleStream.generate(() -> 10 + gamma.sample()).limit(20) - ).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); - tp += test.type() == ChangeDetector.Type.STEP_CHANGE ? 1 : 0; + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.concat( + DoubleStream.generate(() -> gamma.sample()).limit(20), + DoubleStream.generate(() -> 10 + gamma.sample()).limit(20) + ).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.05); + tp += type instanceof ChangeType.StepChange ? 1 : 0; } assertThat(tp, greaterThan(80)); } @@ -106,12 +128,15 @@ public void testTrendChangePower() { int tp = 0; for (int i = 0; i < 100; i++) { AtomicInteger j = new AtomicInteger(); - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(20), - DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + normal.sample()).limit(20) - ).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); - tp += test.type() == ChangeDetector.Type.TREND_CHANGE ? 
1 : 0; + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.concat( + DoubleStream.generate(() -> j.incrementAndGet() + normal.sample()).limit(20), + DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + normal.sample()).limit(20) + ).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.05); + tp += type instanceof ChangeType.TrendChange ? 1 : 0; } assertThat(tp, greaterThan(80)); @@ -119,12 +144,15 @@ public void testTrendChangePower() { GammaDistribution gamma = new GammaDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1, 2); for (int i = 0; i < 100; i++) { AtomicInteger j = new AtomicInteger(); - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(20), - DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + gamma.sample()).limit(20) - ).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); - tp += test.type() == ChangeDetector.Type.TREND_CHANGE ? 1 : 0; + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.concat( + DoubleStream.generate(() -> j.incrementAndGet() + gamma.sample()).limit(20), + DoubleStream.generate(() -> 2.0 * j.incrementAndGet() + gamma.sample()).limit(20) + ).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.05); + tp += type instanceof ChangeType.TrendChange ? 
1 : 0; } assertThat(tp, greaterThan(80)); } @@ -134,12 +162,15 @@ public void testDistributionChangeTestPower() { NormalDistribution normal2 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 0.0, 10.0); int tp = 0; for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.concat( - DoubleStream.generate(() -> 10 + normal1.sample()).limit(50), - DoubleStream.generate(() -> 10 + normal2.sample()).limit(50) - ).toArray(); - ChangeDetector.TestStats test = new ChangeDetector(bucketValues).testForChange(0.05); - tp += test.type() == ChangeDetector.Type.DISTRIBUTION_CHANGE ? 1 : 0; + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.concat( + DoubleStream.generate(() -> 10 + normal1.sample()).limit(50), + DoubleStream.generate(() -> 10 + normal2.sample()).limit(50) + ).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.05); + tp += type instanceof ChangeType.DistributionChange ? 1 : 0; } assertThat(tp, greaterThan(90)); } @@ -150,60 +181,66 @@ public void testMultipleChanges() { NormalDistribution normal3 = new NormalDistribution(RandomGeneratorFactory.createRandomGenerator(Randomness.get()), 1.0, 0.3); int tp = 0; for (int i = 0; i < 100; i++) { - double[] bucketValues = DoubleStream.concat( - DoubleStream.concat(DoubleStream.generate(normal1::sample).limit(7), DoubleStream.generate(normal2::sample).limit(6)), - DoubleStream.generate(normal3::sample).limit(23) - ).toArray(); - ChangeDetector.TestStats result = new ChangeDetector(bucketValues).testForChange(0.05); - tp += result.type() == ChangeDetector.Type.TREND_CHANGE ? 
1 : 0; + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + DoubleStream.concat( + DoubleStream.concat(DoubleStream.generate(normal1::sample).limit(7), DoubleStream.generate(normal2::sample).limit(6)), + DoubleStream.generate(normal3::sample).limit(23) + ).toArray() + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.05); + tp += type instanceof ChangeType.TrendChange ? 1 : 0; } assertThat(tp, greaterThan(90)); } public void testProblemDistributionChange() { - double[] bucketValues = new double[] { - 546.3651753325270, - 550.872738079514, - 551.1312487618040, - 550.3323904749380, - 549.2652495378930, - 548.9761274963630, - 549.3433969743010, - 549.0935313531350, - 551.1762550747600, - 551.3772184469220, - 548.6163495094490, - 548.5866591594080, - 546.9364791288570, - 548.1167839989470, - 549.3484016149320, - 550.4242803917040, - 551.2316023050940, - 548.4713993534340, - 546.0254901960780, - 548.4376996805110, - 561.1920529801320, - 557.3930041152260, - 565.8497217068650, - 566.787072243346, - 546.6094890510950, - 530.5905797101450, - 556.7340823970040, - 557.3857677902620, - 543.0754716981130, - 574.3297101449280, - 559.2962962962960, - 549.5202952029520, - 531.7217741935480, - 551.4333333333330, - 557.637168141593, - 545.1880733944950, - 564.6893203883500, - 543.0204081632650, - 571.820809248555, - 541.2589928057550, - 520.4387755102040 }; - ChangeDetector.TestStats result = new ChangeDetector(bucketValues).testForChange(0.05); - assertThat(result.type(), equalTo(ChangeDetector.Type.DISTRIBUTION_CHANGE)); + MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues( + null, + new double[] { + 546.3651753325270, + 550.872738079514, + 551.1312487618040, + 550.3323904749380, + 549.2652495378930, + 548.9761274963630, + 549.3433969743010, + 549.0935313531350, + 551.1762550747600, + 551.3772184469220, + 548.6163495094490, + 548.5866591594080, + 546.9364791288570, + 548.1167839989470, + 
549.3484016149320, + 550.4242803917040, + 551.2316023050940, + 548.4713993534340, + 546.0254901960780, + 548.4376996805110, + 561.1920529801320, + 557.3930041152260, + 565.8497217068650, + 566.787072243346, + 546.6094890510950, + 530.5905797101450, + 556.7340823970040, + 557.3857677902620, + 543.0754716981130, + 574.3297101449280, + 559.2962962962960, + 549.5202952029520, + 531.7217741935480, + 551.4333333333330, + 557.637168141593, + 545.1880733944950, + 564.6893203883500, + 543.0204081632650, + 571.820809248555, + 541.2589928057550, + 520.4387755102040 } + ); + ChangeType type = new ChangeDetector(bucketValues).detect(0.05); + assertThat(type, instanceOf(ChangeType.DistributionChange.class)); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetectorTests.java index 99e0c8ed3508b..b21a7c4625e83 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetectorTests.java @@ -25,14 +25,14 @@ public void testTooLittleData() { Arrays.fill(docCounts, 1); Arrays.fill(values, 1.0); MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(docCounts, values); - SpikeAndDipDetector detect = new SpikeAndDipDetector(values); - assertThat(detect.detect(0.01, bucketValues), instanceOf(ChangeType.Indeterminable.class)); + SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues); + assertThat(detect.detect(0.01), instanceOf(ChangeType.Indeterminable.class)); } } public void testSpikeAndDipValues() { double[] values = new double[] { 2.0, 1.0, 3.0, 5.0, 4.0 }; - SpikeAndDipDetector detector = new SpikeAndDipDetector(values); + SpikeAndDipDetector detector = new SpikeAndDipDetector(new MlAggsHelper.DoubleBucketValues(null, values)); 
assertThat(detector.spikeValue(), equalTo(5.0)); assertThat(detector.dipValue(), equalTo(1.0)); } @@ -133,7 +133,7 @@ public void testExludedValues() { Arrays.sort(expectedSpikeKDEValues); Arrays.sort(expectedDipKDEValues); - SpikeAndDipDetector detector = new SpikeAndDipDetector(values); + SpikeAndDipDetector detector = new SpikeAndDipDetector(new MlAggsHelper.DoubleBucketValues(null, values)); assertThat(detector.spikeValue(), equalTo(10.0)); assertThat(detector.dipValue(), equalTo(-2.0)); @@ -150,9 +150,9 @@ public void testDetection() { double[] values = new double[] { 0.1, 3.1, 1.2, 1.7, 0.9, 2.3, -0.8, 3.2, 1.2, 1.3, 1.1, 1.0, 8.5, 0.5, 2.6, 0.7 }; MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(docCounts, values); - SpikeAndDipDetector detect = new SpikeAndDipDetector(values); + SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues); - ChangeType change = detect.detect(0.05, bucketValues); + ChangeType change = detect.detect(0.05); assertThat(change, instanceOf(ChangeType.Spike.class)); assertThat(change.pValue(), closeTo(3.0465e-12, 1e-15)); @@ -162,9 +162,9 @@ public void testDetection() { double[] values = new double[] { 0.1, 3.1, 1.2, 1.7, 0.9, 2.3, -4.2, 3.2, 1.2, 1.3, 1.1, 1.0, 3.5, 0.5, 2.6, 0.7 }; MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(docCounts, values); - SpikeAndDipDetector detect = new SpikeAndDipDetector(values); + SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues); - ChangeType change = detect.detect(0.05, bucketValues); + ChangeType change = detect.detect(0.05); assertThat(change, instanceOf(ChangeType.Dip.class)); assertThat(change.pValue(), closeTo(1.2589e-08, 1e-11)); @@ -177,9 +177,9 @@ public void testMissingBuckets() { int[] buckets = new int[] { 0, 2, 3, 6, 7, 8, 9, 10, 11, 12, 13, 15, 17, 18, 19, 20 }; MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(docCounts, values, buckets); - 
SpikeAndDipDetector detect = new SpikeAndDipDetector(values); + SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues); - ChangeType change = detect.detect(0.01, bucketValues); + ChangeType change = detect.detect(0.01); assertThat(change, instanceOf(ChangeType.Spike.class)); assertThat(change.changePoint(), equalTo(10)); From 2c8d0874206936416a10d7659a2c8a7fa8289c67 Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Wed, 9 Oct 2024 13:03:01 +0200 Subject: [PATCH 5/6] Move p-value computation to ChangeDetector --- .../ml/aggs/changepoint/ChangeDetector.java | 5 ++- .../changepoint/ChangePointAggregator.java | 32 +++++++------------ 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java index bdb14cdf0fd3c..8dffd7a55e65a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java @@ -44,7 +44,10 @@ public class ChangeDetector { this.values = bucketValues.getValues(); } - ChangeType detect(double pValueThreshold) { + ChangeType detect(double minBucketsPValue) { + // This was obtained by simulating the test power for a fixed size effect as a + // function of the bucket value count. 
+ double pValueThreshold = minBucketsPValue * Math.exp(-0.04 * (values.length - 2 * (ChangePointAggregator.MINIMUM_BUCKETS + 1))); return testForChange(pValueThreshold).changeType(bucketValues, slope(values)); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java index be8ec05f4b740..96a78ce0030ff 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java @@ -31,12 +31,6 @@ public class ChangePointAggregator extends SiblingPipelineAggregator { static final double P_VALUE_THRESHOLD = 0.01; static final int MINIMUM_BUCKETS = 10; - private static double changePValueThreshold(int nValues) { - // This was obtained by simulating the test power for a fixed size effect as a - // function of the bucket value count. 
- return P_VALUE_THRESHOLD * Math.exp(-0.04 * (double) (nValues - 2 * (MINIMUM_BUCKETS + 1))); - } - public ChangePointAggregator(String name, String bucketsPath, Map metadata) { super(name, new String[] { bucketsPath }, metadata); } @@ -82,24 +76,22 @@ static ChangeType getChangeType(MlAggsHelper.DoubleBucketValues bucketValues) { ); } - ChangeType spikeOrDip = testForSpikeOrDip(bucketValues, P_VALUE_THRESHOLD); - ChangeType change = new ChangeDetector(bucketValues).detect(changePValueThreshold(bucketValues.getValues().length)); - logger.trace("change p-value: [{}]", change.pValue()); - if (spikeOrDip.pValue() < change.pValue()) { - change = spikeOrDip; - } - return change; - } - - static ChangeType testForSpikeOrDip(MlAggsHelper.DoubleBucketValues bucketValues, double pValueThreshold) { + ChangeType spikeOrDip; try { SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues); - ChangeType result = detect.detect(pValueThreshold); - logger.trace("spike or dip p-value: [{}]", result.pValue()); - return result; + spikeOrDip = detect.detect(P_VALUE_THRESHOLD); + logger.trace("spike or dip p-value: [{}]", spikeOrDip.pValue()); } catch (NotStrictlyPositiveException nspe) { logger.debug("failure testing for dips and spikes", nspe); - return new Indeterminable("failure testing for dips and spikes"); + spikeOrDip = new Indeterminable("failure testing for dips and spikes"); + } + + ChangeType change = new ChangeDetector(bucketValues).detect(P_VALUE_THRESHOLD); + logger.trace("change p-value: [{}]", change.pValue()); + + if (spikeOrDip.pValue() < change.pValue()) { + change = spikeOrDip; } + return change; } } From 07d1dbd6cfee6c77e9de4bb85b2fa97744f11c6b Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Wed, 9 Oct 2024 13:08:37 +0200 Subject: [PATCH 6/6] Move main entrypoint to a separate file --- .../ml/aggs/changepoint/ChangeDetector.java | 4 +- .../changepoint/ChangePointAggregator.java | 41 +------------ .../aggs/changepoint/ChangePointDetector.java | 59 
+++++++++++++++++++ 3 files changed, 62 insertions(+), 42 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointDetector.java diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java index 8dffd7a55e65a..e771fb3b94568 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangeDetector.java @@ -47,7 +47,7 @@ public class ChangeDetector { ChangeType detect(double minBucketsPValue) { // This was obtained by simulating the test power for a fixed size effect as a // function of the bucket value count. - double pValueThreshold = minBucketsPValue * Math.exp(-0.04 * (values.length - 2 * (ChangePointAggregator.MINIMUM_BUCKETS + 1))); + double pValueThreshold = minBucketsPValue * Math.exp(-0.04 * (values.length - 2 * (ChangePointDetector.MINIMUM_BUCKETS + 1))); return testForChange(pValueThreshold).changeType(bucketValues, slope(values)); } @@ -135,7 +135,7 @@ private TestStats testForChange(double pValueThreshold) { } private int[] computeCandidateChangePoints(double[] values) { - int minValues = Math.max((int) (0.1 * values.length + 0.5), ChangePointAggregator.MINIMUM_BUCKETS); + int minValues = Math.max((int) (0.1 * values.length + 0.5), ChangePointDetector.MINIMUM_BUCKETS); if (values.length - 2 * minValues <= MAXIMUM_CANDIDATE_CHANGE_POINTS) { return IntStream.range(minValues, values.length - minValues).toArray(); } else { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java index 96a78ce0030ff..d643a937180a1 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java @@ -7,16 +7,12 @@ package org.elasticsearch.xpack.ml.aggs.changepoint; -import org.apache.commons.math3.exception.NotStrictlyPositiveException; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.search.aggregations.AggregationReduceContext; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers; import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import org.elasticsearch.xpack.ml.aggs.MlAggsHelper; -import org.elasticsearch.xpack.ml.aggs.changepoint.ChangeType.Indeterminable; import java.util.Map; import java.util.Optional; @@ -26,11 +22,6 @@ public class ChangePointAggregator extends SiblingPipelineAggregator { - private static final Logger logger = LogManager.getLogger(ChangePointAggregator.class); - - static final double P_VALUE_THRESHOLD = 0.01; - static final int MINIMUM_BUCKETS = 10; - public ChangePointAggregator(String name, String bucketsPath, Map metadata) { super(name, new String[] { bucketsPath }, metadata); } @@ -53,7 +44,7 @@ public InternalAggregation doReduce(InternalAggregations aggregations, Aggregati } MlAggsHelper.DoubleBucketValues bucketValues = maybeBucketValues.get(); - ChangeType change = getChangeType(bucketValues); + ChangeType change = ChangePointDetector.getChangeType(bucketValues); ChangePointBucket changePointBucket = null; if (change.changePoint() >= 0) { @@ -64,34 +55,4 @@ public InternalAggregation doReduce(InternalAggregations aggregations, Aggregati return new InternalChangePointAggregation(name(), metadata(), changePointBucket, change); } - - static ChangeType 
getChangeType(MlAggsHelper.DoubleBucketValues bucketValues) { - if (bucketValues.getValues().length < (2 * MINIMUM_BUCKETS) + 2) { - return new ChangeType.Indeterminable( - "not enough buckets to calculate change_point. Requires at least [" - + ((2 * MINIMUM_BUCKETS) + 2) - + "]; found [" - + bucketValues.getValues().length - + "]" - ); - } - - ChangeType spikeOrDip; - try { - SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues); - spikeOrDip = detect.detect(P_VALUE_THRESHOLD); - logger.trace("spike or dip p-value: [{}]", spikeOrDip.pValue()); - } catch (NotStrictlyPositiveException nspe) { - logger.debug("failure testing for dips and spikes", nspe); - spikeOrDip = new Indeterminable("failure testing for dips and spikes"); - } - - ChangeType change = new ChangeDetector(bucketValues).detect(P_VALUE_THRESHOLD); - logger.trace("change p-value: [{}]", change.pValue()); - - if (spikeOrDip.pValue() < change.pValue()) { - change = spikeOrDip; - } - return change; - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointDetector.java new file mode 100644 index 0000000000000..d7708420994bb --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointDetector.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.ml.aggs.changepoint; + +import org.apache.commons.math3.exception.NotStrictlyPositiveException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.xpack.ml.aggs.MlAggsHelper; + +/** + * Detects whether a series of values has a change point, by running both + * ChangeDetector and SpikeAndDipDetector on it. This is the main entrypoint + * of change point detection. + */ +public class ChangePointDetector { + + private static final Logger logger = LogManager.getLogger(ChangePointDetector.class); + + static final double P_VALUE_THRESHOLD = 0.01; + static final int MINIMUM_BUCKETS = 10; + + /** + * Returns the ChangeType of a series of values. + */ + public static ChangeType getChangeType(MlAggsHelper.DoubleBucketValues bucketValues) { + if (bucketValues.getValues().length < (2 * MINIMUM_BUCKETS) + 2) { + return new ChangeType.Indeterminable( + "not enough buckets to calculate change_point. Requires at least [" + + ((2 * MINIMUM_BUCKETS) + 2) + + "]; found [" + + bucketValues.getValues().length + + "]" + ); + } + + ChangeType spikeOrDip; + try { + SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues); + spikeOrDip = detect.detect(P_VALUE_THRESHOLD); + logger.trace("spike or dip p-value: [{}]", spikeOrDip.pValue()); + } catch (NotStrictlyPositiveException nspe) { + logger.debug("failure testing for dips and spikes", nspe); + spikeOrDip = new ChangeType.Indeterminable("failure testing for dips and spikes"); + } + + ChangeType change = new ChangeDetector(bucketValues).detect(P_VALUE_THRESHOLD); + logger.trace("change p-value: [{}]", change.pValue()); + + if (spikeOrDip.pValue() < change.pValue()) { + change = spikeOrDip; + } + return change; + } +}