88package org .elasticsearch .compute .aggregation .blockhash ;
99
1010import org .apache .lucene .util .BytesRef ;
11+ import org .apache .lucene .util .RamUsageEstimator ;
1112import org .elasticsearch .common .unit .ByteSizeValue ;
1213import org .elasticsearch .common .util .BigArrays ;
1314import org .elasticsearch .common .util .BitArray ;
14- import org .elasticsearch .common .util .BytesRefHash ;
15- import org .elasticsearch .common .util .LongLongHash ;
15+ import org .elasticsearch .common .util .BytesRefArray ;
16+ import org .elasticsearch .common .util .IntArray ;
17+ import org .elasticsearch .common .util .LongArray ;
1618import org .elasticsearch .compute .aggregation .GroupingAggregatorFunction ;
17- import org .elasticsearch .compute .aggregation .SeenGroupIds ;
1819import org .elasticsearch .compute .data .Block ;
20+ import org .elasticsearch .compute .data .BlockFactory ;
1921import org .elasticsearch .compute .data .BytesRefBlock ;
2022import org .elasticsearch .compute .data .BytesRefVector ;
2123import org .elasticsearch .compute .data .IntBlock ;
2224import org .elasticsearch .compute .data .IntVector ;
25+ import org .elasticsearch .compute .data .LongBigArrayVector ;
2326import org .elasticsearch .compute .data .LongBlock ;
2427import org .elasticsearch .compute .data .LongVector ;
28+ import org .elasticsearch .compute .data .OrdinalBytesRefBlock ;
29+ import org .elasticsearch .compute .data .OrdinalBytesRefVector ;
2530import org .elasticsearch .compute .data .Page ;
26- import org .elasticsearch .compute . operator . DriverContext ;
31+ import org .elasticsearch .core . Releasable ;
2732import org .elasticsearch .core .ReleasableIterator ;
2833import org .elasticsearch .core .Releasables ;
2934
3035import java .util .Objects ;
3136
37+ /**
38+ * An optimized block hash that receives two blocks: tsid and timestamp, which are sorted.
39+ * Since the incoming data is sorted, this block hash appends the incoming data to the internal arrays without lookup.
40+ */
3241public final class TimeSeriesBlockHash extends BlockHash {
3342
/** Channel of the page block that holds the tsid of each position. */
private final int tsHashChannel;
/** Channel of the page block that holds the timestamp of each position. */
private final int timestampIntervalChannel;

/** The tsid most recently appended to {@code tsidArray}; it is filled by reading that entry back out of the array. */
private final BytesRef lastTsid = new BytesRef();
/** Every distinct tsid seen so far, in arrival order. */
private final BytesRefArrayWithSize tsidArray;

/** Timestamp of the most recently created group. */
private long lastTimestamp;
/** One timestamp per group, indexed by group ordinal. */
private final LongArrayWithSize timestampArray;

/** Number of groups created so far for the tsid that is currently being consumed. */
private int currentTimestampCount;
/** Number of groups of each finished tsid; entry {@code i} belongs to the {@code i}-th tsid. */
private final IntArrayWithSize perTsidCountArray;

/** Ordinal of the most recently created group, or {@code -1} while no group exists. */
int groupOrdinal = -1;

/**
 * Creates an empty hash.
 *
 * @param tsHashChannel            channel of the tsid block in the incoming pages
 * @param timestampIntervalChannel channel of the timestamp block in the incoming pages
 * @param blockFactory             factory used for the backing arrays and for the blocks this hash produces
 */
public TimeSeriesBlockHash(int tsHashChannel, int timestampIntervalChannel, BlockFactory blockFactory) {
    super(blockFactory);
    this.tsHashChannel = tsHashChannel;
    this.timestampIntervalChannel = timestampIntervalChannel;
    // Each allocation below can fail part-way through the sequence. Build into locals first so
    // that whatever was already allocated is released instead of leaking when a later one throws.
    BytesRefArrayWithSize tsids = null;
    LongArrayWithSize timestamps = null;
    IntArrayWithSize perTsidCounts = null;
    boolean success = false;
    try {
        tsids = new BytesRefArrayWithSize();
        timestamps = new LongArrayWithSize();
        perTsidCounts = new IntArrayWithSize();
        success = true;
    } finally {
        if (success == false) {
            Releasables.close(tsids, timestamps, perTsidCounts);
        }
    }
    this.tsidArray = tsids;
    this.timestampArray = timestamps;
    this.perTsidCountArray = perTsidCounts;
}
5065
5166 @ Override
5267 public void close () {
53- Releasables .close (tsidHashes , intervalHash );
68+ Releasables .close (tsidArray , timestampArray , perTsidCountArray );
5469 }
5570
5671 @ Override
5772 public void add (Page page , GroupingAggregatorFunction .AddInput addInput ) {
58- BytesRefBlock tsHashBlock = page .getBlock (tsHashChannel );
59- BytesRefVector tsHashVector = Objects .requireNonNull (tsHashBlock .asVector ());
60- try (var ordsBuilder = blockFactory .newIntVectorBuilder (tsHashVector .getPositionCount ())) {
61- LongBlock timestampIntervalBlock = page .getBlock (timestampIntervalChannel );
62- BytesRef spare = new BytesRef ();
63- for (int i = 0 ; i < tsHashVector .getPositionCount (); i ++) {
64- BytesRef tsHash = tsHashVector .getBytesRef (i , spare );
65- long timestampInterval = timestampIntervalBlock .getLong (i );
66- // Optimization that relies on the fact that blocks are sorted by tsid hash and timestamp
67- if (tsHash .equals (previousTsidHash ) == false || timestampInterval != previousTimestampInterval ) {
68- long tsidOrdinal = tsidHashes .add (tsHash );
69- if (tsidOrdinal < 0 ) {
70- tsidOrdinal = -1 - tsidOrdinal ;
71- }
72- groupOrdinal = intervalHash .add (tsidOrdinal , timestampInterval );
73- if (groupOrdinal < 0 ) {
74- groupOrdinal = -1 - groupOrdinal ;
75- }
76- previousTsidHash = BytesRef .deepCopyOf (tsHash );
77- previousTimestampInterval = timestampInterval ;
78- }
79- ordsBuilder .appendInt (Math .toIntExact (groupOrdinal ));
73+ final BytesRefBlock tsidBlock = page .getBlock (tsHashChannel );
74+ final BytesRefVector tsidVector = Objects .requireNonNull (tsidBlock .asVector (), "tsid input must be a vector" );
75+ final LongBlock timestampBlock = page .getBlock (timestampIntervalChannel );
76+ final LongVector timestampVector = Objects .requireNonNull (timestampBlock .asVector (), "timestamp input must be a vector" );
77+ try (var ordsBuilder = blockFactory .newIntVectorBuilder (tsidVector .getPositionCount ())) {
78+ final BytesRef spare = new BytesRef ();
79+ // TODO: optimize incoming ordinal block
80+ for (int i = 0 ; i < tsidVector .getPositionCount (); i ++) {
81+ final BytesRef tsid = tsidVector .getBytesRef (i , spare );
82+ final long timestamp = timestampVector .getLong (i );
83+ ordsBuilder .appendInt (addOnePosition (tsid , timestamp ));
8084 }
8185 try (var ords = ordsBuilder .build ()) {
8286 addInput .add (0 , ords );
8387 }
8488 }
8589 }
8690
91+ private int addOnePosition (BytesRef tsid , long timestamp ) {
92+ boolean newGroup = false ;
93+ if (groupOrdinal == -1 || lastTsid .equals (tsid ) == false ) {
94+ assert groupOrdinal == -1 || lastTsid .compareTo (tsid ) < 0 : "tsid goes backward " ;
95+ endTsidGroup ();
96+ tsidArray .append (tsid );
97+ tsidArray .get (tsidArray .count - 1 , lastTsid );
98+ newGroup = true ;
99+ }
100+ if (newGroup || timestamp != lastTimestamp ) {
101+ assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp ;
102+ timestampArray .append (timestamp );
103+ lastTimestamp = timestamp ;
104+ groupOrdinal ++;
105+ currentTimestampCount ++;
106+ }
107+ return groupOrdinal ;
108+ }
109+
110+ private void endTsidGroup () {
111+ if (currentTimestampCount > 0 ) {
112+ perTsidCountArray .append (currentTimestampCount );
113+ currentTimestampCount = 0 ;
114+ }
115+ }
116+
87117 @ Override
88118 public ReleasableIterator <IntBlock > lookup (Page page , ByteSizeValue targetBlockSize ) {
89119 throw new UnsupportedOperationException ("TODO" );
90120 }
91121
92122 @ Override
93123 public Block [] getKeys () {
94- int positions = (int ) intervalHash .size ();
95- BytesRefVector tsidHashes = null ;
96- LongVector timestampIntervals = null ;
97- try (
98- BytesRefVector .Builder tsidHashesBuilder = blockFactory .newBytesRefVectorBuilder (positions );
99- LongVector .FixedBuilder timestampIntervalsBuilder = blockFactory .newLongVectorFixedBuilder (positions )
100- ) {
101- BytesRef scratch = new BytesRef ();
102- for (long i = 0 ; i < positions ; i ++) {
103- BytesRef key1 = this .tsidHashes .get (intervalHash .getKey1 (i ), scratch );
104- tsidHashesBuilder .appendBytesRef (key1 );
105- timestampIntervalsBuilder .appendLong ((int ) i , intervalHash .getKey2 (i ));
124+ endTsidGroup ();
125+ final Block [] blocks = new Block [2 ];
126+ try {
127+ if (OrdinalBytesRefBlock .isDense (positionCount (), tsidArray .count )) {
128+ blocks [0 ] = buildTsidBlockWithOrdinal ();
129+ } else {
130+ blocks [0 ] = buildTsidBlock ();
106131 }
107- tsidHashes = tsidHashesBuilder . build ();
108- timestampIntervals = timestampIntervalsBuilder . build () ;
132+ blocks [ 1 ] = timestampArray . toBlock ();
133+ return blocks ;
109134 } finally {
110- if (timestampIntervals == null ) {
111- Releasables .closeExpectNoException (tsidHashes );
135+ if (blocks [blocks .length - 1 ] == null ) {
136+ Releasables .close (blocks );
137+ }
138+ }
139+ }
140+
141+ private BytesRefBlock buildTsidBlockWithOrdinal () {
142+ try (IntVector .FixedBuilder ordinalBuilder = blockFactory .newIntVectorFixedBuilder (positionCount ())) {
143+ for (int i = 0 ; i < tsidArray .count ; i ++) {
144+ int numTimestamps = perTsidCountArray .array .get (i );
145+ for (int t = 0 ; t < numTimestamps ; t ++) {
146+ ordinalBuilder .appendInt (i );
147+ }
148+ }
149+ final IntVector ordinalVector = ordinalBuilder .build ();
150+ BytesRefVector dictionary = null ;
151+ boolean success = false ;
152+ try {
153+ dictionary = tsidArray .toVector ();
154+ var result = new OrdinalBytesRefVector (ordinalVector , dictionary ).asBlock ();
155+ success = true ;
156+ return result ;
157+ } finally {
158+ if (success == false ) {
159+ Releasables .close (ordinalVector , dictionary );
160+ }
161+ }
162+ }
163+ }
164+
165+ private BytesRefBlock buildTsidBlock () {
166+ try (BytesRefVector .Builder tsidBuilder = blockFactory .newBytesRefVectorBuilder (positionCount ());) {
167+ final BytesRef tsid = new BytesRef ();
168+ for (int i = 0 ; i < tsidArray .count ; i ++) {
169+ tsidArray .array .get (i , tsid );
170+ int numTimestamps = perTsidCountArray .array .get (i );
171+ for (int t = 0 ; t < numTimestamps ; t ++) {
172+ tsidBuilder .appendBytesRef (tsid );
173+ }
112174 }
175+ return tsidBuilder .build ().asBlock ();
113176 }
114- return new Block [] { tsidHashes .asBlock (), timestampIntervals .asBlock () };
177+ }
178+
179+ private int positionCount () {
180+ return groupOrdinal + 1 ;
115181 }
116182
117183 @ Override
118184 public IntVector nonEmpty () {
119- long endExclusive = intervalHash . size ();
185+ long endExclusive = positionCount ();
120186 return IntVector .range (0 , Math .toIntExact (endExclusive ), blockFactory );
121187 }
122188
123189 @ Override
124190 public BitArray seenGroupIds (BigArrays bigArrays ) {
125- long size = intervalHash .size ();
126- return new SeenGroupIds .Range (0 , Math .toIntExact (size )).seenGroupIds (bigArrays );
191+ return new Range (0 , positionCount ()).seenGroupIds (bigArrays );
127192 }
128193
129194 public String toString () {
@@ -135,4 +200,80 @@ public String toString() {
135200 + groupOrdinal
136201 + "b}" ;
137202 }
203+
204+ private class LongArrayWithSize implements Releasable {
205+ private LongArray array ;
206+ private int count = 0 ;
207+
208+ LongArrayWithSize () {
209+ this .array = blockFactory .bigArrays ().newLongArray (1 , false );
210+ }
211+
212+ void append (long value ) {
213+ this .array = blockFactory .bigArrays ().grow (array , count + 1 );
214+ this .array .set (count , value );
215+ count ++;
216+ }
217+
218+ LongBlock toBlock () {
219+ LongBlock block = new LongBigArrayVector (array , count , blockFactory ).asBlock ();
220+ blockFactory .adjustBreaker (block .ramBytesUsed () - RamUsageEstimator .sizeOf (array ));
221+ array = null ;
222+ return block ;
223+ }
224+
225+ @ Override
226+ public void close () {
227+ Releasables .close (array );
228+ }
229+ }
230+
231+ private class IntArrayWithSize implements Releasable {
232+ private IntArray array ;
233+ private int count = 0 ;
234+
235+ IntArrayWithSize () {
236+ this .array = blockFactory .bigArrays ().newIntArray (1 , false );
237+ }
238+
239+ void append (int value ) {
240+ this .array = blockFactory .bigArrays ().grow (array , count + 1 );
241+ this .array .set (count , value );
242+ count ++;
243+ }
244+
245+ @ Override
246+ public void close () {
247+ Releasables .close (array );
248+ }
249+ }
250+
251+ private class BytesRefArrayWithSize implements Releasable {
252+ private final BytesRefArray array ;
253+ private int count = 0 ;
254+
255+ BytesRefArrayWithSize () {
256+ this .array = new BytesRefArray (1 , blockFactory .bigArrays ());
257+ }
258+
259+ void append (BytesRef value ) {
260+ array .append (value );
261+ count ++;
262+ }
263+
264+ void get (int index , BytesRef dest ) {
265+ array .get (index , dest );
266+ }
267+
268+ BytesRefVector toVector () {
269+ BytesRefVector vector = blockFactory .newBytesRefArrayVector (tsidArray .array , tsidArray .count );
270+ blockFactory .adjustBreaker (vector .ramBytesUsed () - tsidArray .array .bigArraysRamBytesUsed ());
271+ return vector ;
272+ }
273+
274+ @ Override
275+ public void close () {
276+ Releasables .close (array );
277+ }
278+ }
138279}
0 commit comments