1111import org .elasticsearch .common .Strings ;
1212import org .elasticsearch .common .util .BigArrays ;
1313import org .elasticsearch .common .util .LongArray ;
14+ import org .elasticsearch .compute .ann .Aggregator ;
1415import org .elasticsearch .compute .ann .GroupingAggregator ;
1516import org .elasticsearch .compute .ann .IntermediateState ;
1617import org .elasticsearch .compute .data .Block ;
3940/**
4041 * Change point detection for series of long values.
4142 */
42-
4343// TODO: make .java.st from this to support different types
44-
45- // TODO: add non-grouping @Aggregator, like this:
46- /*
4744@ Aggregator (
4845 includeTimestamps = true ,
4946 value = { @ IntermediateState (name = "timestamps" , type = "LONG_BLOCK" ), @ IntermediateState (name = "values" , type = "LONG_BLOCK" ) }
5047)
51- */
52- // This need "includeTimestamps" support in @Aggregator.
53-
5448@ GroupingAggregator (
5549 includeTimestamps = true ,
5650 value = { @ IntermediateState (name = "timestamps" , type = "LONG_BLOCK" ), @ IntermediateState (name = "values" , type = "LONG_BLOCK" ) }
5751)
5852class ChangePointLongAggregator {
5953
54+ public static SingleState initSingle (DriverContext driverContext ) {
55+ return new SingleState (driverContext .bigArrays ());
56+ }
57+
58+ public static void combine (SingleState state , long timestamp , long value ) {
59+ state .add (timestamp , value );
60+ }
61+
62+ public static void combineIntermediate (SingleState state , LongBlock timestamps , LongBlock values ) {
63+ int start = values .getFirstValueIndex (0 );
64+ int end = start + values .getValueCount (0 );
65+ for (int i = start ; i < end ; i ++) {
66+ combine (state , timestamps .getLong (i ), values .getLong (i ));
67+ }
68+ }
69+
70+ public static Block evaluateFinal (SingleState state , DriverContext driverContext ) {
71+ return state .toBlock (driverContext .blockFactory ());
72+ }
73+
6074 public static GroupingState initGrouping (DriverContext driverContext ) {
6175 return new GroupingState (driverContext .bigArrays ());
6276 }
@@ -112,31 +126,55 @@ Block toBlock(LongArray arr, BlockFactory blockFactory) {
112126 }
113127 try (LongBlock .Builder builder = blockFactory .newLongBlockBuilder ((int ) arr .size ())) {
114128 builder .beginPositionEntry ();
115- for (int id = 0 ; id < arr . size () ; id ++) {
129+ for (int id = 0 ; id < count ; id ++) {
116130 builder .appendLong (arr .get (id ));
117131 }
118132 builder .endPositionEntry ();
119133 return builder .build ();
120134 }
121135 }
122136
123- record TimeAndValue (long timestamp , long value ) implements Comparable <TimeAndValue > {
124- @ Override
125- public int compareTo (TimeAndValue other ) {
126- return Long .compare (timestamp , other .timestamp );
127- }
128- }
129-
130- void sort () {
131- // TODO: this copying is a bit inefficient and doesn't account for memory
137+ BytesRef toBytesRef () {
138+ // TODO: this copying doesn't account for memory
132139 List <TimeAndValue > list = new ArrayList <>(count );
133140 for (int i = 0 ; i < count ; i ++) {
134141 list .add (new TimeAndValue (timestamps .get (i ), values .get (i )));
135142 }
136143 Collections .sort (list );
144+ double [] values = new double [count ];
137145 for (int i = 0 ; i < count ; i ++) {
138- timestamps .set (i , list .get (i ).timestamp );
139- values .set (i , list .get (i ).value );
146+ values [i ] = list .get (i ).value ;
147+ }
148+ MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (null , values );
149+ ChangeType changeType = ChangePointDetector .getChangeType (bucketValues );
150+ try (XContentBuilder xContentBuilder = XContentFactory .jsonBuilder ()) {
151+ xContentBuilder .startObject ();
152+ NamedXContentObjectHelper .writeNamedObject (xContentBuilder , ToXContent .EMPTY_PARAMS , "type" , changeType );
153+ xContentBuilder .endObject ();
154+ String xContent = Strings .toString (xContentBuilder );
155+ return new BytesRef (xContent );
156+ } catch (IOException e ) {
157+ throw new RuntimeException (e );
158+ }
159+ }
160+
161+ public void add (LongBlock timestamps , LongBlock values , int otherPosition ) {
162+ final int valueCount = timestamps .getValueCount (otherPosition );
163+ final int firstIndex = timestamps .getFirstValueIndex (otherPosition );
164+ for (int i = 0 ; i < valueCount ; i ++) {
165+ add (timestamps .getLong (firstIndex + i ), values .getLong (firstIndex + i ));
166+ }
167+ }
168+
169+ public Block toBlock (BlockFactory blockFactory ) {
170+ // TODO: this needs to output multiple columns or a composite object, not a JSON blob.
171+ return blockFactory .newConstantBytesRefBlockWith (toBytesRef (), 1 );
172+ }
173+
174+ record TimeAndValue (long timestamp , long value ) implements Comparable <TimeAndValue > {
175+ @ Override
176+ public int compareTo (TimeAndValue other ) {
177+ return Long .compare (timestamp , other .timestamp );
140178 }
141179 }
142180
@@ -162,15 +200,11 @@ void add(int groupId, long timestamp, long value) {
162200 }
163201
164202 void combine (int groupId , LongBlock timestamps , LongBlock values , int otherPosition ) {
165- final int valueCount = timestamps .getValueCount (otherPosition );
166- if (valueCount == 0 ) {
203+ if (timestamps .getValueCount (otherPosition ) == 0 ) {
167204 return ;
168205 }
169- final int firstIndex = timestamps .getFirstValueIndex (otherPosition );
170206 SingleState state = states .computeIfAbsent (groupId , key -> new SingleState (bigArrays ));
171- for (int i = 0 ; i < valueCount ; i ++) {
172- state .add (timestamps .getLong (firstIndex + i ), values .getLong (firstIndex + i ));
173- }
207+ state .add (timestamps , values , otherPosition );
174208 }
175209
176210 void combineState (int groupId , GroupingState otherState , int otherGroupId ) {
@@ -193,23 +227,7 @@ public Block evaluateFinal(IntVector selected, BlockFactory blockFactory) {
193227 // TODO: this needs to output multiple columns or a composite object, not a JSON blob.
194228 try (BytesRefBlock .Builder builder = blockFactory .newBytesRefBlockBuilder (selected .getPositionCount ())) {
195229 for (int s = 0 ; s < selected .getPositionCount (); s ++) {
196- SingleState state = states .get (selected .getInt (s ));
197- state .sort ();
198- double [] values = new double [state .count ];
199- for (int i = 0 ; i < state .count ; i ++) {
200- values [i ] = state .values .get (i );
201- }
202- MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (null , values );
203- ChangeType changeType = ChangePointDetector .getChangeType (bucketValues );
204- try (XContentBuilder xContentBuilder = XContentFactory .jsonBuilder ()) {
205- xContentBuilder .startObject ();
206- NamedXContentObjectHelper .writeNamedObject (xContentBuilder , ToXContent .EMPTY_PARAMS , "type" , changeType );
207- xContentBuilder .endObject ();
208- String xContent = Strings .toString (xContentBuilder );
209- builder .appendBytesRef (new BytesRef (xContent ));
210- } catch (IOException e ) {
211- throw new RuntimeException (e );
212- }
230+ builder .appendBytesRef (states .get (selected .getInt (s )).toBytesRef ());
213231 }
214232 return builder .build ();
215233 }
0 commit comments