77
88package org .elasticsearch .compute .aggregation ;
99
10- import org .apache .lucene .util .BytesRef ;
11- import org .elasticsearch .common .Strings ;
12- import org .elasticsearch .common .util .BigArrays ;
13- import org .elasticsearch .common .util .LongArray ;
10+ import org .elasticsearch .compute .aggregation .ChangePointStates .GroupingState ;
11+ import org .elasticsearch .compute .aggregation .ChangePointStates .SingleState ;
1412import org .elasticsearch .compute .ann .Aggregator ;
1513import org .elasticsearch .compute .ann .GroupingAggregator ;
1614import org .elasticsearch .compute .ann .IntermediateState ;
1715import org .elasticsearch .compute .data .Block ;
18- import org .elasticsearch .compute .data .BlockFactory ;
19- import org .elasticsearch .compute .data .BytesRefBlock ;
16+ import org .elasticsearch .compute .data .DoubleBlock ;
2017import org .elasticsearch .compute .data .IntVector ;
2118import org .elasticsearch .compute .data .LongBlock ;
2219import org .elasticsearch .compute .operator .DriverContext ;
23- import org .elasticsearch .core .Releasable ;
24- import org .elasticsearch .xcontent .ToXContent ;
25- import org .elasticsearch .xcontent .XContentBuilder ;
26- import org .elasticsearch .xcontent .XContentFactory ;
27- import org .elasticsearch .xpack .core .ml .utils .NamedXContentObjectHelper ;
28- import org .elasticsearch .xpack .ml .aggs .MlAggsHelper ;
29- import org .elasticsearch .xpack .ml .aggs .changepoint .ChangePointDetector ;
30- import org .elasticsearch .xpack .ml .aggs .changepoint .ChangeType ;
31-
32- import java .io .IOException ;
33- import java .util .ArrayList ;
34- import java .util .Collections ;
35- import java .util .HashMap ;
36- import java .util .List ;
37- import java .util .Map ;
38- import java .util .function .Function ;
3920
4021/**
4122 * Change point detection for series of long values.
4223 */
4324// TODO: make .java.st from this to support different types
4425@ Aggregator (
4526 includeTimestamps = true ,
46- value = { @ IntermediateState (name = "timestamps" , type = "LONG_BLOCK" ), @ IntermediateState (name = "values" , type = "LONG_BLOCK " ) }
27+ value = { @ IntermediateState (name = "timestamps" , type = "LONG_BLOCK" ), @ IntermediateState (name = "values" , type = "DOUBLE_BLOCK " ) }
4728)
4829@ GroupingAggregator (
4930 includeTimestamps = true ,
50- value = { @ IntermediateState (name = "timestamps" , type = "LONG_BLOCK" ), @ IntermediateState (name = "values" , type = "LONG_BLOCK " ) }
31+ value = { @ IntermediateState (name = "timestamps" , type = "LONG_BLOCK" ), @ IntermediateState (name = "values" , type = "DOUBLE_BLOCK " ) }
5132)
5233class ChangePointLongAggregator {
5334
@@ -59,11 +40,11 @@ public static void combine(SingleState state, long timestamp, long value) {
5940 state .add (timestamp , value );
6041 }
6142
62- public static void combineIntermediate (SingleState state , LongBlock timestamps , LongBlock values ) {
43+ public static void combineIntermediate (SingleState state , LongBlock timestamps , DoubleBlock values ) {
6344 int start = values .getFirstValueIndex (0 );
6445 int end = start + values .getValueCount (0 );
6546 for (int i = start ; i < end ; i ++) {
66- combine ( state , timestamps .getLong (i ), values .getLong (i ));
47+ state . add ( timestamps .getLong (i ), values .getDouble (i ));
6748 }
6849 }
6950
@@ -79,7 +60,13 @@ public static void combine(GroupingState current, int groupId, long timestamp, l
7960 current .add (groupId , timestamp , value );
8061 }
8162
82- public static void combineIntermediate (GroupingState current , int groupId , LongBlock timestamps , LongBlock values , int otherPosition ) {
63+ public static void combineIntermediate (
64+ GroupingState current ,
65+ int groupId ,
66+ LongBlock timestamps ,
67+ DoubleBlock values ,
68+ int otherPosition
69+ ) {
8370 current .combine (groupId , timestamps , values , otherPosition );
8471 }
8572
@@ -90,190 +77,4 @@ public static void combineStates(GroupingState current, int currentGroupId, Grou
9077 public static Block evaluateFinal (GroupingState state , IntVector selected , DriverContext driverContext ) {
9178 return state .evaluateFinal (selected , driverContext .blockFactory ());
9279 }
93-
94- public static class SingleState implements Releasable {
95- private final BigArrays bigArrays ;
96- private int count ;
97- private LongArray timestamps ;
98- private LongArray values ;
99-
100- private SingleState (BigArrays bigArrays ) {
101- this .bigArrays = bigArrays ;
102- count = 0 ;
103- timestamps = bigArrays .newLongArray (0 );
104- values = bigArrays .newLongArray (0 );
105- }
106-
107- void add (long timestamp , long value ) {
108- count ++;
109- timestamps = bigArrays .grow (timestamps , count );
110- timestamps .set (count - 1 , timestamp );
111- values = bigArrays .grow (values , count );
112- values .set (count - 1 , value );
113- }
114-
115- void toIntermediate (Block [] blocks , int offset , DriverContext driverContext ) {
116- blocks [offset ] = toBlock (timestamps , driverContext .blockFactory ());
117- blocks [offset + 1 ] = toBlock (values , driverContext .blockFactory ());
118- }
119-
120- Block toBlock (LongArray arr , BlockFactory blockFactory ) {
121- if (arr .size () == 0 ) {
122- return blockFactory .newConstantNullBlock (1 );
123- }
124- if (values .size () == 1 ) {
125- return blockFactory .newConstantLongBlockWith (arr .get (0 ), 1 );
126- }
127- try (LongBlock .Builder builder = blockFactory .newLongBlockBuilder ((int ) arr .size ())) {
128- builder .beginPositionEntry ();
129- for (int id = 0 ; id < count ; id ++) {
130- builder .appendLong (arr .get (id ));
131- }
132- builder .endPositionEntry ();
133- return builder .build ();
134- }
135- }
136-
137- BytesRef toBytesRef () {
138- // TODO: this copying doesn't account for memory
139- List <TimeAndValue > list = new ArrayList <>(count );
140- for (int i = 0 ; i < count ; i ++) {
141- list .add (new TimeAndValue (timestamps .get (i ), values .get (i )));
142- }
143- Collections .sort (list );
144- double [] values = new double [count ];
145- for (int i = 0 ; i < count ; i ++) {
146- values [i ] = list .get (i ).value ;
147- }
148- MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (null , values );
149- ChangeType changeType = ChangePointDetector .getChangeType (bucketValues );
150- try (XContentBuilder xContentBuilder = XContentFactory .jsonBuilder ()) {
151- xContentBuilder .startObject ();
152- NamedXContentObjectHelper .writeNamedObject (xContentBuilder , ToXContent .EMPTY_PARAMS , "type" , changeType );
153- xContentBuilder .endObject ();
154- String xContent = Strings .toString (xContentBuilder );
155- return new BytesRef (xContent );
156- } catch (IOException e ) {
157- throw new RuntimeException (e );
158- }
159- }
160-
161- public void add (LongBlock timestamps , LongBlock values , int otherPosition ) {
162- final int valueCount = timestamps .getValueCount (otherPosition );
163- final int firstIndex = timestamps .getFirstValueIndex (otherPosition );
164- for (int i = 0 ; i < valueCount ; i ++) {
165- add (timestamps .getLong (firstIndex + i ), values .getLong (firstIndex + i ));
166- }
167- }
168-
169- public Block toBlock (BlockFactory blockFactory ) {
170- // TODO: this needs to output multiple columns or a composite object, not a JSON blob.
171- return blockFactory .newConstantBytesRefBlockWith (toBytesRef (), 1 );
172- }
173-
174- record TimeAndValue (long timestamp , long value ) implements Comparable <TimeAndValue > {
175- @ Override
176- public int compareTo (TimeAndValue other ) {
177- return Long .compare (timestamp , other .timestamp );
178- }
179- }
180-
181- @ Override
182- public void close () {
183- timestamps .close ();
184- values .close ();
185- }
186- }
187-
188- public static class GroupingState implements Releasable {
189- private final BigArrays bigArrays ;
190- private final Map <Integer , SingleState > states ;
191-
192- private GroupingState (BigArrays bigArrays ) {
193- this .bigArrays = bigArrays ;
194- states = new HashMap <>();
195- }
196-
197- void add (int groupId , long timestamp , long value ) {
198- SingleState state = states .computeIfAbsent (groupId , key -> new SingleState (bigArrays ));
199- state .add (timestamp , value );
200- }
201-
202- void combine (int groupId , LongBlock timestamps , LongBlock values , int otherPosition ) {
203- if (timestamps .getValueCount (otherPosition ) == 0 ) {
204- return ;
205- }
206- SingleState state = states .computeIfAbsent (groupId , key -> new SingleState (bigArrays ));
207- state .add (timestamps , values , otherPosition );
208- }
209-
210- void combineState (int groupId , GroupingState otherState , int otherGroupId ) {
211- SingleState other = otherState .states .get (otherGroupId );
212- if (other == null ) {
213- return ;
214- }
215- var state = states .computeIfAbsent (groupId , key -> new SingleState (bigArrays ));
216- for (int i = 0 ; i < other .timestamps .size (); i ++) {
217- state .add (state .timestamps .get (i ), state .values .get (i ));
218- }
219- }
220-
221- void toIntermediate (Block [] blocks , int offset , IntVector selected , DriverContext driverContext ) {
222- blocks [offset ] = toBlock (s -> s .timestamps , driverContext .blockFactory (), selected );
223- blocks [offset + 1 ] = toBlock (s -> s .values , driverContext .blockFactory (), selected );
224- }
225-
226- public Block evaluateFinal (IntVector selected , BlockFactory blockFactory ) {
227- // TODO: this needs to output multiple columns or a composite object, not a JSON blob.
228- try (BytesRefBlock .Builder builder = blockFactory .newBytesRefBlockBuilder (selected .getPositionCount ())) {
229- for (int s = 0 ; s < selected .getPositionCount (); s ++) {
230- builder .appendBytesRef (states .get (selected .getInt (s )).toBytesRef ());
231- }
232- return builder .build ();
233- }
234- }
235-
236- Block toBlock (Function <SingleState , LongArray > getArray , BlockFactory blockFactory , IntVector selected ) {
237- if (states .isEmpty ()) {
238- return blockFactory .newConstantNullBlock (selected .getPositionCount ());
239- }
240- try (LongBlock .Builder builder = blockFactory .newLongBlockBuilder (selected .getPositionCount ())) {
241- for (int s = 0 ; s < selected .getPositionCount (); s ++) {
242- int selectedGroup = selected .getInt (s );
243- SingleState state = states .get (selectedGroup );
244- LongArray values = getArray .apply (state );
245- int count = 0 ;
246- long first = 0 ;
247- for (int i = 0 ; i < state .count ; i ++) {
248- long value = values .get (i );
249- switch (count ) {
250- case 0 -> first = value ;
251- case 1 -> {
252- builder .beginPositionEntry ();
253- builder .appendLong (first );
254- builder .appendLong (value );
255- }
256- default -> builder .appendLong (value );
257- }
258- count ++;
259- }
260- switch (count ) {
261- case 0 -> builder .appendNull ();
262- case 1 -> builder .appendLong (first );
263- default -> builder .endPositionEntry ();
264- }
265- }
266- return builder .build ();
267- }
268- }
269-
270- void enableGroupIdTracking (SeenGroupIds seenGroupIds ) {}
271-
272- @ Override
273- public void close () {
274- for (SingleState state : states .values ()) {
275- state .close ();
276- }
277- }
278- }
27980}
0 commit comments