1919import org .elasticsearch .compute .data .BlockFactory ;
2020import org .elasticsearch .compute .data .BytesRefBlock ;
2121import org .elasticsearch .compute .data .BytesRefVector ;
22+ import org .elasticsearch .compute .data .ElementType ;
2223import org .elasticsearch .compute .data .IntBlock ;
2324import org .elasticsearch .compute .data .IntVector ;
2425import org .elasticsearch .compute .data .LongBlock ;
2526import org .elasticsearch .compute .data .LongVector ;
2627import org .elasticsearch .compute .data .OrdinalBytesRefBlock ;
2728import org .elasticsearch .compute .data .OrdinalBytesRefVector ;
2829import org .elasticsearch .compute .data .Page ;
30+ import org .elasticsearch .core .Assertions ;
2931import org .elasticsearch .core .Releasable ;
3032import org .elasticsearch .core .ReleasableIterator ;
3133import org .elasticsearch .core .Releasables ;
3234
33- import java .util .Objects ;
35+ import java .util .List ;
3436
3537/**
3638 * An optimized block hash that receives two blocks: tsid and timestamp, which are sorted.
@@ -41,7 +43,7 @@ public final class TimeSeriesBlockHash extends BlockHash {
4143 private final int tsHashChannel ;
4244 private final int timestampIntervalChannel ;
4345
44- private final BytesRef lastTsid = new BytesRef () ;
46+ private int lastTsidPosition = 0 ;
4547 private final BytesRefArrayWithSize tsidArray ;
4648
4749 private long lastTimestamp ;
@@ -50,56 +52,120 @@ public final class TimeSeriesBlockHash extends BlockHash {
5052 private int currentTimestampCount ;
5153 private final IntArrayWithSize perTsidCountArray ;
5254
55+ private final BlockHash assertingHash ;
56+
5357 public TimeSeriesBlockHash (int tsHashChannel , int timestampIntervalChannel , BlockFactory blockFactory ) {
5458 super (blockFactory );
5559 this .tsHashChannel = tsHashChannel ;
5660 this .timestampIntervalChannel = timestampIntervalChannel ;
5761 this .tsidArray = new BytesRefArrayWithSize (blockFactory );
5862 this .timestampArray = new LongArrayWithSize (blockFactory );
5963 this .perTsidCountArray = new IntArrayWithSize (blockFactory );
64+ if (Assertions .ENABLED ) {
65+ final var groupSpecs = List .of (
66+ new GroupSpec (tsHashChannel , ElementType .BYTES_REF ),
67+ new GroupSpec (timestampIntervalChannel , ElementType .BYTES_REF )
68+ );
69+ assertingHash = new PackedValuesBlockHash (groupSpecs , blockFactory , Integer .MAX_VALUE );
70+ } else {
71+ assertingHash = null ;
72+ }
6073 }
6174
6275 @ Override
6376 public void close () {
64- Releasables .close (tsidArray , timestampArray , perTsidCountArray );
77+ Releasables .close (tsidArray , timestampArray , perTsidCountArray , assertingHash );
78+ }
79+
80+ private OrdinalBytesRefVector getTsidVector (Page page ) {
81+ BytesRefBlock block = page .getBlock (tsHashChannel );
82+ var ordinalBlock = block .asOrdinals ();
83+ if (ordinalBlock == null ) {
84+ throw new IllegalStateException ("expected ordinal block for tsid" );
85+ }
86+ var ordinalVector = ordinalBlock .asVector ();
87+ if (ordinalVector == null ) {
88+ throw new IllegalStateException ("expected ordinal vector for tsid" );
89+ }
90+ return ordinalVector ;
91+ }
92+
93+ private LongVector getTimestampVector (Page page ) {
94+ final LongBlock timestampsBlock = page .getBlock (timestampIntervalChannel );
95+ LongVector timestampsVector = timestampsBlock .asVector ();
96+ if (timestampsVector == null ) {
97+ throw new IllegalStateException ("expected long vector for timestamp" );
98+ }
99+ return timestampsVector ;
65100 }
66101
67102 @ Override
68103 public void add (Page page , GroupingAggregatorFunction .AddInput addInput ) {
69- final BytesRefBlock tsidBlock = page .getBlock (tsHashChannel );
70- final BytesRefVector tsidVector = Objects .requireNonNull (tsidBlock .asVector (), "tsid input must be a vector" );
71- final LongBlock timestampBlock = page .getBlock (timestampIntervalChannel );
72- final LongVector timestampVector = Objects .requireNonNull (timestampBlock .asVector (), "timestamp input must be a vector" );
73- try (var ordsBuilder = blockFactory .newIntVectorBuilder (tsidVector .getPositionCount ())) {
104+ final BytesRefVector tsidDict ;
105+ final IntVector tsidOrdinals ;
106+ {
107+ final var tsidVector = getTsidVector (page );
108+ tsidDict = tsidVector .getDictionaryVector ();
109+ tsidOrdinals = tsidVector .getOrdinalsVector ();
110+ }
111+ try (var ordsBuilder = blockFactory .newIntVectorBuilder (tsidOrdinals .getPositionCount ())) {
74112 final BytesRef spare = new BytesRef ();
75- // TODO: optimize incoming ordinal block
76- for (int i = 0 ; i < tsidVector .getPositionCount (); i ++) {
77- final BytesRef tsid = tsidVector .getBytesRef (i , spare );
113+ final LongVector timestampVector = getTimestampVector (page );
114+ int lastOrd = -1 ;
115+ for (int i = 0 ; i < tsidOrdinals .getPositionCount (); i ++) {
116+ final int newOrd = tsidOrdinals .getInt (i );
117+ boolean newGroup = false ;
118+ if (lastOrd != newOrd ) {
119+ final var newTsid = tsidDict .getBytesRef (newOrd , spare );
120+ if (positionCount () == 0 ) {
121+ newGroup = true ;
122+ } else if (lastOrd == -1 ) {
123+ newGroup = lastTsid ().equals (newTsid ) == false ;
124+ } else {
125+ newGroup = true ;
126+ }
127+ if (newGroup ) {
128+ endTsidGroup ();
129+ lastTsidPosition = tsidArray .count ;
130+ tsidArray .append (newTsid );
131+ }
132+ lastOrd = newOrd ;
133+ }
78134 final long timestamp = timestampVector .getLong (i );
79- ordsBuilder .appendInt (addOnePosition (tsid , timestamp ));
135+ if (newGroup || timestamp != lastTimestamp ) {
136+ assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp ;
137+ timestampArray .append (timestamp );
138+ lastTimestamp = timestamp ;
139+ currentTimestampCount ++;
140+ }
141+ ordsBuilder .appendInt (timestampArray .count - 1 );
80142 }
81143 try (var ords = ordsBuilder .build ()) {
82144 addInput .add (0 , ords );
145+ assert assertingAddInputPage (page , ords );
83146 }
84147 }
85148 }
86149
87- private int addOnePosition (BytesRef tsid , long timestamp ) {
88- boolean newGroup = false ;
89- if (positionCount () == 0 || lastTsid .equals (tsid ) == false ) {
90- assert positionCount () == 0 || lastTsid .compareTo (tsid ) < 0 : "tsid goes backward " ;
91- endTsidGroup ();
92- tsidArray .append (tsid );
93- tsidArray .get (tsidArray .count - 1 , lastTsid );
94- newGroup = true ;
95- }
96- if (newGroup || timestamp != lastTimestamp ) {
97- assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp ;
98- timestampArray .append (timestamp );
99- lastTimestamp = timestamp ;
100- currentTimestampCount ++;
101- }
102- return positionCount () - 1 ;
150+ private boolean assertingAddInputPage (Page page , IntVector actualIds ) {
151+ assert assertingHash != null ;
152+ assertingHash .add (page , new GroupingAggregatorFunction .AddInput () {
153+ @ Override
154+ public void add (int positionOffset , IntBlock groupIds ) {
155+ assert false : "add(IntBlock) shouldn't be called" ;
156+ }
157+
158+ @ Override
159+ public void add (int positionOffset , IntVector expectedIds ) {
160+ assert expectedIds .equals (actualIds ) : "actual=" + actualIds + " vs expected = " + expectedIds ;
161+ }
162+
163+ @ Override
164+ public void close () {
165+
166+ }
167+ });
168+ return true ;
103169 }
104170
105171 private void endTsidGroup () {
@@ -109,6 +175,12 @@ private void endTsidGroup() {
109175 }
110176 }
111177
178+ private BytesRef lastTsid () {
179+ final BytesRef bytesRef = new BytesRef ();
180+ tsidArray .get (lastTsidPosition , bytesRef );
181+ return bytesRef ;
182+ }
183+
112184 @ Override
113185 public ReleasableIterator <IntBlock > lookup (Page page , ByteSizeValue targetBlockSize ) {
114186 throw new UnsupportedOperationException ("TODO" );
@@ -125,6 +197,7 @@ public Block[] getKeys() {
125197 blocks [0 ] = buildTsidBlock ();
126198 }
127199 blocks [1 ] = timestampArray .toBlock ();
200+ assert assertingKeys (blocks );
128201 return blocks ;
129202 } finally {
130203 if (blocks [blocks .length - 1 ] == null ) {
@@ -133,6 +206,18 @@ public Block[] getKeys() {
133206 }
134207 }
135208
209+ private boolean assertingKeys (Block [] actualKeys ) {
210+ assert assertingHash != null ;
211+ Block [] expectedKeys = assertingHash .getKeys ();
212+ try {
213+ assert expectedKeys [0 ].equals (actualKeys [0 ]) : "actual=" + actualKeys [0 ] + " vs expected = " + expectedKeys [0 ];
214+ assert expectedKeys [1 ].equals (actualKeys [1 ]) : "actual=" + actualKeys [1 ] + " vs expected = " + expectedKeys [1 ];
215+ } finally {
216+ Releasables .close (expectedKeys );
217+ }
218+ return true ;
219+ }
220+
136221 private BytesRefBlock buildTsidBlockWithOrdinal () {
137222 try (IntVector .FixedBuilder ordinalBuilder = blockFactory .newIntVectorFixedBuilder (positionCount ())) {
138223 for (int i = 0 ; i < tsidArray .count ; i ++) {
0 commit comments