 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.internal.hppc.IntArrayList;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.MatchNoDocsQuery;
 import org.apache.lucene.search.Query;
 class DownsampleShardIndexer {
 
     private static final Logger logger = LogManager.getLogger(DownsampleShardIndexer.class);
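+    // Number of doc ids each leaf collector buffers before a bulk collection is forced (see LeafDownsampleCollector#collect).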
+    private static final int DOCID_BUFFER_SIZE = 8096;
     public static final int DOWNSAMPLE_BULK_ACTIONS = 10000;
     public static final ByteSizeValue DOWNSAMPLE_BULK_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB);
     public static final ByteSizeValue DOWNSAMPLE_MAX_BYTES_IN_FLIGHT = new ByteSizeValue(50, ByteSizeUnit.MB);
@@ -338,6 +340,7 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
     private class TimeSeriesBucketCollector extends BucketCollector {
         private final BulkProcessor2 bulkProcessor;
         private final DownsampleBucketBuilder downsampleBucketBuilder;
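+        // One leaf collector per segment; bulkCollection() drains their buffered doc ids together.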
+        private final List<LeafDownsampleCollector> leafBucketCollectors = new ArrayList<>();
         private long docsProcessed;
         private long bucketsCreated;
         long lastTimestamp = Long.MAX_VALUE;
@@ -365,83 +368,138 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag
                 formattedDocValues[i] = fieldValueFetchers.get(i).getLeaf(ctx);
             }
 
-            return new LeafBucketCollector() {
-                @Override
-                public void collect(int docId, long owningBucketOrd) throws IOException {
-                    task.addNumReceived(1);
-                    final BytesRef tsidHash = aggCtx.getTsidHash();
-                    assert tsidHash != null : "Document without [" + TimeSeriesIdFieldMapper.NAME + "] field was found.";
-                    final int tsidHashOrd = aggCtx.getTsidHashOrd();
-                    final long timestamp = timestampField.resolution().roundDownToMillis(aggCtx.getTimestamp());
-
-                    boolean tsidChanged = tsidHashOrd != downsampleBucketBuilder.tsidOrd();
-                    if (tsidChanged || timestamp < lastHistoTimestamp) {
-                        lastHistoTimestamp = Math.max(
-                            rounding.round(timestamp),
-                            searchExecutionContext.getIndexSettings().getTimestampBounds().startTime()
-                        );
-                    }
-                    task.setLastSourceTimestamp(timestamp);
-                    task.setLastTargetTimestamp(lastHistoTimestamp);
-
-                    if (logger.isTraceEnabled()) {
-                        logger.trace(
-                            "Doc: [{}] - _tsid: [{}], @timestamp: [{}] -> downsample bucket ts: [{}]",
-                            docId,
-                            DocValueFormat.TIME_SERIES_ID.format(tsidHash),
-                            timestampFormat.format(timestamp),
-                            timestampFormat.format(lastHistoTimestamp)
-                        );
-                    }
+            var leafBucketCollector = new LeafDownsampleCollector(aggCtx, docCountProvider, fieldProducers, formattedDocValues);
+            leafBucketCollectors.add(leafBucketCollector);
+            return leafBucketCollector;
+        }
+
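+        // Flushes the buffered doc ids of all leaf collectors; invoked when a downsample bucket completes, when a buffer fills up
+        // and after collection finishes.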
+        void bulkCollection() throws IOException {
+            // The leaf bucket collectors with the newer timestamps go first, so that the last value for counters and labels is captured correctly.
+            leafBucketCollectors.sort((o1, o2) -> -Long.compare(o1.firstTimeStampForBulkCollection, o2.firstTimeStampForBulkCollection));
+            for (LeafDownsampleCollector leafBucketCollector : leafBucketCollectors) {
+                leafBucketCollector.leafBulkCollection();
+            }
+        }
 
-                    /*
-                     * Sanity checks to ensure that we receive documents in the correct order
-                     * - _tsid must be sorted in ascending order
-                     * - @timestamp must be sorted in descending order within the same _tsid
-                     */
-                    BytesRef lastTsid = downsampleBucketBuilder.tsid();
-                    assert lastTsid == null || lastTsid.compareTo(tsidHash) <= 0
-                        : "_tsid is not sorted in ascending order: ["
-                            + DocValueFormat.TIME_SERIES_ID.format(lastTsid)
-                            + "] -> ["
-                            + DocValueFormat.TIME_SERIES_ID.format(tsidHash)
-                            + "]";
-                    assert tsidHash.equals(lastTsid) == false || lastTimestamp >= timestamp
-                        : "@timestamp is not sorted in descending order: ["
-                            + timestampFormat.format(lastTimestamp)
-                            + "] -> ["
-                            + timestampFormat.format(timestamp)
-                            + "]";
-                    lastTimestamp = timestamp;
-
-                    if (tsidChanged || downsampleBucketBuilder.timestamp() != lastHistoTimestamp) {
-                        // Flush downsample doc if not empty
-                        if (downsampleBucketBuilder.isEmpty() == false) {
-                            XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument();
-                            indexBucket(doc);
-                        }
-
-                        // Create new downsample bucket
-                        if (tsidChanged) {
-                            downsampleBucketBuilder.resetTsid(tsidHash, tsidHashOrd, lastHistoTimestamp);
-                        } else {
-                            downsampleBucketBuilder.resetTimestamp(lastHistoTimestamp);
-                        }
-                        bucketsCreated++;
+        class LeafDownsampleCollector extends LeafBucketCollector {
+
+            final AggregationExecutionContext aggCtx;
+            final DocCountProvider docCountProvider;
+            final FormattedDocValues[] formattedDocValues;
+            final AbstractDownsampleFieldProducer[] fieldProducers;
+
+            // Capture the first timestamp in order to determine which leaf collector's leafBulkCollection() is invoked first.
+            long firstTimeStampForBulkCollection;
+            final IntArrayList docIdBuffer = new IntArrayList(DOCID_BUFFER_SIZE);
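+            // Cached here so collect() does not re-resolve the index's timestamp bound for every document.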
+            final long timestampBoundStartTime = searchExecutionContext.getIndexSettings().getTimestampBounds().startTime();
+
+            LeafDownsampleCollector(
+                AggregationExecutionContext aggCtx,
+                DocCountProvider docCountProvider,
+                AbstractDownsampleFieldProducer[] fieldProducers,
+                FormattedDocValues[] formattedDocValues
+            ) {
+                this.aggCtx = aggCtx;
+                this.docCountProvider = docCountProvider;
+                this.fieldProducers = fieldProducers;
+                this.formattedDocValues = formattedDocValues;
+            }
+
+            @Override
+            public void collect(int docId, long owningBucketOrd) throws IOException {
+                task.addNumReceived(1);
+                final BytesRef tsidHash = aggCtx.getTsidHash();
+                assert tsidHash != null : "Document without [" + TimeSeriesIdFieldMapper.NAME + "] field was found.";
+                final int tsidHashOrd = aggCtx.getTsidHashOrd();
+                final long timestamp = timestampField.resolution().roundDownToMillis(aggCtx.getTimestamp());
+
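+                // Documents arrive sorted by _tsid and descending @timestamp (see the sanity checks below), so the target bucket
+                // timestamp only needs recomputing when the tsid changes or the timestamp drops below the current bucket's start.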
+                boolean tsidChanged = tsidHashOrd != downsampleBucketBuilder.tsidOrd();
+                if (tsidChanged || timestamp < lastHistoTimestamp) {
+                    lastHistoTimestamp = Math.max(rounding.round(timestamp), timestampBoundStartTime);
+                }
+                task.setLastSourceTimestamp(timestamp);
+                task.setLastTargetTimestamp(lastHistoTimestamp);
+
+                if (logger.isTraceEnabled()) {
+                    logger.trace(
+                        "Doc: [{}] - _tsid: [{}], @timestamp: [{}] -> downsample bucket ts: [{}]",
+                        docId,
+                        DocValueFormat.TIME_SERIES_ID.format(tsidHash),
+                        timestampFormat.format(timestamp),
+                        timestampFormat.format(lastHistoTimestamp)
+                    );
+                }
+
+                /*
+                 * Sanity checks to ensure that we receive documents in the correct order
+                 * - _tsid must be sorted in ascending order
+                 * - @timestamp must be sorted in descending order within the same _tsid
+                 */
+                BytesRef lastTsid = downsampleBucketBuilder.tsid();
+                assert lastTsid == null || lastTsid.compareTo(tsidHash) <= 0
+                    : "_tsid is not sorted in ascending order: ["
+                        + DocValueFormat.TIME_SERIES_ID.format(lastTsid)
+                        + "] -> ["
+                        + DocValueFormat.TIME_SERIES_ID.format(tsidHash)
+                        + "]";
+                assert tsidHash.equals(lastTsid) == false || lastTimestamp >= timestamp
+                    : "@timestamp is not sorted in descending order: ["
+                        + timestampFormat.format(lastTimestamp)
+                        + "] -> ["
+                        + timestampFormat.format(timestamp)
+                        + "]";
+                lastTimestamp = timestamp;
+
+                if (tsidChanged || downsampleBucketBuilder.timestamp() != lastHistoTimestamp) {
+                    bulkCollection();
+                    // Flush downsample doc if not empty
+                    if (downsampleBucketBuilder.isEmpty() == false) {
+                        XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument();
+                        indexBucket(doc);
                     }
 
-                    final int docCount = docCountProvider.getDocCount(docId);
-                    downsampleBucketBuilder.collectDocCount(docCount);
-                    // Iterate over all field values and collect the doc_values for this docId
-                    for (int i = 0; i < fieldProducers.length; i++) {
-                        AbstractDownsampleFieldProducer fieldProducer = fieldProducers[i];
-                        FormattedDocValues docValues = formattedDocValues[i];
-                        fieldProducer.collect(docValues, docId);
+                    // Create new downsample bucket
+                    if (tsidChanged) {
+                        downsampleBucketBuilder.resetTsid(tsidHash, tsidHashOrd, lastHistoTimestamp);
+                    } else {
+                        downsampleBucketBuilder.resetTimestamp(lastHistoTimestamp);
                     }
-                    docsProcessed++;
-                    task.setDocsProcessed(docsProcessed);
+                    bucketsCreated++;
                 }
-            };
+
+                if (docIdBuffer.isEmpty()) {
+                    firstTimeStampForBulkCollection = aggCtx.getTimestamp();
+                }
+                // Write directly into the backing array: IntArrayList#add() checks the buffer size for resizing on every call,
+                // and that check is unnecessary here because the buffer is flushed once it reaches DOCID_BUFFER_SIZE.
+                docIdBuffer.buffer[docIdBuffer.elementsCount++] = docId;
+                if (docIdBuffer.size() == DOCID_BUFFER_SIZE) {
+                    bulkCollection();
+                }
+            }
+
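+            // Drains this leaf's buffered doc ids into the doc-count accumulator and the field producers in one pass.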
+            void leafBulkCollection() throws IOException {
+                if (docIdBuffer.isEmpty()) {
+                    return;
+                }
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("buffered {} docids", docIdBuffer.size());
+                }
+
+                downsampleBucketBuilder.collectDocCount(docIdBuffer, docCountProvider);
+                // Iterate over all field producers and collect the doc_values for the buffered doc ids
+                for (int i = 0; i < fieldProducers.length; i++) {
+                    AbstractDownsampleFieldProducer fieldProducer = fieldProducers[i];
+                    FormattedDocValues docValues = formattedDocValues[i];
+                    fieldProducer.collect(docValues, docIdBuffer);
+                }
+
+                docsProcessed += docIdBuffer.size();
+                task.setDocsProcessed(docsProcessed);
+
+                // Reset the element count directly: IntArrayList#clear() would also overwrite all slots with zeros, which isn't needed here.
+                docIdBuffer.elementsCount = 0;
+            }
         }
 
         private void indexBucket(XContentBuilder doc) {
@@ -464,6 +522,7 @@ public void preCollection() {
         @Override
         public void postCollection() throws IOException {
+            bulkCollection();
             // Flush downsample doc if not empty
             if (downsampleBucketBuilder.isEmpty() == false) {
                 XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument();
                 indexBucket(doc);
@@ -545,8 +604,15 @@ public void resetTimestamp(long timestamp) {
             }
         }
 
-        public void collectDocCount(int docCount) {
-            this.docCount += docCount;
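+        // Fast path: when every document counts as one, the number of buffered doc ids is the doc count and no per-doc lookup is needed.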
+        public void collectDocCount(IntArrayList buffer, DocCountProvider docCountProvider) throws IOException {
+            if (docCountProvider.alwaysOne()) {
+                this.docCount += buffer.size();
+            } else {
+                for (int i = 0; i < buffer.size(); i++) {
+                    int docId = buffer.get(i);
+                    this.docCount += docCountProvider.getDocCount(docId);
+                }
+            }
         }
 
         public XContentBuilder buildDownsampleDocument() throws IOException {