8181class DownsampleShardIndexer {
8282
8383 private static final Logger logger = LogManager .getLogger (DownsampleShardIndexer .class );
84- private static final int DOCID_BUFFER_SIZE = 8069 ;
84+ private static final int DOCID_BUFFER_SIZE = 8096 ;
8585 public static final int DOWNSAMPLE_BULK_ACTIONS = 10000 ;
8686 public static final ByteSizeValue DOWNSAMPLE_BULK_SIZE = ByteSizeValue .of (1 , ByteSizeUnit .MB );
8787 public static final ByteSizeValue DOWNSAMPLE_MAX_BYTES_IN_FLIGHT = ByteSizeValue .of (50 , ByteSizeUnit .MB );
@@ -340,6 +340,7 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
340340 private class TimeSeriesBucketCollector extends BucketCollector {
341341 private final BulkProcessor2 bulkProcessor ;
342342 private final DownsampleBucketBuilder downsampleBucketBuilder ;
343+ private final List <LeafDownsampleCollector > leafBucketCollectors = new ArrayList <>();
343344 private long docsProcessed ;
344345 private long bucketsCreated ;
345346 long lastTimestamp = Long .MAX_VALUE ;
@@ -367,106 +368,130 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag
367368 formattedDocValues [i ] = fieldValueFetchers .get (i ).getLeaf (ctx );
368369 }
369370
370- long timestampBoundStartTime = searchExecutionContext . getIndexSettings (). getTimestampBounds (). startTime ( );
371- return new LeafBucketCollector () {
372-
373- final IntArrayList buffer = new IntArrayList ( DOCID_BUFFER_SIZE );
371+ var leafBucketCollector = new LeafDownsampleCollector ( aggCtx , docCountProvider , fieldProducers , formattedDocValues );
372+ leafBucketCollectors . add ( leafBucketCollector );
373+ return leafBucketCollector ;
374+ }
374375
375- @ Override
376- public void collect (int docId , long owningBucketOrd ) throws IOException {
377- task .addNumReceived (1 );
378- final BytesRef tsidHash = aggCtx .getTsidHash ();
379- assert tsidHash != null : "Document without [" + TimeSeriesIdFieldMapper .NAME + "] field was found." ;
380- final int tsidHashOrd = aggCtx .getTsidHashOrd ();
381- final long timestamp = timestampField .resolution ().roundDownToMillis (aggCtx .getTimestamp ());
376+ void bulkCollection () throws IOException {
377+ for (LeafDownsampleCollector leafBucketCollector : leafBucketCollectors .reversed ()) {
378+ leafBucketCollector .leafBulkCollection ();
379+ }
380+ }
382381
383- boolean tsidChanged = tsidHashOrd != downsampleBucketBuilder .tsidOrd ();
384- if (tsidChanged || timestamp < lastHistoTimestamp ) {
385- lastHistoTimestamp = Math .max (rounding .round (timestamp ), timestampBoundStartTime );
386- }
387- task .setLastSourceTimestamp (timestamp );
388- task .setLastTargetTimestamp (lastHistoTimestamp );
389-
390- if (logger .isTraceEnabled ()) {
391- logger .trace (
392- "Doc: [{}] - _tsid: [{}], @timestamp: [{}] -> downsample bucket ts: [{}]" ,
393- docId ,
394- DocValueFormat .TIME_SERIES_ID .format (tsidHash ),
395- timestampFormat .format (timestamp ),
396- timestampFormat .format (lastHistoTimestamp )
397- );
398- }
382+ class LeafDownsampleCollector extends LeafBucketCollector {
383+
384+ final AggregationExecutionContext aggCtx ;
385+ final DocCountProvider docCountProvider ;
386+ final FormattedDocValues [] formattedDocValues ;
387+ final AbstractDownsampleFieldProducer [] fieldProducers ;
388+
389+ final IntArrayList buffer = new IntArrayList (DOCID_BUFFER_SIZE );
390+ final long timestampBoundStartTime = searchExecutionContext .getIndexSettings ().getTimestampBounds ().startTime ();
391+
392+ LeafDownsampleCollector (
393+ AggregationExecutionContext aggCtx ,
394+ DocCountProvider docCountProvider ,
395+ AbstractDownsampleFieldProducer [] fieldProducers ,
396+ FormattedDocValues [] formattedDocValues
397+ ) {
398+ this .aggCtx = aggCtx ;
399+ this .docCountProvider = docCountProvider ;
400+ this .fieldProducers = fieldProducers ;
401+ this .formattedDocValues = formattedDocValues ;
402+ }
399403
400- /*
401- * Sanity checks to ensure that we receive documents in the correct order
402- * - _tsid must be sorted in ascending order
403- * - @timestamp must be sorted in descending order within the same _tsid
404- */
405- BytesRef lastTsid = downsampleBucketBuilder .tsid ();
406- assert lastTsid == null || lastTsid .compareTo (tsidHash ) <= 0
407- : "_tsid is not sorted in ascending order: ["
408- + DocValueFormat .TIME_SERIES_ID .format (lastTsid )
409- + "] -> ["
410- + DocValueFormat .TIME_SERIES_ID .format (tsidHash )
411- + "]" ;
412- assert tsidHash .equals (lastTsid ) == false || lastTimestamp >= timestamp
413- : "@timestamp is not sorted in descending order: ["
414- + timestampFormat .format (lastTimestamp )
415- + "] -> ["
416- + timestampFormat .format (timestamp )
417- + "]" ;
418- lastTimestamp = timestamp ;
419-
420- if (tsidChanged || downsampleBucketBuilder .timestamp () != lastHistoTimestamp ) {
421- bulkCollection ();
422- // Flush downsample doc if not empty
423- if (downsampleBucketBuilder .isEmpty () == false ) {
424- XContentBuilder doc = downsampleBucketBuilder .buildDownsampleDocument ();
425- indexBucket (doc );
426- }
427-
428- // Create new downsample bucket
429- if (tsidChanged ) {
430- downsampleBucketBuilder .resetTsid (tsidHash , tsidHashOrd , lastHistoTimestamp );
431- } else {
432- downsampleBucketBuilder .resetTimestamp (lastHistoTimestamp );
433- }
434- bucketsCreated ++;
435- }
436- buffer .buffer [buffer .elementsCount ++] = docId ;
437- if (buffer .size () == DOCID_BUFFER_SIZE ) {
438- bulkCollection ();
439- }
404+ @ Override
405+ public void collect (int docId , long owningBucketOrd ) throws IOException {
406+ task .addNumReceived (1 );
407+ final BytesRef tsidHash = aggCtx .getTsidHash ();
408+ assert tsidHash != null : "Document without [" + TimeSeriesIdFieldMapper .NAME + "] field was found." ;
409+ final int tsidHashOrd = aggCtx .getTsidHashOrd ();
410+ final long timestamp = timestampField .resolution ().roundDownToMillis (aggCtx .getTimestamp ());
411+
412+ boolean tsidChanged = tsidHashOrd != downsampleBucketBuilder .tsidOrd ();
413+ if (tsidChanged || timestamp < lastHistoTimestamp ) {
414+ lastHistoTimestamp = Math .max (rounding .round (timestamp ), timestampBoundStartTime );
440415 }
441-
442- @ Override
443- public void finish () throws IOException {
444- bulkCollection ();
416+ task .setLastSourceTimestamp (timestamp );
417+ task .setLastTargetTimestamp (lastHistoTimestamp );
418+
419+ if (logger .isTraceEnabled ()) {
420+ logger .trace (
421+ "Doc: [{}] - _tsid: [{}], @timestamp: [{}] -> downsample bucket ts: [{}]" ,
422+ docId ,
423+ DocValueFormat .TIME_SERIES_ID .format (tsidHash ),
424+ timestampFormat .format (timestamp ),
425+ timestampFormat .format (lastHistoTimestamp )
426+ );
445427 }
446428
447- void bulkCollection () throws IOException {
448- if (buffer .isEmpty ()) {
449- return ;
429+ /*
430+ * Sanity checks to ensure that we receive documents in the correct order
431+ * - _tsid must be sorted in ascending order
432+ * - @timestamp must be sorted in descending order within the same _tsid
433+ */
434+ BytesRef lastTsid = downsampleBucketBuilder .tsid ();
435+ assert lastTsid == null || lastTsid .compareTo (tsidHash ) <= 0
436+ : "_tsid is not sorted in ascending order: ["
437+ + DocValueFormat .TIME_SERIES_ID .format (lastTsid )
438+ + "] -> ["
439+ + DocValueFormat .TIME_SERIES_ID .format (tsidHash )
440+ + "]" ;
441+ assert tsidHash .equals (lastTsid ) == false || lastTimestamp >= timestamp
442+ : "@timestamp is not sorted in descending order: ["
443+ + timestampFormat .format (lastTimestamp )
444+ + "] -> ["
445+ + timestampFormat .format (timestamp )
446+ + "]" ;
447+ lastTimestamp = timestamp ;
448+
449+ if (tsidChanged || downsampleBucketBuilder .timestamp () != lastHistoTimestamp ) {
450+ bulkCollection ();
451+ // Flush downsample doc if not empty
452+ if (downsampleBucketBuilder .isEmpty () == false ) {
453+ XContentBuilder doc = downsampleBucketBuilder .buildDownsampleDocument ();
454+ indexBucket (doc );
450455 }
451456
452- if (logger .isDebugEnabled ()) {
453- logger .debug ("buffered {} docids" , buffer .size ());
457+ // Create new downsample bucket
458+ if (tsidChanged ) {
459+ downsampleBucketBuilder .resetTsid (tsidHash , tsidHashOrd , lastHistoTimestamp );
460+ } else {
461+ downsampleBucketBuilder .resetTimestamp (lastHistoTimestamp );
454462 }
463+ bucketsCreated ++;
464+ }
465+ buffer .add (docId );
466+ // buffer.buffer[buffer.elementsCount++] = docId;
467+ // if (buffer.size() == DOCID_BUFFER_SIZE) {
468+ // leafBulkCollection();
469+ // }
470+ }
455471
456- downsampleBucketBuilder .collectDocCount (buffer , docCountProvider );
457- // Iterate over all field values and collect the doc_values for this docId
458- for (int i = 0 ; i < fieldProducers .length ; i ++) {
459- AbstractDownsampleFieldProducer fieldProducer = fieldProducers [i ];
460- FormattedDocValues docValues = formattedDocValues [i ];
461- fieldProducer .collect (docValues , buffer );
462- }
472+ void leafBulkCollection () throws IOException {
473+ if (buffer .isEmpty ()) {
474+ return ;
475+ }
463476
464- docsProcessed += buffer .size ();
465- task .setDocsProcessed (docsProcessed );
477+ if (logger .isDebugEnabled ()) {
478+ logger .debug ("buffered {} docids" , buffer .size ());
479+ }
466480
467- buffer .elementsCount = 0 ;
481+ downsampleBucketBuilder .collectDocCount (buffer , docCountProvider );
482+ // Iterate over all field values and collect the doc_values for this docId
483+ for (int i = 0 ; i < fieldProducers .length ; i ++) {
484+ AbstractDownsampleFieldProducer fieldProducer = fieldProducers [i ];
485+ FormattedDocValues docValues = formattedDocValues [i ];
486+ fieldProducer .collect (docValues , buffer );
468487 }
469- };
488+
489+ docsProcessed += buffer .size ();
490+ task .setDocsProcessed (docsProcessed );
491+
492+ buffer .clear ();
493+ // buffer.elementsCount = 0;
494+ }
470495 }
471496
472497 private void indexBucket (XContentBuilder doc ) {
@@ -489,6 +514,7 @@ public void preCollection() {
489514 @ Override
490515 public void postCollection () throws IOException {
491516 // Flush downsample doc if not empty
517+ bulkCollection ();
492518 if (downsampleBucketBuilder .isEmpty () == false ) {
493519 XContentBuilder doc = downsampleBucketBuilder .buildDownsampleDocument ();
494520 indexBucket (doc );
0 commit comments