26
26
import org .elasticsearch .common .xcontent .LoggingDeprecationHandler ;
27
27
import org .elasticsearch .common .xcontent .XContentHelper ;
28
28
import org .elasticsearch .core .TimeValue ;
29
+ import org .elasticsearch .core .Tuple ;
29
30
import org .elasticsearch .logging .LogManager ;
30
31
import org .elasticsearch .logging .Logger ;
31
32
import org .elasticsearch .script .IngestConditionalScript ;
@@ -147,10 +148,14 @@ private void maybeSample(
147
148
SampleStats stats = sampleInfo .stats ;
148
149
stats .potentialSamples .increment ();
149
150
try {
150
- if (sampleInfo .hasCapacity () == false ) {
151
+ if (sampleInfo .isFull ) {
151
152
stats .samplesRejectedForMaxSamplesExceeded .increment ();
152
153
return ;
153
154
}
155
+ if (sampleInfo .getSizeInBytes () + indexRequest .source ().length () > samplingConfig .maxSize ().getBytes ()) {
156
+ stats .samplesRejectedForSize .increment ();
157
+ return ;
158
+ }
154
159
if (Math .random () >= samplingConfig .rate ()) {
155
160
stats .samplesRejectedForRate .increment ();
156
161
return ;
@@ -306,6 +311,10 @@ public void writeTo(StreamOutput out) throws IOException {
306
311
XContentHelper .writeTo (out , contentType );
307
312
}
308
313
314
+ public long getSizeInBytes () {
315
+ return indexName .length () + source .length ;
316
+ }
317
+
309
318
@ Override
310
319
public boolean equals (Object o ) {
311
320
if (this == o ) return true ;
@@ -346,6 +355,7 @@ public static final class SampleStats implements Writeable, ToXContent {
346
355
final LongAdder samplesRejectedForCondition = new LongAdder ();
347
356
final LongAdder samplesRejectedForRate = new LongAdder ();
348
357
final LongAdder samplesRejectedForException = new LongAdder ();
358
+ final LongAdder samplesRejectedForSize = new LongAdder ();
349
359
final LongAdder timeSamplingInNanos = new LongAdder ();
350
360
final LongAdder timeEvaluatingConditionInNanos = new LongAdder ();
351
361
final LongAdder timeCompilingConditionInNanos = new LongAdder ();
@@ -367,6 +377,7 @@ public SampleStats(
367
377
long samplesRejectedForCondition ,
368
378
long samplesRejectedForRate ,
369
379
long samplesRejectedForException ,
380
+ long samplesRejectedForSize ,
370
381
TimeValue timeSampling ,
371
382
TimeValue timeEvaluatingCondition ,
372
383
TimeValue timeCompilingCondition ,
@@ -378,6 +389,7 @@ public SampleStats(
378
389
this .samplesRejectedForCondition .add (samplesRejectedForCondition );
379
390
this .samplesRejectedForRate .add (samplesRejectedForRate );
380
391
this .samplesRejectedForException .add (samplesRejectedForException );
392
+ this .samplesRejectedForSize .add (samplesRejectedForSize );
381
393
this .timeSamplingInNanos .add (timeSampling .nanos ());
382
394
this .timeEvaluatingConditionInNanos .add (timeEvaluatingCondition .nanos ());
383
395
this .timeCompilingConditionInNanos .add (timeCompilingCondition .nanos ());
@@ -390,6 +402,7 @@ public SampleStats(StreamInput in) throws IOException {
390
402
samplesRejectedForCondition .add (in .readLong ());
391
403
samplesRejectedForRate .add (in .readLong ());
392
404
samplesRejectedForException .add (in .readLong ());
405
+ samplesRejectedForSize .add (in .readLong ());
393
406
samples .add (in .readLong ());
394
407
timeSamplingInNanos .add (in .readLong ());
395
408
timeEvaluatingConditionInNanos .add (in .readLong ());
@@ -425,6 +438,10 @@ public long getSamplesRejectedForException() {
425
438
return samplesRejectedForException .longValue ();
426
439
}
427
440
441
+ public long getSamplesRejectedForSize () {
442
+ return samplesRejectedForSize .longValue ();
443
+ }
444
+
428
445
public TimeValue getTimeSampling () {
429
446
return TimeValue .timeValueNanos (timeSamplingInNanos .longValue ());
430
447
}
@@ -475,6 +492,7 @@ private static void addAllFields(SampleStats source, SampleStats dest) {
475
492
dest .samplesRejectedForCondition .add (source .samplesRejectedForCondition .longValue ());
476
493
dest .samplesRejectedForRate .add (source .samplesRejectedForRate .longValue ());
477
494
dest .samplesRejectedForException .add (source .samplesRejectedForException .longValue ());
495
+ dest .samplesRejectedForSize .add (source .samplesRejectedForSize .longValue ());
478
496
dest .samples .add (source .samples .longValue ());
479
497
dest .timeSamplingInNanos .add (source .timeSamplingInNanos .longValue ());
480
498
dest .timeEvaluatingConditionInNanos .add (source .timeEvaluatingConditionInNanos .longValue ());
@@ -492,6 +510,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
492
510
builder .field ("samples_rejected_for_condition" , samplesRejectedForCondition .longValue ());
493
511
builder .field ("samples_rejected_for_rate" , samplesRejectedForRate .longValue ());
494
512
builder .field ("samples_rejected_for_exception" , samplesRejectedForException .longValue ());
513
+ builder .field ("samples_rejected_for_size" , samplesRejectedForSize .longValue ());
495
514
builder .field ("samples_accepted" , samples .longValue ());
496
515
builder .humanReadableField ("time_sampling_millis" , "time_sampling" , TimeValue .timeValueNanos (timeSamplingInNanos .longValue ()));
497
516
builder .humanReadableField (
@@ -515,6 +534,7 @@ public void writeTo(StreamOutput out) throws IOException {
515
534
out .writeLong (samplesRejectedForCondition .longValue ());
516
535
out .writeLong (samplesRejectedForRate .longValue ());
517
536
out .writeLong (samplesRejectedForException .longValue ());
537
+ out .writeLong (samplesRejectedForSize .longValue ());
518
538
out .writeLong (samples .longValue ());
519
539
out .writeLong (timeSamplingInNanos .longValue ());
520
540
out .writeLong (timeEvaluatingConditionInNanos .longValue ());
@@ -558,6 +578,9 @@ public boolean equals(Object o) {
558
578
if (samplesRejectedForException .longValue () != that .samplesRejectedForException .longValue ()) {
559
579
return false ;
560
580
}
581
+ if (samplesRejectedForSize .longValue () != that .samplesRejectedForSize .longValue ()) {
582
+ return false ;
583
+ }
561
584
if (timeSamplingInNanos .longValue () != that .timeSamplingInNanos .longValue ()) {
562
585
return false ;
563
586
}
@@ -598,6 +621,7 @@ public int hashCode() {
598
621
samplesRejectedForCondition .longValue (),
599
622
samplesRejectedForRate .longValue (),
600
623
samplesRejectedForException .longValue (),
624
+ samplesRejectedForSize .longValue (),
601
625
timeSamplingInNanos .longValue (),
602
626
timeEvaluatingConditionInNanos .longValue (),
603
627
timeCompilingConditionInNanos .longValue ()
@@ -637,14 +661,23 @@ public SampleStats adjustForMaxSize(int maxSize) {
637
661
*/
638
662
private static final class SampleInfo {
639
663
private final RawDocument [] rawDocuments ;
664
+ /*
665
+ * This stores the maximum index in rawDocuments that has data currently. This is incremented speculatively before writing data to
666
+ * the array, so it is possible that this index is rawDocuments.length or greater.
667
+ */
668
+ private final AtomicInteger rawDocumentsIndex = new AtomicInteger (-1 );
669
+ /*
670
+ * This caches the size of all raw documents in the rawDocuments array up to and including the data at the index on the left side
671
+ * of the tuple. The size in bytes is the right side of the tuple.
672
+ */
673
+ private volatile Tuple <Integer , Long > sizeInBytesAtIndex = Tuple .tuple (-1 , 0L );
640
674
private final SampleStats stats ;
641
675
private final long expiration ;
642
676
private final TimeValue timeToLive ;
643
677
private volatile Script script ;
644
678
private volatile IngestConditionalScript .Factory factory ;
645
679
private volatile boolean compilationFailed = false ;
646
680
private volatile boolean isFull = false ;
647
- private final AtomicInteger arrayIndex = new AtomicInteger (0 );
648
681
649
682
SampleInfo (int maxSamples , TimeValue timeToLive , long relativeNowMillis ) {
650
683
this .timeToLive = timeToLive ;
@@ -653,10 +686,6 @@ private static final class SampleInfo {
653
686
this .expiration = (timeToLive == null ? TimeValue .timeValueDays (5 ).millis () : timeToLive .millis ()) + relativeNowMillis ;
654
687
}
655
688
656
- public boolean hasCapacity () {
657
- return isFull == false ;
658
- }
659
-
660
689
/*
661
690
* This returns the array of raw documents. Its size will be the maximum number of raw documents allowed in this sample. Some (or
662
691
* all) elements could be null.
@@ -665,11 +694,55 @@ public RawDocument[] getRawDocuments() {
665
694
return rawDocuments ;
666
695
}
667
696
697
+ /*
698
+ * This gets an approximate size in bytes for this sample. It only takes the size of the raw documents into account, since that is
699
+ * the only part of the sample that is not a fixed size. This method favors speed over 100% correctness -- it is possible during
700
+ * heavy concurrent ingestion that it under-reports the current size.
701
+ */
702
+ public long getSizeInBytes () {
703
+ /*
704
+ * This method could get called very frequently during ingestion. Looping through every RawDocument every time would get
705
+ * expensive. Since the data in the rawDocuments array is immutable once it has been written, we store the index and value of
706
+ * the computed size if all raw documents up to that index are non-null (i.e. no documents were still in flight as we were
707
+ * counting). That way we don't have to re-compute the size for documents we've already looked at.
708
+ */
709
+ Tuple <Integer , Long > knownIndexAndSize = sizeInBytesAtIndex ;
710
+ int knownSizeIndex = knownIndexAndSize .v1 ();
711
+ long knownSize = knownIndexAndSize .v2 ();
712
+ // It is possible that rawDocumentsIndex is beyond the end of rawDocuments
713
+ int currentRawDocumentsIndex = Math .min (rawDocumentsIndex .get (), rawDocuments .length - 1 );
714
+ if (currentRawDocumentsIndex == knownSizeIndex ) {
715
+ return knownSize ;
716
+ }
717
+ long size = knownSize ;
718
+ boolean anyNulls = false ;
719
+ for (int i = knownSizeIndex + 1 ; i <= currentRawDocumentsIndex ; i ++) {
720
+ RawDocument rawDocument = rawDocuments [i ];
721
+ if (rawDocument == null ) {
722
+ /*
723
+ * Some documents were in flight and haven't been stored in the array yet, so we'll move past this. The size will be a
724
+ * little low on this method call. So we're going to set this flag so that we don't store this value for future use.
725
+ */
726
+ anyNulls = true ;
727
+ } else {
728
+ size += rawDocuments [i ].getSizeInBytes ();
729
+ }
730
+ }
731
+ /*
732
+ * The most important thing is for this method to be fast. It is OK if we store the same value twice, or even if we store a
733
+ * slightly out-of-date copy, as long as we don't do any locking. The correct size will be calculated next time.
734
+ */
735
+ if (anyNulls == false ) {
736
+ sizeInBytesAtIndex = Tuple .tuple (currentRawDocumentsIndex , size );
737
+ }
738
+ return size ;
739
+ }
740
+
668
741
/*
669
742
* Adds the rawDocument to the sample if there is capacity. Returns true if it adds it, or false if it does not.
670
743
*/
671
744
public boolean offer (RawDocument rawDocument ) {
672
- int index = arrayIndex . getAndIncrement ();
745
+ int index = rawDocumentsIndex . incrementAndGet ();
673
746
if (index < rawDocuments .length ) {
674
747
rawDocuments [index ] = rawDocument ;
675
748
if (index == rawDocuments .length - 1 ) {
0 commit comments