2020import org .elasticsearch .compute .data .LongBlock ;
2121import org .elasticsearch .compute .operator .DriverContext ;
2222import org .elasticsearch .core .Releasable ;
23+ import org .elasticsearch .core .Releasables ;
2324import org .elasticsearch .xcontent .ToXContent ;
2425import org .elasticsearch .xcontent .XContentBuilder ;
2526import org .elasticsearch .xcontent .XContentFactory ;
3738
3839public class ChangePointStates {
3940
40- public static class SingleState implements Releasable {
41+ record TimeAndValue (long timestamp , double value ) implements Comparable <TimeAndValue > {
42+ @ Override
43+ public int compareTo (TimeAndValue other ) {
44+ return Long .compare (timestamp , other .timestamp );
45+ }
46+ }
47+
48+ static class SingleState implements Releasable {
4149 private final BigArrays bigArrays ;
4250 private int count ;
4351 private LongArray timestamps ;
@@ -59,53 +67,63 @@ void add(long timestamp, double value) {
5967 }
6068
6169 void add (LongBlock timestamps , DoubleBlock values ) {
62- int start = values .getFirstValueIndex (0 );
63- int end = start + values .getValueCount (0 );
64- for (int i = start ; i < end ; i ++) {
65- add (timestamps .getLong (i ), values .getDouble (i ));
70+ add (timestamps , values , 0 );
71+ }
72+
73+ void add (LongBlock timestamps , DoubleBlock values , int otherPosition ) {
74+ final int valueCount = timestamps .getValueCount (otherPosition );
75+ final int firstIndex = timestamps .getFirstValueIndex (otherPosition );
76+ for (int i = 0 ; i < valueCount ; i ++) {
77+ add (timestamps .getLong (firstIndex + i ), values .getDouble (firstIndex + i ));
6678 }
6779 }
6880
6981 void toIntermediate (Block [] blocks , int offset , DriverContext driverContext ) {
70- blocks [offset ] = toTimestampsBlock ( timestamps , driverContext .blockFactory ());
71- blocks [offset + 1 ] = toValuesBlock ( values , driverContext .blockFactory ());
82+ blocks [offset ] = buildTimestampsBlock ( driverContext .blockFactory ());
83+ blocks [offset + 1 ] = buildValuesBlock ( driverContext .blockFactory ());
7284 }
7385
74- Block toTimestampsBlock (LongArray arr , BlockFactory blockFactory ) {
75- if (arr .size () == 0 ) {
86+ // TODO: this needs to output multiple columns or a composite object, not a JSON blob.
87+ Block toBlock (BlockFactory blockFactory ) {
88+ return blockFactory .newConstantBytesRefBlockWith (getChangePoint (), 1 );
89+ }
90+
91+ private Block buildTimestampsBlock (BlockFactory blockFactory ) {
92+ if (timestamps .size () == 0 ) {
7693 return blockFactory .newConstantNullBlock (1 );
7794 }
7895 if (values .size () == 1 ) {
79- return blockFactory .newConstantLongBlockWith (arr .get (0 ), 1 );
96+ return blockFactory .newConstantLongBlockWith (timestamps .get (0 ), 1 );
8097 }
81- try (LongBlock .Builder builder = blockFactory .newLongBlockBuilder ((int ) arr .size ())) {
98+ try (LongBlock .Builder builder = blockFactory .newLongBlockBuilder ((int ) timestamps .size ())) {
8299 builder .beginPositionEntry ();
83100 for (int id = 0 ; id < count ; id ++) {
84- builder .appendLong (arr .get (id ));
101+ builder .appendLong (timestamps .get (id ));
85102 }
86103 builder .endPositionEntry ();
87104 return builder .build ();
88105 }
89106 }
90107
91- Block toValuesBlock ( DoubleArray arr , BlockFactory blockFactory ) {
92- if (arr .size () == 0 ) {
108+ private Block buildValuesBlock ( BlockFactory blockFactory ) {
109+ if (values .size () == 0 ) {
93110 return blockFactory .newConstantNullBlock (1 );
94111 }
95112 if (values .size () == 1 ) {
96- return blockFactory .newConstantDoubleBlockWith (arr .get (0 ), 1 );
113+ return blockFactory .newConstantDoubleBlockWith (values .get (0 ), 1 );
97114 }
98- try (DoubleBlock .Builder builder = blockFactory .newDoubleBlockBuilder ((int ) arr .size ())) {
115+ try (DoubleBlock .Builder builder = blockFactory .newDoubleBlockBuilder ((int ) values .size ())) {
99116 builder .beginPositionEntry ();
100117 for (int id = 0 ; id < count ; id ++) {
101- builder .appendDouble (arr .get (id ));
118+ builder .appendDouble (values .get (id ));
102119 }
103120 builder .endPositionEntry ();
104121 return builder .build ();
105122 }
106123 }
107124
108- BytesRef toBytesRef () {
125+ // TODO: this needs to output multiple columns or a composite object, not a JSON blob.
126+ private BytesRef getChangePoint () {
109127 // TODO: this copying doesn't account for memory
110128 List <TimeAndValue > list = new ArrayList <>(count );
111129 for (int i = 0 ; i < count ; i ++) {
@@ -129,34 +147,13 @@ BytesRef toBytesRef() {
129147 }
130148 }
131149
132- void add (LongBlock timestamps , DoubleBlock values , int otherPosition ) {
133- final int valueCount = timestamps .getValueCount (otherPosition );
134- final int firstIndex = timestamps .getFirstValueIndex (otherPosition );
135- for (int i = 0 ; i < valueCount ; i ++) {
136- add (timestamps .getLong (firstIndex + i ), values .getDouble (firstIndex + i ));
137- }
138- }
139-
140- Block toBlock (BlockFactory blockFactory ) {
141- // TODO: this needs to output multiple columns or a composite object, not a JSON blob.
142- return blockFactory .newConstantBytesRefBlockWith (toBytesRef (), 1 );
143- }
144-
145- record TimeAndValue (long timestamp , double value ) implements Comparable <TimeAndValue > {
146- @ Override
147- public int compareTo (TimeAndValue other ) {
148- return Long .compare (timestamp , other .timestamp );
149- }
150- }
151-
152150 @ Override
153151 public void close () {
154- timestamps .close ();
155- values .close ();
152+ Releasables .close (timestamps , values );
156153 }
157154 }
158155
159- public static class GroupingState implements Releasable {
156+ static class GroupingState implements Releasable {
160157 private final BigArrays bigArrays ;
161158 private final Map <Integer , SingleState > states ;
162159
@@ -190,21 +187,21 @@ void combineState(int groupId, GroupingState otherState, int otherGroupId) {
190187 }
191188
192189 void toIntermediate (Block [] blocks , int offset , IntVector selected , DriverContext driverContext ) {
193- blocks [offset ] = toTimestampsBlock (driverContext .blockFactory (), selected );
194- blocks [offset + 1 ] = toValuesBlock (driverContext .blockFactory (), selected );
190+ blocks [offset ] = buildTimestampsBlock (driverContext .blockFactory (), selected );
191+ blocks [offset + 1 ] = buildValuesBlock (driverContext .blockFactory (), selected );
195192 }
196193
194+ // TODO: this needs to output multiple columns or a composite object, not a JSON blob.
197195 Block evaluateFinal (IntVector selected , BlockFactory blockFactory ) {
198- // TODO: this needs to output multiple columns or a composite object, not a JSON blob.
199196 try (BytesRefBlock .Builder builder = blockFactory .newBytesRefBlockBuilder (selected .getPositionCount ())) {
200197 for (int s = 0 ; s < selected .getPositionCount (); s ++) {
201- builder .appendBytesRef (states .get (selected .getInt (s )).toBytesRef ());
198+ builder .appendBytesRef (states .get (selected .getInt (s )).getChangePoint ());
202199 }
203200 return builder .build ();
204201 }
205202 }
206203
207- Block toTimestampsBlock (BlockFactory blockFactory , IntVector selected ) {
204+ private Block buildTimestampsBlock (BlockFactory blockFactory , IntVector selected ) {
208205 if (states .isEmpty ()) {
209206 return blockFactory .newConstantNullBlock (selected .getPositionCount ());
210207 }
@@ -237,7 +234,7 @@ Block toTimestampsBlock(BlockFactory blockFactory, IntVector selected) {
237234 }
238235 }
239236
240- Block toValuesBlock (BlockFactory blockFactory , IntVector selected ) {
237+ private Block buildValuesBlock (BlockFactory blockFactory , IntVector selected ) {
241238 if (states .isEmpty ()) {
242239 return blockFactory .newConstantNullBlock (selected .getPositionCount ());
243240 }
@@ -274,9 +271,7 @@ void enableGroupIdTracking(SeenGroupIds seenGroupIds) {}
274271
275272 @ Override
276273 public void close () {
277- for (SingleState state : states .values ()) {
278- state .close ();
279- }
274+ Releasables .close (states .values ());
280275 }
281276 }
282277}
0 commit comments