Skip to content

Commit b682250

Browse files
committed
Merge branch 'topn_partition' of https://github.com/przemekwitek/elasticsearch into topn_partition
2 parents 43611bc + 8ac8814 commit b682250

File tree

1 file changed

+68
-66
lines changed
  • x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn

1 file changed

+68
-66
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 68 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -460,78 +460,78 @@ private Iterator<Page> toPages() {
460460
// TODO: optimize case where all the queues are empty
461461
try {
462462
for (var entry : inputQueues.entrySet()) {
463-
Queue inputQueue = entry.getValue();
463+
Queue inputQueue = entry.getValue();
464464

465-
list = new ArrayList<>(inputQueue.size());
466-
builders = null;
467-
while (inputQueue.size() > 0) {
468-
list.add(inputQueue.pop());
469-
}
470-
Collections.reverse(list);
471-
472-
int p = 0;
473-
int size = 0;
474-
for (int i = 0; i < list.size(); i++) {
475-
if (builders == null) {
476-
size = Math.min(maxPageSize, list.size() - i);
477-
builders = new ResultBuilder[elementTypes.size()];
478-
for (int b = 0; b < builders.length; b++) {
479-
builders[b] = ResultBuilder.resultBuilderFor(
480-
blockFactory,
481-
elementTypes.get(b),
482-
encoders.get(b).toUnsortable(),
483-
channelInKey(sortOrders, b),
484-
size
485-
);
486-
}
487-
p = 0;
465+
list = new ArrayList<>(inputQueue.size());
466+
builders = null;
467+
while (inputQueue.size() > 0) {
468+
list.add(inputQueue.pop());
488469
}
470+
Collections.reverse(list);
471+
472+
int p = 0;
473+
int size = 0;
474+
for (int i = 0; i < list.size(); i++) {
475+
if (builders == null) {
476+
size = Math.min(maxPageSize, list.size() - i);
477+
builders = new ResultBuilder[elementTypes.size()];
478+
for (int b = 0; b < builders.length; b++) {
479+
builders[b] = ResultBuilder.resultBuilderFor(
480+
blockFactory,
481+
elementTypes.get(b),
482+
encoders.get(b).toUnsortable(),
483+
channelInKey(sortOrders, b),
484+
size
485+
);
486+
}
487+
p = 0;
488+
}
489489

490-
Row row = list.get(i);
491-
BytesRef keys = row.keys.bytesRefView();
492-
for (SortOrder so : sortOrders) {
493-
if (keys.bytes[keys.offset] == so.nul()) {
490+
Row row = list.get(i);
491+
BytesRef keys = row.keys.bytesRefView();
492+
for (SortOrder so : sortOrders) {
493+
if (keys.bytes[keys.offset] == so.nul()) {
494+
keys.offset++;
495+
keys.length--;
496+
continue;
497+
}
494498
keys.offset++;
495499
keys.length--;
496-
continue;
500+
builders[so.channel].decodeKey(keys);
501+
}
502+
if (keys.length != 0) {
503+
throw new IllegalArgumentException("didn't read all keys");
497504
}
498-
keys.offset++;
499-
keys.length--;
500-
builders[so.channel].decodeKey(keys);
501-
}
502-
if (keys.length != 0) {
503-
throw new IllegalArgumentException("didn't read all keys");
504-
}
505-
506-
BytesRef values = row.values.bytesRefView();
507-
for (ResultBuilder builder : builders) {
508-
builder.decodeValue(values);
509-
}
510-
if (values.length != 0) {
511-
throw new IllegalArgumentException("didn't read all values");
512-
}
513505

514-
list.set(i, null);
515-
row.close();
506+
BytesRef values = row.values.bytesRefView();
507+
for (ResultBuilder builder : builders) {
508+
builder.decodeValue(values);
509+
}
510+
if (values.length != 0) {
511+
throw new IllegalArgumentException("didn't read all values");
512+
}
516513

517-
p++;
518-
if (p == size) {
519-
Block[] blocks = new Block[builders.length];
520-
try {
521-
for (int b = 0; b < blocks.length; b++) {
522-
blocks[b] = builders[b].build();
523-
}
524-
} finally {
525-
if (blocks[blocks.length - 1] == null) {
526-
Releasables.closeExpectNoException(blocks);
514+
list.set(i, null);
515+
row.close();
516+
517+
p++;
518+
if (p == size) {
519+
Block[] blocks = new Block[builders.length];
520+
try {
521+
for (int b = 0; b < blocks.length; b++) {
522+
blocks[b] = builders[b].build();
523+
}
524+
} finally {
525+
if (blocks[blocks.length - 1] == null) {
526+
Releasables.closeExpectNoException(blocks);
527+
}
527528
}
529+
result.add(new Page(blocks));
530+
Releasables.closeExpectNoException(builders);
531+
builders = null;
528532
}
529-
result.add(new Page(blocks));
530-
Releasables.closeExpectNoException(builders);
531-
builders = null;
532533
}
533-
}
534-
assert builders == null;
534+
assert builders == null;
535535
}
536536
success = true;
537537
return result.iterator();
@@ -585,9 +585,8 @@ public void close() {
585585
Releasables.closeExpectNoException(spare, Releasables.wrap(releasables));
586586
}
587587

588-
private static long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(TopNOperator.class)
589-
+ RamUsageEstimator.shallowSizeOfInstance(List.class) * 4
590-
+ RamUsageEstimator.shallowSizeOfInstance(Map.class);
588+
private static long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(TopNOperator.class) + RamUsageEstimator
589+
.shallowSizeOfInstance(List.class) * 4 + RamUsageEstimator.shallowSizeOfInstance(Map.class);
591590

592591
@Override
593592
public long ramBytesUsed() {
@@ -602,7 +601,8 @@ public long ramBytesUsed() {
602601
size += partitions.size() * Partition.SHALLOW_SIZE;
603602
size += RamUsageEstimator.alignObjectSize(arrHeader + ref * sortOrders.size());
604603
size += sortOrders.size() * SortOrder.SHALLOW_SIZE;
605-
long ramBytesUsedSum = inputQueues.entrySet().stream()
604+
long ramBytesUsedSum = inputQueues.entrySet()
605+
.stream()
606606
.mapToLong(e -> e.getKey().bytes.length + e.getValue().ramBytesUsed())
607607
.sum();
608608
size += ramBytesUsedSum;
@@ -619,7 +619,9 @@ public Status status() {
619619
public String toString() {
620620
int queueSizeSum = inputQueues.values().stream().mapToInt(Queue::size).sum();
621621
return "TopNOperator[count="
622-
+ queueSizeSum + "/" + topCount
622+
+ queueSizeSum
623+
+ "/"
624+
+ topCount
623625
+ ", elementTypes="
624626
+ elementTypes
625627
+ ", encoders="

0 commit comments

Comments
 (0)