88package org .elasticsearch .compute .aggregation .blockhash ;
99
1010import org .apache .lucene .util .BytesRef ;
11- import org .apache .lucene .util .RamUsageEstimator ;
1211import org .elasticsearch .common .unit .ByteSizeValue ;
1312import org .elasticsearch .common .util .BigArrays ;
1413import org .elasticsearch .common .util .BitArray ;
2221import org .elasticsearch .compute .data .BytesRefVector ;
2322import org .elasticsearch .compute .data .IntBlock ;
2423import org .elasticsearch .compute .data .IntVector ;
25- import org .elasticsearch .compute .data .LongBigArrayVector ;
2624import org .elasticsearch .compute .data .LongBlock ;
2725import org .elasticsearch .compute .data .LongVector ;
2826import org .elasticsearch .compute .data .OrdinalBytesRefBlock ;
@@ -52,15 +50,13 @@ public final class TimeSeriesBlockHash extends BlockHash {
5250 private int currentTimestampCount ;
5351 private final IntArrayWithSize perTsidCountArray ;
5452
55- int groupOrdinal = -1 ;
56-
5753 public TimeSeriesBlockHash (int tsHashChannel , int timestampIntervalChannel , BlockFactory blockFactory ) {
5854 super (blockFactory );
5955 this .tsHashChannel = tsHashChannel ;
6056 this .timestampIntervalChannel = timestampIntervalChannel ;
61- this .tsidArray = new BytesRefArrayWithSize ();
62- this .timestampArray = new LongArrayWithSize ();
63- this .perTsidCountArray = new IntArrayWithSize ();
57+ this .tsidArray = new BytesRefArrayWithSize (blockFactory );
58+ this .timestampArray = new LongArrayWithSize (blockFactory );
59+ this .perTsidCountArray = new IntArrayWithSize (blockFactory );
6460 }
6561
6662 @ Override
@@ -90,8 +86,8 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
9086
9187 private int addOnePosition (BytesRef tsid , long timestamp ) {
9288 boolean newGroup = false ;
93- if (groupOrdinal == - 1 || lastTsid .equals (tsid ) == false ) {
94- assert groupOrdinal == - 1 || lastTsid .compareTo (tsid ) < 0 : "tsid goes backward " ;
89+ if (positionCount () == 0 || lastTsid .equals (tsid ) == false ) {
90+ assert positionCount () == 0 || lastTsid .compareTo (tsid ) < 0 : "tsid goes backward " ;
9591 endTsidGroup ();
9692 tsidArray .append (tsid );
9793 tsidArray .get (tsidArray .count - 1 , lastTsid );
@@ -101,10 +97,9 @@ private int addOnePosition(BytesRef tsid, long timestamp) {
10197 assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp ;
10298 timestampArray .append (timestamp );
10399 lastTimestamp = timestamp ;
104- groupOrdinal ++;
105100 currentTimestampCount ++;
106101 }
107- return groupOrdinal ;
102+ return positionCount () - 1 ;
108103 }
109104
110105 private void endTsidGroup () {
@@ -177,7 +172,7 @@ private BytesRefBlock buildTsidBlock() {
177172 }
178173
179174 private int positionCount () {
180- return groupOrdinal + 1 ;
175+ return timestampArray . count ;
181176 }
182177
183178 @ Override
@@ -197,15 +192,17 @@ public String toString() {
197192 + "], LongKey[channel="
198193 + timestampIntervalChannel
199194 + "]], entries="
200- + groupOrdinal
195+ + positionCount ()
201196 + "b}" ;
202197 }
203198
204- private class LongArrayWithSize implements Releasable {
199+ private static class LongArrayWithSize implements Releasable {
200+ private final BlockFactory blockFactory ;
205201 private LongArray array ;
206202 private int count = 0 ;
207203
208- LongArrayWithSize () {
204+ LongArrayWithSize (BlockFactory blockFactory ) {
205+ this .blockFactory = blockFactory ;
209206 this .array = blockFactory .bigArrays ().newLongArray (1 , false );
210207 }
211208
@@ -216,10 +213,12 @@ void append(long value) {
216213 }
217214
218215 LongBlock toBlock () {
219- LongBlock block = new LongBigArrayVector (array , count , blockFactory ).asBlock ();
220- blockFactory .adjustBreaker (block .ramBytesUsed () - RamUsageEstimator .sizeOf (array ));
221- array = null ;
222- return block ;
216+ try (var builder = blockFactory .newLongVectorFixedBuilder (count )) {
217+ for (int i = 0 ; i < count ; i ++) {
218+ builder .appendLong (array .get (i ));
219+ }
220+ return builder .build ().asBlock ();
221+ }
223222 }
224223
225224 @ Override
@@ -228,11 +227,13 @@ public void close() {
228227 }
229228 }
230229
231- private class IntArrayWithSize implements Releasable {
230+ private static class IntArrayWithSize implements Releasable {
231+ private final BlockFactory blockFactory ;
232232 private IntArray array ;
233233 private int count = 0 ;
234234
235- IntArrayWithSize () {
235+ IntArrayWithSize (BlockFactory blockFactory ) {
236+ this .blockFactory = blockFactory ;
236237 this .array = blockFactory .bigArrays ().newIntArray (1 , false );
237238 }
238239
@@ -248,11 +249,13 @@ public void close() {
248249 }
249250 }
250251
251- private class BytesRefArrayWithSize implements Releasable {
252- private final BytesRefArray array ;
252+ private static class BytesRefArrayWithSize implements Releasable {
253+ private final BlockFactory blockFactory ;
254+ private BytesRefArray array ;
253255 private int count = 0 ;
254256
255- BytesRefArrayWithSize () {
257+ BytesRefArrayWithSize (BlockFactory blockFactory ) {
258+ this .blockFactory = blockFactory ;
256259 this .array = new BytesRefArray (1 , blockFactory .bigArrays ());
257260 }
258261
@@ -266,8 +269,9 @@ void get(int index, BytesRef dest) {
266269 }
267270
268271 BytesRefVector toVector () {
269- BytesRefVector vector = blockFactory .newBytesRefArrayVector (tsidArray .array , tsidArray .count );
270- blockFactory .adjustBreaker (vector .ramBytesUsed () - tsidArray .array .bigArraysRamBytesUsed ());
272+ BytesRefVector vector = blockFactory .newBytesRefArrayVector (array , count );
273+ blockFactory .adjustBreaker (vector .ramBytesUsed () - array .bigArraysRamBytesUsed ());
274+ array = null ;
271275 return vector ;
272276 }
273277
0 commit comments