25
25
26
26
import java .util .ArrayList ;
27
27
import java .util .Arrays ;
28
- import java .util .BitSet ;
29
28
import java .util .Collections ;
30
29
import java .util .Iterator ;
31
30
import java .util .List ;
@@ -51,8 +50,7 @@ public class TopNOperator implements Operator, Accountable {
51
50
* multivalues) to reference each position in each block of the Page.
52
51
*/
53
52
static final class Row implements Accountable , Releasable {
54
- private static final long SHALLOW_SIZE = RamUsageEstimator .shallowSizeOfInstance (Row .class ) + RamUsageEstimator
55
- .shallowSizeOfInstance (BitSet .class );
53
+ private static final long SHALLOW_SIZE = RamUsageEstimator .shallowSizeOfInstance (Row .class );
56
54
57
55
/**
58
56
* The sort key.
@@ -64,7 +62,7 @@ static final class Row implements Accountable, Releasable {
64
62
* For ex, if a Long is represented as 8 bytes, each of these bytes will have the same value (set/unset) if the respective Long
65
63
* value is used for sorting ascending/descending.
66
64
*/
67
- final BitSet orderByCompositeKeyAscending = new BitSet () ;
65
+ final BytesOrder bytesOrder ;
68
66
69
67
/**
70
68
* Values to reconstruct the row. Sort of. When we reconstruct the row we read
@@ -73,11 +71,12 @@ static final class Row implements Accountable, Releasable {
73
71
*/
74
72
final BreakingBytesRefBuilder values ;
75
73
76
- Row (CircuitBreaker breaker ) {
74
+ Row (CircuitBreaker breaker , List < SortOrder > sortOrders ) {
77
75
boolean success = false ;
78
76
try {
79
77
keys = new BreakingBytesRefBuilder (breaker , "topn" );
80
78
values = new BreakingBytesRefBuilder (breaker , "topn" );
79
+ bytesOrder = new BytesOrder (sortOrders , breaker , "topn" );
81
80
success = true ;
82
81
} finally {
83
82
if (success == false ) {
@@ -88,12 +87,54 @@ static final class Row implements Accountable, Releasable {
88
87
89
88
@ Override
90
89
public long ramBytesUsed () {
91
- return SHALLOW_SIZE + keys .ramBytesUsed () + orderByCompositeKeyAscending . size () / Byte . SIZE + values .ramBytesUsed ();
90
+ return SHALLOW_SIZE + keys .ramBytesUsed () + bytesOrder . ramBytesUsed () + values .ramBytesUsed ();
92
91
}
93
92
94
93
@ Override
95
94
public void close () {
96
- Releasables .closeExpectNoException (keys , values );
95
+ Releasables .closeExpectNoException (keys , values , bytesOrder );
96
+ }
97
+ }
98
+
99
+ static final class BytesOrder implements Releasable , Accountable {
100
+ private static final long BASE_RAM_USAGE = RamUsageEstimator .shallowSizeOfInstance (BytesOrder .class );
101
+ private final CircuitBreaker breaker ;
102
+ final List <SortOrder > sortOrders ;
103
+ final int [] endOffsets ;
104
+
105
+ BytesOrder (List <SortOrder > sortOrders , CircuitBreaker breaker , String label ) {
106
+ this .breaker = breaker ;
107
+ this .sortOrders = sortOrders ;
108
+ breaker .addEstimateBytesAndMaybeBreak (memoryUsed (sortOrders .size ()), label );
109
+ this .endOffsets = new int [sortOrders .size ()];
110
+ }
111
+
112
+ /**
113
+ * Returns true if the byte at the given position is ordered ascending; otherwise, return false
114
+ */
115
+ boolean isByteOrderAscending (int bytePosition ) {
116
+ int index = Arrays .binarySearch (endOffsets , bytePosition );
117
+ if (index < 0 ) {
118
+ index = -1 - index ;
119
+ }
120
+ return sortOrders .get (index ).asc ();
121
+ }
122
+
123
+ private long memoryUsed (int numKeys ) {
124
+ // sortOrders is global and its memory is accounted at the top level TopNOperator
125
+ return BASE_RAM_USAGE + RamUsageEstimator .alignObjectSize (
126
+ (long ) RamUsageEstimator .NUM_BYTES_ARRAY_HEADER + (long ) Integer .BYTES * numKeys
127
+ );
128
+ }
129
+
130
+ @ Override
131
+ public long ramBytesUsed () {
132
+ return memoryUsed (sortOrders .size ());
133
+ }
134
+
135
+ @ Override
136
+ public void close () {
137
+ breaker .addWithoutBreaking (-ramBytesUsed ());
97
138
}
98
139
}
99
140
@@ -138,14 +179,11 @@ void row(int position, Row destination) {
138
179
139
180
private void writeKey (int position , Row row ) {
140
181
int orderByCompositeKeyCurrentPosition = 0 ;
141
- for (KeyFactory factory : keyFactories ) {
142
- int valueAsBytesSize = factory .extractor .writeKey (row .keys , position );
143
- row .orderByCompositeKeyAscending .set (
144
- orderByCompositeKeyCurrentPosition ,
145
- valueAsBytesSize + orderByCompositeKeyCurrentPosition ,
146
- factory .ascending
147
- );
182
+ for (int i = 0 ; i < keyFactories .length ; i ++) {
183
+ int valueAsBytesSize = keyFactories [i ].extractor .writeKey (row .keys , position );
184
+ assert valueAsBytesSize > 0 : valueAsBytesSize ;
148
185
orderByCompositeKeyCurrentPosition += valueAsBytesSize ;
186
+ row .bytesOrder .endOffsets [i ] = orderByCompositeKeyCurrentPosition - 1 ;
149
187
}
150
188
}
151
189
@@ -189,9 +227,7 @@ public record TopNOperatorFactory(
189
227
List <SortOrder > sortOrders ,
190
228
int maxPageSize
191
229
) implements OperatorFactory {
192
- public TopNOperatorFactory
193
-
194
- {
230
+ public TopNOperatorFactory {
195
231
for (ElementType e : elementTypes ) {
196
232
if (e == null ) {
197
233
throw new IllegalArgumentException ("ElementType not known" );
@@ -274,19 +310,20 @@ static int compareRows(Row r1, Row r2) {
274
310
// the two rows are equal
275
311
return 0 ;
276
312
}
313
+
277
314
int length = Math .min (br1 .length , br2 .length );
278
315
// one value is the prefix of the other
279
316
if (mismatchedByteIndex == length ) {
280
317
// the value with the greater length is considered greater than the other
281
318
if (length == br1 .length ) {// first row is less than the second row
282
- return r2 .orderByCompositeKeyAscending . get (length ) ? 1 : -1 ;
319
+ return r2 .bytesOrder . isByteOrderAscending (length ) ? 1 : -1 ;
283
320
} else {// second row is less than the first row
284
- return r1 .orderByCompositeKeyAscending . get (length ) ? -1 : 1 ;
321
+ return r1 .bytesOrder . isByteOrderAscending (length ) ? -1 : 1 ;
285
322
}
286
323
} else {
287
324
// compare the byte that mismatched accounting for that respective byte asc/desc ordering
288
325
int c = Byte .compareUnsigned (br1 .bytes [br1 .offset + mismatchedByteIndex ], br2 .bytes [br2 .offset + mismatchedByteIndex ]);
289
- return r1 .orderByCompositeKeyAscending . get (mismatchedByteIndex ) ? -c : c ;
326
+ return r1 .bytesOrder . isByteOrderAscending (mismatchedByteIndex ) ? -c : c ;
290
327
}
291
328
}
292
329
@@ -312,10 +349,9 @@ public void addInput(Page page) {
312
349
try {
313
350
for (int i = 0 ; i < page .getPositionCount (); i ++) {
314
351
if (spare == null ) {
315
- spare = new Row (breaker );
352
+ spare = new Row (breaker , sortOrders );
316
353
} else {
317
354
spare .keys .clear ();
318
- spare .orderByCompositeKeyAscending .clear ();
319
355
spare .values .clear ();
320
356
}
321
357
rowFiller .row (i , spare );
0 commit comments