@@ -461,78 +461,78 @@ private Iterator<Page> toPages() {
461461 // TODO: optimize case where all the queues are empty
462462 try {
463463 for (var entry : inputQueues .entrySet ()) {
464- Queue inputQueue = entry .getValue ();
464+ Queue inputQueue = entry .getValue ();
465465
466- list = new ArrayList <>(inputQueue .size ());
467- builders = null ;
468- while (inputQueue .size () > 0 ) {
469- list .add (inputQueue .pop ());
470- }
471- Collections .reverse (list );
472-
473- int p = 0 ;
474- int size = 0 ;
475- for (int i = 0 ; i < list .size (); i ++) {
476- if (builders == null ) {
477- size = Math .min (maxPageSize , list .size () - i );
478- builders = new ResultBuilder [elementTypes .size ()];
479- for (int b = 0 ; b < builders .length ; b ++) {
480- builders [b ] = ResultBuilder .resultBuilderFor (
481- blockFactory ,
482- elementTypes .get (b ),
483- encoders .get (b ).toUnsortable (),
484- channelInKey (sortOrders , b ),
485- size
486- );
487- }
488- p = 0 ;
466+ list = new ArrayList <>(inputQueue .size ());
467+ builders = null ;
468+ while (inputQueue .size () > 0 ) {
469+ list .add (inputQueue .pop ());
489470 }
471+ Collections .reverse (list );
472+
473+ int p = 0 ;
474+ int size = 0 ;
475+ for (int i = 0 ; i < list .size (); i ++) {
476+ if (builders == null ) {
477+ size = Math .min (maxPageSize , list .size () - i );
478+ builders = new ResultBuilder [elementTypes .size ()];
479+ for (int b = 0 ; b < builders .length ; b ++) {
480+ builders [b ] = ResultBuilder .resultBuilderFor (
481+ blockFactory ,
482+ elementTypes .get (b ),
483+ encoders .get (b ).toUnsortable (),
484+ channelInKey (sortOrders , b ),
485+ size
486+ );
487+ }
488+ p = 0 ;
489+ }
490490
491- Row row = list .get (i );
492- BytesRef keys = row .keys .bytesRefView ();
493- for (SortOrder so : sortOrders ) {
494- if (keys .bytes [keys .offset ] == so .nul ()) {
491+ Row row = list .get (i );
492+ BytesRef keys = row .keys .bytesRefView ();
493+ for (SortOrder so : sortOrders ) {
494+ if (keys .bytes [keys .offset ] == so .nul ()) {
495+ keys .offset ++;
496+ keys .length --;
497+ continue ;
498+ }
495499 keys .offset ++;
496500 keys .length --;
497- continue ;
501+ builders [so .channel ].decodeKey (keys );
502+ }
503+ if (keys .length != 0 ) {
504+ throw new IllegalArgumentException ("didn't read all keys" );
498505 }
499- keys .offset ++;
500- keys .length --;
501- builders [so .channel ].decodeKey (keys );
502- }
503- if (keys .length != 0 ) {
504- throw new IllegalArgumentException ("didn't read all keys" );
505- }
506-
507- BytesRef values = row .values .bytesRefView ();
508- for (ResultBuilder builder : builders ) {
509- builder .decodeValue (values );
510- }
511- if (values .length != 0 ) {
512- throw new IllegalArgumentException ("didn't read all values" );
513- }
514506
515- list .set (i , null );
516- row .close ();
507+ BytesRef values = row .values .bytesRefView ();
508+ for (ResultBuilder builder : builders ) {
509+ builder .decodeValue (values );
510+ }
511+ if (values .length != 0 ) {
512+ throw new IllegalArgumentException ("didn't read all values" );
513+ }
517514
518- p ++;
519- if (p == size ) {
520- Block [] blocks = new Block [builders .length ];
521- try {
522- for (int b = 0 ; b < blocks .length ; b ++) {
523- blocks [b ] = builders [b ].build ();
524- }
525- } finally {
526- if (blocks [blocks .length - 1 ] == null ) {
527- Releasables .closeExpectNoException (blocks );
515+ list .set (i , null );
516+ row .close ();
517+
518+ p ++;
519+ if (p == size ) {
520+ Block [] blocks = new Block [builders .length ];
521+ try {
522+ for (int b = 0 ; b < blocks .length ; b ++) {
523+ blocks [b ] = builders [b ].build ();
524+ }
525+ } finally {
526+ if (blocks [blocks .length - 1 ] == null ) {
527+ Releasables .closeExpectNoException (blocks );
528+ }
528529 }
530+ result .add (new Page (blocks ));
531+ Releasables .closeExpectNoException (builders );
532+ builders = null ;
529533 }
530- result .add (new Page (blocks ));
531- Releasables .closeExpectNoException (builders );
532- builders = null ;
533534 }
534- }
535- assert builders == null ;
535+ assert builders == null ;
536536 }
537537 success = true ;
538538 return result .iterator ();
@@ -586,9 +586,8 @@ public void close() {
586586 Releasables .closeExpectNoException (spare , Releasables .wrap (releasables ));
587587 }
588588
589- private static long SHALLOW_SIZE = RamUsageEstimator .shallowSizeOfInstance (TopNOperator .class )
590- + RamUsageEstimator .shallowSizeOfInstance (List .class ) * 4
591- + RamUsageEstimator .shallowSizeOfInstance (Map .class );
589+ private static long SHALLOW_SIZE = RamUsageEstimator .shallowSizeOfInstance (TopNOperator .class ) + RamUsageEstimator
590+ .shallowSizeOfInstance (List .class ) * 4 + RamUsageEstimator .shallowSizeOfInstance (Map .class );
592591
593592 @ Override
594593 public long ramBytesUsed () {
@@ -603,7 +602,8 @@ public long ramBytesUsed() {
603602 size += partitions .size () * Partition .SHALLOW_SIZE ;
604603 size += RamUsageEstimator .alignObjectSize (arrHeader + ref * sortOrders .size ());
605604 size += sortOrders .size () * SortOrder .SHALLOW_SIZE ;
606- long ramBytesUsedSum = inputQueues .entrySet ().stream ()
605+ long ramBytesUsedSum = inputQueues .entrySet ()
606+ .stream ()
607607 .mapToLong (e -> e .getKey ().getBytes (Charset .defaultCharset ()).length + e .getValue ().ramBytesUsed ())
608608 .sum ();
609609 size += ramBytesUsedSum ;
@@ -620,7 +620,9 @@ public Status status() {
620620 public String toString () {
621621 int queueSizeSum = inputQueues .values ().stream ().mapToInt (Queue ::size ).sum ();
622622 return "TopNOperator[count="
623- + queueSizeSum + "/" + topCount
623+ + queueSizeSum
624+ + "/"
625+ + topCount
624626 + ", elementTypes="
625627 + elementTypes
626628 + ", encoders="
0 commit comments