@@ -58,30 +58,8 @@ public InternalAggregation doReduce(InternalAggregations aggregations, Aggregati
5858 );
5959 }
6060 MlAggsHelper .DoubleBucketValues bucketValues = maybeBucketValues .get ();
61- if (bucketValues .getValues ().length < (2 * MINIMUM_BUCKETS ) + 2 ) {
62- return new InternalChangePointAggregation (
63- name (),
64- metadata (),
65- null ,
66- new ChangeType .Indeterminable (
67- "not enough buckets to calculate change_point. Requires at least ["
68- + ((2 * MINIMUM_BUCKETS ) + 2 )
69- + "]; found ["
70- + bucketValues .getValues ().length
71- + "]"
72- )
73- );
74- }
7561
76- ChangeType spikeOrDip = testForSpikeOrDip (bucketValues , P_VALUE_THRESHOLD );
77-
78- // Test for change step, trend and distribution changes.
79- ChangeType change = testForChange (bucketValues , changePValueThreshold (bucketValues .getValues ().length ));
80- logger .trace ("change p-value: [{}]" , change .pValue ());
81-
82- if (spikeOrDip .pValue () < change .pValue ()) {
83- change = spikeOrDip ;
84- }
62+ ChangeType change = getChangeType (bucketValues );
8563
8664 ChangePointBucket changePointBucket = null ;
8765 if (change .changePoint () >= 0 ) {
@@ -93,20 +71,35 @@ public InternalAggregation doReduce(InternalAggregations aggregations, Aggregati
9371 return new InternalChangePointAggregation (name (), metadata (), changePointBucket , change );
9472 }
9573
74+ static ChangeType getChangeType (MlAggsHelper .DoubleBucketValues bucketValues ) {
75+ if (bucketValues .getValues ().length < (2 * MINIMUM_BUCKETS ) + 2 ) {
76+ return new ChangeType .Indeterminable (
77+ "not enough buckets to calculate change_point. Requires at least ["
78+ + ((2 * MINIMUM_BUCKETS ) + 2 )
79+ + "]; found ["
80+ + bucketValues .getValues ().length
81+ + "]"
82+ );
83+ }
84+
85+ ChangeType spikeOrDip = testForSpikeOrDip (bucketValues , P_VALUE_THRESHOLD );
86+ ChangeType change = new ChangeDetector (bucketValues ).detect (changePValueThreshold (bucketValues .getValues ().length ));
87+ logger .trace ("change p-value: [{}]" , change .pValue ());
88+ if (spikeOrDip .pValue () < change .pValue ()) {
89+ change = spikeOrDip ;
90+ }
91+ return change ;
92+ }
93+
9694 static ChangeType testForSpikeOrDip (MlAggsHelper .DoubleBucketValues bucketValues , double pValueThreshold ) {
9795 try {
98- SpikeAndDipDetector detect = new SpikeAndDipDetector (bucketValues . getValues () );
99- ChangeType result = detect .detect (pValueThreshold , bucketValues );
96+ SpikeAndDipDetector detect = new SpikeAndDipDetector (bucketValues );
97+ ChangeType result = detect .detect (pValueThreshold );
10098 logger .trace ("spike or dip p-value: [{}]" , result .pValue ());
10199 return result ;
102100 } catch (NotStrictlyPositiveException nspe ) {
103101 logger .debug ("failure testing for dips and spikes" , nspe );
102+ return new Indeterminable ("failure testing for dips and spikes" );
104103 }
105- return new Indeterminable ("failure testing for dips and spikes" );
106- }
107-
108- static ChangeType testForChange (MlAggsHelper .DoubleBucketValues bucketValues , double pValueThreshold ) {
109- double [] timeWindow = bucketValues .getValues ();
110- return new ChangeDetector (timeWindow ).detect (pValueThreshold , bucketValues );
111104 }
112105}
0 commit comments