Skip to content
73 changes: 68 additions & 5 deletions server/src/main/java/org/elasticsearch/ingest/SamplingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.script.IngestConditionalScript;
Expand Down Expand Up @@ -147,10 +148,14 @@ private void maybeSample(
SampleStats stats = sampleInfo.stats;
stats.potentialSamples.increment();
try {
if (sampleInfo.hasCapacity() == false) {
if (sampleInfo.isFull) {
stats.samplesRejectedForMaxSamplesExceeded.increment();
return;
}
if (sampleInfo.getSizeInBytes() + indexRequest.source().length() > samplingConfig.maxSize().getBytes()) {
stats.samplesRejectedForSize.increment();
return;
}
if (Math.random() >= samplingConfig.rate()) {
stats.samplesRejectedForRate.increment();
return;
Expand Down Expand Up @@ -306,6 +311,10 @@ public void writeTo(StreamOutput out) throws IOException {
XContentHelper.writeTo(out, contentType);
}

public long getSizeInBytes() {
return indexName.length() + source.length;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -346,6 +355,7 @@ public static final class SampleStats implements Writeable, ToXContent {
final LongAdder samplesRejectedForCondition = new LongAdder();
final LongAdder samplesRejectedForRate = new LongAdder();
final LongAdder samplesRejectedForException = new LongAdder();
final LongAdder samplesRejectedForSize = new LongAdder();
final LongAdder timeSamplingInNanos = new LongAdder();
final LongAdder timeEvaluatingConditionInNanos = new LongAdder();
final LongAdder timeCompilingConditionInNanos = new LongAdder();
Expand All @@ -367,6 +377,7 @@ public SampleStats(
long samplesRejectedForCondition,
long samplesRejectedForRate,
long samplesRejectedForException,
long samplesRejectedForSize,
TimeValue timeSampling,
TimeValue timeEvaluatingCondition,
TimeValue timeCompilingCondition,
Expand All @@ -378,6 +389,7 @@ public SampleStats(
this.samplesRejectedForCondition.add(samplesRejectedForCondition);
this.samplesRejectedForRate.add(samplesRejectedForRate);
this.samplesRejectedForException.add(samplesRejectedForException);
this.samplesRejectedForSize.add(samplesRejectedForSize);
this.timeSamplingInNanos.add(timeSampling.nanos());
this.timeEvaluatingConditionInNanos.add(timeEvaluatingCondition.nanos());
this.timeCompilingConditionInNanos.add(timeCompilingCondition.nanos());
Expand All @@ -390,6 +402,7 @@ public SampleStats(StreamInput in) throws IOException {
samplesRejectedForCondition.add(in.readLong());
samplesRejectedForRate.add(in.readLong());
samplesRejectedForException.add(in.readLong());
samplesRejectedForSize.add(in.readLong());
samples.add(in.readLong());
timeSamplingInNanos.add(in.readLong());
timeEvaluatingConditionInNanos.add(in.readLong());
Expand Down Expand Up @@ -425,6 +438,10 @@ public long getSamplesRejectedForException() {
return samplesRejectedForException.longValue();
}

public long getSamplesRejectedForSize() {
return samplesRejectedForSize.longValue();
}

public TimeValue getTimeSampling() {
return TimeValue.timeValueNanos(timeSamplingInNanos.longValue());
}
Expand Down Expand Up @@ -475,6 +492,7 @@ private static void addAllFields(SampleStats source, SampleStats dest) {
dest.samplesRejectedForCondition.add(source.samplesRejectedForCondition.longValue());
dest.samplesRejectedForRate.add(source.samplesRejectedForRate.longValue());
dest.samplesRejectedForException.add(source.samplesRejectedForException.longValue());
dest.samplesRejectedForSize.add(source.samplesRejectedForSize.longValue());
dest.samples.add(source.samples.longValue());
dest.timeSamplingInNanos.add(source.timeSamplingInNanos.longValue());
dest.timeEvaluatingConditionInNanos.add(source.timeEvaluatingConditionInNanos.longValue());
Expand All @@ -492,6 +510,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
builder.field("samples_rejected_for_condition", samplesRejectedForCondition.longValue());
builder.field("samples_rejected_for_rate", samplesRejectedForRate.longValue());
builder.field("samples_rejected_for_exception", samplesRejectedForException.longValue());
builder.field("samples_rejected_for_size", samplesRejectedForSize.longValue());
builder.field("samples_accepted", samples.longValue());
builder.humanReadableField("time_sampling_millis", "time_sampling", TimeValue.timeValueNanos(timeSamplingInNanos.longValue()));
builder.humanReadableField(
Expand All @@ -515,6 +534,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(samplesRejectedForCondition.longValue());
out.writeLong(samplesRejectedForRate.longValue());
out.writeLong(samplesRejectedForException.longValue());
out.writeLong(samplesRejectedForSize.longValue());
out.writeLong(samples.longValue());
out.writeLong(timeSamplingInNanos.longValue());
out.writeLong(timeEvaluatingConditionInNanos.longValue());
Expand Down Expand Up @@ -558,6 +578,9 @@ public boolean equals(Object o) {
if (samplesRejectedForException.longValue() != that.samplesRejectedForException.longValue()) {
return false;
}
if (samplesRejectedForSize.longValue() != that.samplesRejectedForSize.longValue()) {
return false;
}
if (timeSamplingInNanos.longValue() != that.timeSamplingInNanos.longValue()) {
return false;
}
Expand Down Expand Up @@ -598,6 +621,7 @@ public int hashCode() {
samplesRejectedForCondition.longValue(),
samplesRejectedForRate.longValue(),
samplesRejectedForException.longValue(),
samplesRejectedForSize.longValue(),
timeSamplingInNanos.longValue(),
timeEvaluatingConditionInNanos.longValue(),
timeCompilingConditionInNanos.longValue()
Expand Down Expand Up @@ -644,6 +668,7 @@ private static final class SampleInfo {
private volatile IngestConditionalScript.Factory factory;
private volatile boolean compilationFailed = false;
private volatile boolean isFull = false;
private volatile Tuple<Integer, Long> sizeInBytesAtIndex = Tuple.tuple(-1, 0L);
private final AtomicInteger arrayIndex = new AtomicInteger(0);

SampleInfo(int maxSamples, TimeValue timeToLive, long relativeNowMillis) {
Expand All @@ -653,10 +678,6 @@ private static final class SampleInfo {
this.expiration = (timeToLive == null ? TimeValue.timeValueDays(5).millis() : timeToLive.millis()) + relativeNowMillis;
}

public boolean hasCapacity() {
return isFull == false;
}

/*
* This returns the array of raw documents. It's size will be the maximum number of raw documents allowed in this sample. Some (or
* all) elements could be null.
Expand All @@ -665,6 +686,48 @@ public RawDocument[] getRawDocuments() {
return rawDocuments;
}

/*
* This gets an approximate size in bytes for ths sample. In only takes the size of the raw documents into account, since that is
* the only part of the sample that is not a fixed size.
*/
public long getSizeInBytes() {
/*
* This method could get called very frequently during ingestion. Looping through every RawDocument every time would get
* expensive. Since the data in the rawDocuments array is immutable once it has been written, we store the index and value of
* the computed size if all raw documents up to that index are non-null (i.e. no documents were still in flight as we were
* counting). That way we don't have to re-compute the size for documents we've already looked at.
*/
Tuple<Integer, Long> knownIndexAndSize = sizeInBytesAtIndex;
int knownIndex = knownIndexAndSize.v1();
long knownSize = knownIndexAndSize.v2();
int nextInsertionIndex = arrayIndex.get(); // The value in arrayIndex is always the _next_ insertion point
if (nextInsertionIndex - 1 == knownIndex) {
return knownSize;
}
long size = knownSize;
boolean anyNulls = false;
for (int i = knownIndex + 1; i < nextInsertionIndex; i++) {
RawDocument rawDocument = rawDocuments[i];
if (rawDocument == null) {
/*
* Some documents were in flight and haven't been stored in the array yet, so we'll move past this. The size will be a
* little low on this method call. So we're going to set this flag so that we don't store this value for future use.
*/
anyNulls = true;
} else {
size += rawDocuments[i].getSizeInBytes();
}
}
/*
* The most important thing is for this method to be fast. It is OK if we store the same value twice, or even if we store a
* slightly out-of-date copy, as long as we don't do any locking.
*/
if (anyNulls == false && sizeInBytesAtIndex.v1() + 1 < nextInsertionIndex) {
sizeInBytesAtIndex = Tuple.tuple(nextInsertionIndex - 1, size);
}
return size;
}

/*
* Adds the rawDocument to the sample if there is capacity. Returns true if it adds it, or false if it does not.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ private static SamplingService.SampleStats randomStats() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomPositiveTimeValue(),
randomPositiveTimeValue(),
randomPositiveTimeValue(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ private static SamplingService.SampleStats randomStats() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomPositiveTimeValue(),
randomPositiveTimeValue(),
randomPositiveTimeValue(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ protected SampleStats createTestInstance() {
stats.samplesRejectedForCondition.add(randomReasonableLong());
stats.samplesRejectedForRate.add(randomReasonableLong());
stats.samplesRejectedForException.add(randomReasonableLong());
stats.samplesRejectedForSize.add(randomReasonableLong());
stats.timeSamplingInNanos.add(randomReasonableLong());
stats.timeEvaluatingConditionInNanos.add(randomReasonableLong());
stats.timeCompilingConditionInNanos.add(randomReasonableLong());
Expand All @@ -58,17 +59,18 @@ private long randomReasonableLong() {
@Override
protected SampleStats mutateInstance(SampleStats instance) throws IOException {
SampleStats mutated = instance.combine(new SampleStats());
switch (between(0, 9)) {
switch (between(0, 10)) {
case 0 -> mutated.samples.add(1);
case 1 -> mutated.potentialSamples.add(1);
case 2 -> mutated.samplesRejectedForMaxSamplesExceeded.add(1);
case 3 -> mutated.samplesRejectedForCondition.add(1);
case 4 -> mutated.samplesRejectedForRate.add(1);
case 5 -> mutated.samplesRejectedForException.add(1);
case 6 -> mutated.timeSamplingInNanos.add(1);
case 7 -> mutated.timeEvaluatingConditionInNanos.add(1);
case 8 -> mutated.timeCompilingConditionInNanos.add(1);
case 9 -> mutated.lastException = mutated.lastException == null
case 6 -> mutated.samplesRejectedForSize.add(1);
case 7 -> mutated.timeSamplingInNanos.add(1);
case 8 -> mutated.timeEvaluatingConditionInNanos.add(1);
case 9 -> mutated.timeCompilingConditionInNanos.add(1);
case 10 -> mutated.lastException = mutated.lastException == null
? new ElasticsearchException(randomAlphanumericOfLength(10))
: null;
default -> throw new IllegalArgumentException("Should never get here");
Expand Down Expand Up @@ -104,6 +106,10 @@ public void testCombine() {
stats1CombineStats2.getSamplesRejectedForException(),
equalTo(stats1.getSamplesRejectedForException() + stats2.getSamplesRejectedForException())
);
assertThat(
stats1CombineStats2.getSamplesRejectedForSize(),
equalTo(stats1.getSamplesRejectedForSize() + stats2.getSamplesRejectedForSize())
);
assertThat(
stats1CombineStats2.getTimeSampling(),
equalTo(TimeValue.timeValueNanos(stats1.getTimeSampling().nanos() + stats2.getTimeSampling().nanos()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentType;

import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -96,6 +97,7 @@ public void testMaybeSample() {
assertThat(stats.getSamplesRejectedForRate(), equalTo(0L));
assertThat(stats.getSamplesRejectedForCondition(), equalTo(0L));
assertThat(stats.getSamplesRejectedForException(), equalTo(0L));
assertThat(stats.getSamplesRejectedForSize(), equalTo(0L));
assertThat(stats.getSamplesRejectedForMaxSamplesExceeded(), equalTo(0L));
assertThat(stats.getLastException(), nullValue());
assertThat(stats.getTimeSampling(), greaterThan(TimeValue.ZERO));
Expand Down Expand Up @@ -224,6 +226,36 @@ public void testMaybeSampleMaxSamples() {
assertThat(stats.getTimeEvaluatingCondition(), equalTo(TimeValue.ZERO));
}

public void testMaybeSampleMaxSize() {
/*
* This tests that the max size limit on the SamplingConfiguration is enforced. Here we set maxSize to 400. The source field of
* each index request is an array of 150 bytes. Since the size of the raw document is approximately the size of the source byte
* array, we expect to be able to insert 2 raw documents before all others are rejected due to the max size limit.
*/
assumeTrue("Requires sampling feature flag", RANDOM_SAMPLING_FEATURE_FLAG);
SamplingService samplingService = getTestSamplingService();
String indexName = randomIdentifier();
int maxSamples = randomIntBetween(2, 50);
ProjectMetadata.Builder projectBuilder = ProjectMetadata.builder(ProjectId.DEFAULT)
.putCustom(
SamplingMetadata.TYPE,
new SamplingMetadata(
Map.of(
indexName,
new SamplingConfiguration(1.0, maxSamples, ByteSizeValue.ofBytes(400), TimeValue.timeValueDays(3), null)
)
)
);
final ProjectId projectId = projectBuilder.getId();
ProjectMetadata projectMetadata = projectBuilder.build();
final IndexRequest indexRequest = new IndexRequest(indexName).id("_id").source(randomByteArrayOfLength(150), XContentType.JSON);
for (int i = 0; i < maxSamples; i++) {
samplingService.maybeSample(projectMetadata, indexRequest);
}
assertThat(samplingService.getLocalSample(projectId, indexName).size(), equalTo(2));
assertThat(samplingService.getLocalSampleStats(projectId, indexName).getSamplesRejectedForSize(), equalTo((long) maxSamples - 2));
}

private SamplingService getTestSamplingService() {
final ScriptService scriptService = new ScriptService(
Settings.EMPTY,
Expand Down