2323import java .util .function .IntToDoubleFunction ;
2424import java .util .stream .IntStream ;
2525
26+ /**
27+ * Detects whether a time series is stationary or changing
28+ * (either continuously or at a specific change point).
29+ */
2630public class ChangeDetector {
2731
2832 private static final int MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST = 500 ;
33+ private static final int MAXIMUM_CANDIDATE_CHANGE_POINTS = 1000 ;
34+
2935 private static final KolmogorovSmirnovTest KOLMOGOROV_SMIRNOV_TEST = new KolmogorovSmirnovTest ();
3036
3137 private static final Logger logger = LogManager .getLogger (ChangeDetector .class );
3238
33- static TestStats testForChange (double [] timeWindow , double pValueThreshold ) {
39+ private final double [] values ;
40+
41+ ChangeDetector (double [] values ) {
42+ this .values = values ;
43+ }
44+
45+ public ChangeType detect (double pValueThreshold , MlAggsHelper .DoubleBucketValues bucketValues ) {
46+ return testForChange (pValueThreshold ).changeType (bucketValues , slope (values ));
47+ }
48+
49+ // visible for testing
50+ TestStats testForChange (double pValueThreshold ) {
3451
35- int [] candidateChangePoints = ChangePointAggregator . computeCandidateChangePoints (timeWindow );
52+ int [] candidateChangePoints = computeCandidateChangePoints (values );
3653 logger .trace ("candidatePoints: [{}]" , Arrays .toString (candidateChangePoints ));
3754
38- double [] timeWindowWeights = outlierWeights (timeWindow );
39- logger .trace ("timeWindow : [{}]" , Arrays .toString (timeWindow ));
40- logger .trace ("timeWindowWeights : [{}]" , Arrays .toString (timeWindowWeights ));
41- RunningStats dataRunningStats = RunningStats .from (timeWindow , i -> timeWindowWeights [i ]);
55+ double [] valuesWeights = outlierWeights (values );
56+ logger .trace ("values : [{}]" , Arrays .toString (values ));
57+ logger .trace ("valuesWeights : [{}]" , Arrays .toString (valuesWeights ));
58+ RunningStats dataRunningStats = RunningStats .from (values , i -> valuesWeights [i ]);
4259 DataStats dataStats = new DataStats (
4360 dataRunningStats .count (),
4461 dataRunningStats .mean (),
@@ -52,38 +69,33 @@ static TestStats testForChange(double[] timeWindow, double pValueThreshold) {
5269 return stationary ;
5370 }
5471
55- TestStats trendVsStationary = testTrendVs (stationary , timeWindow , timeWindowWeights );
72+ TestStats trendVsStationary = testTrendVs (stationary , values , valuesWeights );
5673 logger .trace ("trend vs stationary: [{}]" , trendVsStationary );
5774
5875 TestStats best = stationary ;
5976 Set <Integer > discoveredChangePoints = Sets .newHashSetWithExpectedSize (4 );
6077 if (trendVsStationary .accept (pValueThreshold )) {
6178 // Check if there is a change in the trend.
62- TestStats trendChangeVsTrend = testTrendChangeVs (trendVsStationary , timeWindow , timeWindowWeights , candidateChangePoints );
79+ TestStats trendChangeVsTrend = testTrendChangeVs (trendVsStationary , values , valuesWeights , candidateChangePoints );
6380 discoveredChangePoints .add (trendChangeVsTrend .changePoint ());
6481 logger .trace ("trend change vs trend: [{}]" , trendChangeVsTrend );
6582
6683 if (trendChangeVsTrend .accept (pValueThreshold )) {
6784 // Check if modeling a trend change adds much over modeling a step change.
68- best = testVsStepChange (trendChangeVsTrend , timeWindow , timeWindowWeights , candidateChangePoints , pValueThreshold );
85+ best = testVsStepChange (trendChangeVsTrend , values , valuesWeights , candidateChangePoints , pValueThreshold );
6986 } else {
7087 best = trendVsStationary ;
7188 }
7289
7390 } else {
7491 // Check if there is a step change.
75- TestStats stepChangeVsStationary = testStepChangeVs (stationary , timeWindow , timeWindowWeights , candidateChangePoints );
92+ TestStats stepChangeVsStationary = testStepChangeVs (stationary , values , valuesWeights , candidateChangePoints );
7693 discoveredChangePoints .add (stepChangeVsStationary .changePoint ());
7794 logger .trace ("step change vs stationary: [{}]" , stepChangeVsStationary );
7895
7996 if (stepChangeVsStationary .accept (pValueThreshold )) {
8097 // Check if modeling a trend change adds much over modeling a step change.
81- TestStats trendChangeVsStepChange = testTrendChangeVs (
82- stepChangeVsStationary ,
83- timeWindow ,
84- timeWindowWeights ,
85- candidateChangePoints
86- );
98+ TestStats trendChangeVsStepChange = testTrendChangeVs (stepChangeVsStationary , values , valuesWeights , candidateChangePoints );
8799 discoveredChangePoints .add (stepChangeVsStationary .changePoint ());
88100 logger .trace ("trend change vs step change: [{}]" , trendChangeVsStepChange );
89101 if (trendChangeVsStepChange .accept (pValueThreshold )) {
@@ -94,7 +106,7 @@ static TestStats testForChange(double[] timeWindow, double pValueThreshold) {
94106
95107 } else {
96108 // Check if there is a trend change.
97- TestStats trendChangeVsStationary = testTrendChangeVs (stationary , timeWindow , timeWindowWeights , candidateChangePoints );
109+ TestStats trendChangeVsStationary = testTrendChangeVs (stationary , values , valuesWeights , candidateChangePoints );
98110 discoveredChangePoints .add (stepChangeVsStationary .changePoint ());
99111 logger .trace ("trend change vs stationary: [{}]" , trendChangeVsStationary );
100112 if (trendChangeVsStationary .accept (pValueThreshold )) {
@@ -108,13 +120,7 @@ static TestStats testForChange(double[] timeWindow, double pValueThreshold) {
108120 // We're not very confident in the change point, so check if a distribution change
109121 // fits the data better.
110122 if (best .pValueVsStationary () > 1e-5 ) {
111- TestStats distChange = testDistributionChange (
112- dataStats ,
113- timeWindow ,
114- timeWindowWeights ,
115- candidateChangePoints ,
116- discoveredChangePoints
117- );
123+ TestStats distChange = testDistributionChange (dataStats , values , valuesWeights , candidateChangePoints , discoveredChangePoints );
118124 logger .trace ("distribution change: [{}]" , distChange );
119125 if (distChange .pValue () < Math .min (pValueThreshold , 0.1 * best .pValueVsStationary ())) {
120126 best = distChange ;
@@ -124,7 +130,17 @@ static TestStats testForChange(double[] timeWindow, double pValueThreshold) {
124130 return best ;
125131 }
126132
127- static double [] outlierWeights (double [] values ) {
133+ private int [] computeCandidateChangePoints (double [] values ) {
134+ int minValues = Math .max ((int ) (0.1 * values .length + 0.5 ), ChangePointAggregator .MINIMUM_BUCKETS );
135+ if (values .length - 2 * minValues <= MAXIMUM_CANDIDATE_CHANGE_POINTS ) {
136+ return IntStream .range (minValues , values .length - minValues ).toArray ();
137+ } else {
138+ int step = (int ) Math .ceil ((double ) (values .length - 2 * minValues ) / MAXIMUM_CANDIDATE_CHANGE_POINTS );
139+ return IntStream .range (minValues , values .length - minValues ).filter (i -> i % step == 0 ).toArray ();
140+ }
141+ }
142+
143+ private double [] outlierWeights (double [] values ) {
128144 int i = (int ) Math .ceil (0.025 * values .length );
129145 double [] weights = Arrays .copyOf (values , values .length );
130146 Arrays .sort (weights );
@@ -144,19 +160,19 @@ static double[] outlierWeights(double[] values) {
144160 return weights ;
145161 }
146162
147- static double slope (double [] values ) {
163+ private double slope (double [] values ) {
148164 SimpleRegression regression = new SimpleRegression ();
149165 for (int i = 0 ; i < values .length ; i ++) {
150166 regression .addData (i , values [i ]);
151167 }
152168 return regression .getSlope ();
153169 }
154170
155- static double independentTrialsPValue (double pValue , int nTrials ) {
171+ private static double independentTrialsPValue (double pValue , int nTrials ) {
156172 return pValue > 1e-10 ? 1.0 - Math .pow (1.0 - pValue , nTrials ) : nTrials * pValue ;
157173 }
158174
159- static TestStats testTrendVs (TestStats H0 , double [] values , double [] weights ) {
175+ private TestStats testTrendVs (TestStats H0 , double [] values , double [] weights ) {
160176 LeastSquaresOnlineRegression allLeastSquares = new LeastSquaresOnlineRegression (2 );
161177 for (int i = 0 ; i < values .length ; i ++) {
162178 allLeastSquares .add (i , values [i ], weights [i ]);
@@ -166,7 +182,7 @@ static TestStats testTrendVs(TestStats H0, double[] values, double[] weights) {
166182 return new TestStats (Type .NON_STATIONARY , pValue , vTrend , 3.0 , H0 .dataStats ());
167183 }
168184
169- static TestStats testStepChangeVs (TestStats H0 , double [] values , double [] weights , int [] candidateChangePoints ) {
185+ private TestStats testStepChangeVs (TestStats H0 , double [] values , double [] weights , int [] candidateChangePoints ) {
170186
171187 double vStep = Double .MAX_VALUE ;
172188 int changePoint = -1 ;
@@ -203,7 +219,7 @@ static TestStats testStepChangeVs(TestStats H0, double[] values, double[] weight
203219 return new TestStats (Type .STEP_CHANGE , pValue , vStep , 2.0 , changePoint , H0 .dataStats ());
204220 }
205221
206- static TestStats testTrendChangeVs (TestStats H0 , double [] values , double [] weights , int [] candidateChangePoints ) {
222+ private TestStats testTrendChangeVs (TestStats H0 , double [] values , double [] weights , int [] candidateChangePoints ) {
207223
208224 double vChange = Double .MAX_VALUE ;
209225 int changePoint = -1 ;
@@ -252,7 +268,7 @@ static TestStats testTrendChangeVs(TestStats H0, double[] values, double[] weigh
252268 return new TestStats (Type .TREND_CHANGE , pValue , vChange , 6.0 , changePoint , H0 .dataStats ());
253269 }
254270
255- static TestStats testVsStepChange (
271+ private TestStats testVsStepChange (
256272 TestStats trendChange ,
257273 double [] values ,
258274 double [] weights ,
@@ -267,7 +283,7 @@ static TestStats testVsStepChange(
267283 return pValue < pValueThreshold ? trendChange : stepChange ;
268284 }
269285
270- static double fTestNestedPValue (double n , double vNull , double pNull , double vAlt , double pAlt ) {
286+ private static double fTestNestedPValue (double n , double vNull , double pNull , double vAlt , double pAlt ) {
271287 if (vAlt == vNull ) {
272288 return 1.0 ;
273289 }
@@ -287,7 +303,7 @@ private static int lowerBound(int[] x, int start, int end, int xs) {
287303 return retVal ;
288304 }
289305
290- static SampleData sample (double [] values , double [] weights , Set <Integer > changePoints ) {
306+ private SampleData sample (double [] values , double [] weights , Set <Integer > changePoints ) {
291307 Integer [] adjChangePoints = changePoints .toArray (new Integer [changePoints .size ()]);
292308 if (values .length <= MAXIMUM_SAMPLE_SIZE_FOR_KS_TEST ) {
293309 return new SampleData (values , weights , adjChangePoints );
@@ -320,7 +336,7 @@ static SampleData sample(double[] values, double[] weights, Set<Integer> changeP
320336 return new SampleData (sample , sampleWeights , adjChangePoints );
321337 }
322338
323- static TestStats testDistributionChange (
339+ private TestStats testDistributionChange (
324340 DataStats stats ,
325341 double [] values ,
326342 double [] weights ,
@@ -384,7 +400,7 @@ static TestStats testDistributionChange(
384400 return new TestStats (Type .DISTRIBUTION_CHANGE , pValue , changePoint , stats );
385401 }
386402
387- static double fDistribSf (double numeratorDegreesOfFreedom , double denominatorDegreesOfFreedom , double x ) {
403+ private static double fDistribSf (double numeratorDegreesOfFreedom , double denominatorDegreesOfFreedom , double x ) {
388404 if (x <= 0 ) {
389405 return 1 ;
390406 }
@@ -399,6 +415,7 @@ static double fDistribSf(double numeratorDegreesOfFreedom, double denominatorDeg
399415 );
400416 }
401417
418+ // visible for testing
402419 enum Type {
403420 STATIONARY ,
404421 NON_STATIONARY ,
@@ -425,6 +442,7 @@ public String toString() {
425442 }
426443 }
427444
445+ // visible for testing
428446 record TestStats (Type type , double pValue , double var , double nParams , int changePoint , DataStats dataStats ) {
429447 TestStats (Type type , double pValue , int changePoint , DataStats dataStats ) {
430448 this (type , pValue , 0.0 , 0.0 , changePoint , dataStats );
@@ -482,7 +500,7 @@ public String toString() {
482500 }
483501 }
484502
485- static class RunningStats {
503+ private static class RunningStats {
486504 double sumOfSqrs ;
487505 double sum ;
488506 double count ;
0 commit comments