diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java
index 800b760879769..d1cf0a1ce0477 100644
--- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java
@@ -26,6 +26,7 @@
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
 import org.elasticsearch.script.IngestConditionalScript;
@@ -147,10 +148,14 @@ private void maybeSample(
         SampleStats stats = sampleInfo.stats;
         stats.potentialSamples.increment();
         try {
-            if (sampleInfo.hasCapacity() == false) {
+            if (sampleInfo.isFull) {
                stats.samplesRejectedForMaxSamplesExceeded.increment();
                return;
            }
+            if (sampleInfo.getSizeInBytes() + indexRequest.source().length() > samplingConfig.maxSize().getBytes()) {
+                stats.samplesRejectedForSize.increment();
+                return;
+            }
            if (Math.random() >= samplingConfig.rate()) {
                stats.samplesRejectedForRate.increment();
                return;
            }
@@ -306,6 +311,10 @@ public void writeTo(StreamOutput out) throws IOException {
             XContentHelper.writeTo(out, contentType);
         }

+        public long getSizeInBytes() {
+            return indexName.length() + source.length;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
@@ -346,6 +355,7 @@ public static final class SampleStats implements Writeable, ToXContent {
         final LongAdder samplesRejectedForCondition = new LongAdder();
         final LongAdder samplesRejectedForRate = new LongAdder();
         final LongAdder samplesRejectedForException = new LongAdder();
+        final LongAdder samplesRejectedForSize = new LongAdder();
         final LongAdder timeSamplingInNanos = new LongAdder();
         final LongAdder timeEvaluatingConditionInNanos = new LongAdder();
         final LongAdder timeCompilingConditionInNanos = new LongAdder();
@@ -367,6 +377,7 @@ public SampleStats(
             long samplesRejectedForCondition,
             long samplesRejectedForRate,
             long samplesRejectedForException,
+            long samplesRejectedForSize,
             TimeValue timeSampling,
             TimeValue timeEvaluatingCondition,
             TimeValue timeCompilingCondition,
@@ -378,6 +389,7 @@ public SampleStats(
             this.samplesRejectedForCondition.add(samplesRejectedForCondition);
             this.samplesRejectedForRate.add(samplesRejectedForRate);
             this.samplesRejectedForException.add(samplesRejectedForException);
+            this.samplesRejectedForSize.add(samplesRejectedForSize);
             this.timeSamplingInNanos.add(timeSampling.nanos());
             this.timeEvaluatingConditionInNanos.add(timeEvaluatingCondition.nanos());
             this.timeCompilingConditionInNanos.add(timeCompilingCondition.nanos());
@@ -390,6 +402,7 @@ public SampleStats(StreamInput in) throws IOException {
             samplesRejectedForCondition.add(in.readLong());
             samplesRejectedForRate.add(in.readLong());
             samplesRejectedForException.add(in.readLong());
+            samplesRejectedForSize.add(in.readLong());
             samples.add(in.readLong());
             timeSamplingInNanos.add(in.readLong());
             timeEvaluatingConditionInNanos.add(in.readLong());
@@ -425,6 +438,10 @@ public long getSamplesRejectedForException() {
             return samplesRejectedForException.longValue();
         }

+        public long getSamplesRejectedForSize() {
+            return samplesRejectedForSize.longValue();
+        }
+
         public TimeValue getTimeSampling() {
             return TimeValue.timeValueNanos(timeSamplingInNanos.longValue());
         }
@@ -475,6 +492,7 @@ private static void addAllFields(SampleStats source, SampleStats dest) {
             dest.samplesRejectedForCondition.add(source.samplesRejectedForCondition.longValue());
             dest.samplesRejectedForRate.add(source.samplesRejectedForRate.longValue());
             dest.samplesRejectedForException.add(source.samplesRejectedForException.longValue());
+            dest.samplesRejectedForSize.add(source.samplesRejectedForSize.longValue());
             dest.samples.add(source.samples.longValue());
             dest.timeSamplingInNanos.add(source.timeSamplingInNanos.longValue());
             dest.timeEvaluatingConditionInNanos.add(source.timeEvaluatingConditionInNanos.longValue());
@@ -492,6 +510,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
             builder.field("samples_rejected_for_condition", samplesRejectedForCondition.longValue());
             builder.field("samples_rejected_for_rate", samplesRejectedForRate.longValue());
             builder.field("samples_rejected_for_exception", samplesRejectedForException.longValue());
+            builder.field("samples_rejected_for_size", samplesRejectedForSize.longValue());
             builder.field("samples_accepted", samples.longValue());
             builder.humanReadableField("time_sampling_millis", "time_sampling", TimeValue.timeValueNanos(timeSamplingInNanos.longValue()));
             builder.humanReadableField(
@@ -515,6 +534,7 @@ public void writeTo(StreamOutput out) throws IOException {
             out.writeLong(samplesRejectedForCondition.longValue());
             out.writeLong(samplesRejectedForRate.longValue());
             out.writeLong(samplesRejectedForException.longValue());
+            out.writeLong(samplesRejectedForSize.longValue());
             out.writeLong(samples.longValue());
             out.writeLong(timeSamplingInNanos.longValue());
             out.writeLong(timeEvaluatingConditionInNanos.longValue());
@@ -558,6 +578,9 @@ public boolean equals(Object o) {
             if (samplesRejectedForException.longValue() != that.samplesRejectedForException.longValue()) {
                 return false;
             }
+            if (samplesRejectedForSize.longValue() != that.samplesRejectedForSize.longValue()) {
+                return false;
+            }
             if (timeSamplingInNanos.longValue() != that.timeSamplingInNanos.longValue()) {
                 return false;
             }
@@ -598,6 +621,7 @@ public int hashCode() {
                 samplesRejectedForCondition.longValue(),
                 samplesRejectedForRate.longValue(),
                 samplesRejectedForException.longValue(),
+                samplesRejectedForSize.longValue(),
                 timeSamplingInNanos.longValue(),
                 timeEvaluatingConditionInNanos.longValue(),
                 timeCompilingConditionInNanos.longValue()
@@ -637,6 +661,16 @@ public SampleStats adjustForMaxSize(int maxSize) {
      */
     private static final class SampleInfo {
         private final RawDocument[] rawDocuments;
+        /*
+         * This stores the maximum index in rawDocuments that has data currently. This is incremented speculatively before writing data to
+         * the array, so it is possible that this index is rawDocuments.length or greater.
+         */
+        private final AtomicInteger rawDocumentsIndex = new AtomicInteger(-1);
+        /*
+         * This caches the size of all raw documents in the rawDocuments array up to and including the data at the index on the left side
+         * of the tuple. The size in bytes is the right side of the tuple.
+         */
+        private volatile Tuple<Integer, Long> sizeInBytesAtIndex = Tuple.tuple(-1, 0L);
         private final SampleStats stats;
         private final long expiration;
         private final TimeValue timeToLive;
@@ -644,7 +678,6 @@ private static final class SampleInfo {
         private volatile IngestConditionalScript.Factory factory;
         private volatile boolean compilationFailed = false;
         private volatile boolean isFull = false;
-        private final AtomicInteger arrayIndex = new AtomicInteger(0);

         SampleInfo(int maxSamples, TimeValue timeToLive, long relativeNowMillis) {
             this.timeToLive = timeToLive;
@@ -653,10 +686,6 @@ private static final class SampleInfo {
             this.expiration = (timeToLive == null ? TimeValue.timeValueDays(5).millis() : timeToLive.millis()) + relativeNowMillis;
         }

-        public boolean hasCapacity() {
-            return isFull == false;
-        }
-
         /*
          * This returns the array of raw documents. It's size will be the maximum number of raw documents allowed in this sample. Some (or
          * all) elements could be null.
@@ -665,11 +694,55 @@ public RawDocument[] getRawDocuments() {
             return rawDocuments;
         }

+        /*
+         * This gets an approximate size in bytes for this sample. It only takes the size of the raw documents into account, since that is
+         * the only part of the sample that is not a fixed size. This method favors speed over 100% correctness -- it is possible during
+         * heavy concurrent ingestion that it under-reports the current size.
+         */
+        public long getSizeInBytes() {
+            /*
+             * This method could get called very frequently during ingestion. Looping through every RawDocument every time would get
+             * expensive. Since the data in the rawDocuments array is immutable once it has been written, we store the index and value of
+             * the computed size if all raw documents up to that index are non-null (i.e. no documents were still in flight as we were
+             * counting). That way we don't have to re-compute the size for documents we've already looked at.
+             */
+            Tuple<Integer, Long> knownIndexAndSize = sizeInBytesAtIndex;
+            int knownSizeIndex = knownIndexAndSize.v1();
+            long knownSize = knownIndexAndSize.v2();
+            // It is possible that rawDocumentsIndex is beyond the end of rawDocuments
+            int currentRawDocumentsIndex = Math.min(rawDocumentsIndex.get(), rawDocuments.length - 1);
+            if (currentRawDocumentsIndex == knownSizeIndex) {
+                return knownSize;
+            }
+            long size = knownSize;
+            boolean anyNulls = false;
+            for (int i = knownSizeIndex + 1; i <= currentRawDocumentsIndex; i++) {
+                RawDocument rawDocument = rawDocuments[i];
+                if (rawDocument == null) {
+                    /*
+                     * Some documents were in flight and haven't been stored in the array yet, so we'll move past this. The size will be a
+                     * little low on this method call. So we're going to set this flag so that we don't store this value for future use.
+                     */
+                    anyNulls = true;
+                } else {
+                    size += rawDocuments[i].getSizeInBytes();
+                }
+            }
+            /*
+             * The most important thing is for this method to be fast. It is OK if we store the same value twice, or even if we store a
+             * slightly out-of-date copy, as long as we don't do any locking. The correct size will be calculated next time.
+             */
+            if (anyNulls == false) {
+                sizeInBytesAtIndex = Tuple.tuple(currentRawDocumentsIndex, size);
+            }
+            return size;
+        }
+
         /*
          * Adds the rawDocument to the sample if there is capacity. Returns true if it adds it, or false if it does not.
         */
        public boolean offer(RawDocument rawDocument) {
-            int index = arrayIndex.getAndIncrement();
+            int index = rawDocumentsIndex.incrementAndGet();
            if (index < rawDocuments.length) {
                rawDocuments[index] = rawDocument;
                if (index == rawDocuments.length - 1) {
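The SampleInfo changes above rely on two pieces working together: offer() claims a slot by speculatively incrementing rawDocumentsIndex, and getSizeInBytes() caches the byte count of the already-written prefix in a volatile tuple so repeated calls stay cheap. The following is a standalone sketch of that pattern, not part of the patch: the class and member names are illustrative, and a small record stands in for org.elasticsearch.core.Tuple so the example compiles on its own.

import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the lock-free size-tracking approach used by SampleInfo (illustrative names).
class BoundedSizeBuffer {
    private record IndexAndSize(int index, long size) {}

    private final byte[][] entries;                                   // each slot written once, then immutable
    private final AtomicInteger writeIndex = new AtomicInteger(-1);   // incremented speculatively, may overshoot
    private volatile IndexAndSize cached = new IndexAndSize(-1, 0L);  // size of the fully-written prefix

    BoundedSizeBuffer(int capacity) {
        this.entries = new byte[capacity][];
    }

    // Claims the next slot; returns false when the buffer is full.
    boolean offer(byte[] entry) {
        int index = writeIndex.incrementAndGet();
        if (index < entries.length) {
            entries[index] = entry;
            return true;
        }
        return false;
    }

    // Approximate total size; may under-report while writers are still in flight.
    long sizeInBytes() {
        IndexAndSize known = cached;
        int last = Math.min(writeIndex.get(), entries.length - 1);
        if (last == known.index()) {
            return known.size();
        }
        long size = known.size();
        boolean sawNull = false;
        for (int i = known.index() + 1; i <= last; i++) {
            byte[] entry = entries[i];
            if (entry == null) {
                sawNull = true;                          // slot claimed but not yet written
            } else {
                size += entry.length;
            }
        }
        if (sawNull == false) {
            cached = new IndexAndSize(last, size);       // only remember a fully-counted prefix
        }
        return size;
    }
}

As in the patch, the cache is refreshed only when no claimed slot was still null, so a stale or duplicate write of the cache is harmless and no locking is needed.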
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeResponseTests.java
index 620a15bd6e4d0..4f4198700baff 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeResponseTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeResponseTests.java
@@ -52,6 +52,7 @@ private static SamplingService.SampleStats randomStats() {
             randomNonNegativeLong(),
             randomNonNegativeLong(),
             randomNonNegativeLong(),
+            randomNonNegativeLong(),
             randomPositiveTimeValue(),
             randomPositiveTimeValue(),
             randomPositiveTimeValue(),
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionResponseTests.java
index ddaf19195d563..0858495d14bac 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionResponseTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionResponseTests.java
@@ -67,6 +67,7 @@ private static SamplingService.SampleStats randomStats() {
             randomNonNegativeLong(),
             randomNonNegativeLong(),
             randomNonNegativeLong(),
+            randomNonNegativeLong(),
             randomPositiveTimeValue(),
             randomPositiveTimeValue(),
             randomPositiveTimeValue(),
diff --git a/server/src/test/java/org/elasticsearch/ingest/SamplingServiceSampleStatsTests.java b/server/src/test/java/org/elasticsearch/ingest/SamplingServiceSampleStatsTests.java
index 6fb3d345a3caa..cf8cfecf3aab9 100644
--- a/server/src/test/java/org/elasticsearch/ingest/SamplingServiceSampleStatsTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/SamplingServiceSampleStatsTests.java
@@ -36,6 +36,7 @@ protected SampleStats createTestInstance() {
         stats.samplesRejectedForCondition.add(randomReasonableLong());
         stats.samplesRejectedForRate.add(randomReasonableLong());
         stats.samplesRejectedForException.add(randomReasonableLong());
+        stats.samplesRejectedForSize.add(randomReasonableLong());
         stats.timeSamplingInNanos.add(randomReasonableLong());
         stats.timeEvaluatingConditionInNanos.add(randomReasonableLong());
         stats.timeCompilingConditionInNanos.add(randomReasonableLong());
@@ -58,17 +59,18 @@ private long randomReasonableLong() {
     @Override
     protected SampleStats mutateInstance(SampleStats instance) throws IOException {
         SampleStats mutated = instance.combine(new SampleStats());
-        switch (between(0, 9)) {
+        switch (between(0, 10)) {
             case 0 -> mutated.samples.add(1);
             case 1 -> mutated.potentialSamples.add(1);
             case 2 -> mutated.samplesRejectedForMaxSamplesExceeded.add(1);
             case 3 -> mutated.samplesRejectedForCondition.add(1);
             case 4 -> mutated.samplesRejectedForRate.add(1);
             case 5 -> mutated.samplesRejectedForException.add(1);
-            case 6 -> mutated.timeSamplingInNanos.add(1);
-            case 7 -> mutated.timeEvaluatingConditionInNanos.add(1);
-            case 8 -> mutated.timeCompilingConditionInNanos.add(1);
-            case 9 -> mutated.lastException = mutated.lastException == null
+            case 6 -> mutated.samplesRejectedForSize.add(1);
+            case 7 -> mutated.timeSamplingInNanos.add(1);
+            case 8 -> mutated.timeEvaluatingConditionInNanos.add(1);
+            case 9 -> mutated.timeCompilingConditionInNanos.add(1);
+            case 10 -> mutated.lastException = mutated.lastException == null
                 ? new ElasticsearchException(randomAlphanumericOfLength(10))
                 : null;
             default -> throw new IllegalArgumentException("Should never get here");
@@ -104,6 +106,10 @@ public void testCombine() {
             stats1CombineStats2.getSamplesRejectedForException(),
             equalTo(stats1.getSamplesRejectedForException() + stats2.getSamplesRejectedForException())
         );
+        assertThat(
+            stats1CombineStats2.getSamplesRejectedForSize(),
+            equalTo(stats1.getSamplesRejectedForSize() + stats2.getSamplesRejectedForSize())
+        );
         assertThat(
             stats1CombineStats2.getTimeSampling(),
             equalTo(TimeValue.timeValueNanos(stats1.getTimeSampling().nanos() + stats2.getTimeSampling().nanos()))
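The SamplingServiceTests change below drives the size cap end to end: a request is rejected once the sample's current approximate size plus the incoming source would exceed the configured maxSize, and the rejection is counted in samplesRejectedForSize. Here is a minimal sketch of that admission rule with illustrative names, ignoring the small index-name contribution to the per-document size; the arithmetic in main() mirrors the numbers used by the test that follows (400-byte cap, 150-byte sources, so only two documents fit).

// Illustrative sketch of the size-cap admission check added to maybeSample; names are not from the patch.
final class SizeCap {
    private final long maxSizeInBytes;
    private long currentSizeInBytes;

    SizeCap(long maxSizeInBytes) {
        this.maxSizeInBytes = maxSizeInBytes;
    }

    // Mirrors: sampleInfo.getSizeInBytes() + indexRequest.source().length() > samplingConfig.maxSize().getBytes()
    boolean admit(int incomingSourceBytes) {
        if (currentSizeInBytes + incomingSourceBytes > maxSizeInBytes) {
            return false;                   // would be counted as samplesRejectedForSize
        }
        currentSizeInBytes += incomingSourceBytes;
        return true;
    }

    public static void main(String[] args) {
        SizeCap cap = new SizeCap(400);
        // 0 + 150 fits, 150 + 150 fits, 300 + 150 exceeds 400 -> rejected,
        // matching the two accepted documents asserted in testMaybeSampleMaxSize below.
        System.out.println(cap.admit(150)); // true
        System.out.println(cap.admit(150)); // true
        System.out.println(cap.admit(150)); // false
    }
}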
diff --git a/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java
index d430fac600e41..d15874dcf7942 100644
--- a/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java
@@ -29,6 +29,7 @@
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xcontent.XContentType;

 import java.util.HashMap;
 import java.util.List;
@@ -96,6 +97,7 @@ public void testMaybeSample() {
         assertThat(stats.getSamplesRejectedForRate(), equalTo(0L));
         assertThat(stats.getSamplesRejectedForCondition(), equalTo(0L));
         assertThat(stats.getSamplesRejectedForException(), equalTo(0L));
+        assertThat(stats.getSamplesRejectedForSize(), equalTo(0L));
         assertThat(stats.getSamplesRejectedForMaxSamplesExceeded(), equalTo(0L));
         assertThat(stats.getLastException(), nullValue());
         assertThat(stats.getTimeSampling(), greaterThan(TimeValue.ZERO));
@@ -224,6 +226,36 @@ public void testMaybeSampleMaxSamples() {
         assertThat(stats.getTimeEvaluatingCondition(), equalTo(TimeValue.ZERO));
     }

+    public void testMaybeSampleMaxSize() {
+        /*
+         * This tests that the max size limit on the SamplingConfiguration is enforced. Here we set maxSize to 400. The source field of
+         * each index request is an array of 150 bytes. Since the size of the raw document is approximately the size of the source byte
+         * array, we expect to be able to insert 2 raw documents before all others are rejected due to the max size limit.
+         */
+        assumeTrue("Requires sampling feature flag", RANDOM_SAMPLING_FEATURE_FLAG);
+        SamplingService samplingService = getTestSamplingService();
+        String indexName = randomIdentifier();
+        int maxSamples = randomIntBetween(2, 50);
+        ProjectMetadata.Builder projectBuilder = ProjectMetadata.builder(ProjectId.DEFAULT)
+            .putCustom(
+                SamplingMetadata.TYPE,
+                new SamplingMetadata(
+                    Map.of(
+                        indexName,
+                        new SamplingConfiguration(1.0, maxSamples, ByteSizeValue.ofBytes(400), TimeValue.timeValueDays(3), null)
+                    )
+                )
+            );
+        final ProjectId projectId = projectBuilder.getId();
+        ProjectMetadata projectMetadata = projectBuilder.build();
+        final IndexRequest indexRequest = new IndexRequest(indexName).id("_id").source(randomByteArrayOfLength(150), XContentType.JSON);
+        for (int i = 0; i < maxSamples; i++) {
+            samplingService.maybeSample(projectMetadata, indexRequest);
+        }
+        assertThat(samplingService.getLocalSample(projectId, indexName).size(), equalTo(2));
+        assertThat(samplingService.getLocalSampleStats(projectId, indexName).getSamplesRejectedForSize(), equalTo((long) maxSamples - 2));
+    }
+
     private SamplingService getTestSamplingService() {
         final ScriptService scriptService = new ScriptService(
             Settings.EMPTY,