diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/sampler/consistent56/ConsistentSampler.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/sampler/consistent56/ConsistentSampler.java index 1b2cedf08..8ac616963 100644 --- a/consistent-sampling/src/main/java/io/opentelemetry/contrib/sampler/consistent56/ConsistentSampler.java +++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/sampler/consistent56/ConsistentSampler.java @@ -186,10 +186,19 @@ public final SamplingResult shouldSample( boolean isSampled; boolean isAdjustedCountCorrect; if (isValidThreshold(threshold)) { - long randomness = getRandomness(otelTraceState, traceId); - isSampled = threshold <= randomness; isAdjustedCountCorrect = intent.isAdjustedCountReliable(); - } else { // DROP + // determine the randomness value to use + long randomness; + if (isAdjustedCountCorrect) { + randomness = getRandomness(otelTraceState, traceId); + } else { + // We cannot assume any particular distribution of the provided trace randomness, + // because the sampling decision may depend directly or indirectly on the randomness value; + // however, we still want to sample with probability corresponding to the obtained threshold + randomness = RandomValueGenerators.getDefault().generate(traceId); + } + isSampled = threshold <= randomness; + } else { // invalid threshold, DROP isSampled = false; isAdjustedCountCorrect = false; } diff --git a/consistent-sampling/src/test/java/io/opentelemetry/contrib/sampler/consistent56/ConsistentRateLimitingSamplerTest.java b/consistent-sampling/src/test/java/io/opentelemetry/contrib/sampler/consistent56/ConsistentRateLimitingSamplerTest.java index cc56df1ef..d5cb6b640 100644 --- a/consistent-sampling/src/test/java/io/opentelemetry/contrib/sampler/consistent56/ConsistentRateLimitingSamplerTest.java +++ b/consistent-sampling/src/test/java/io/opentelemetry/contrib/sampler/consistent56/ConsistentRateLimitingSamplerTest.java @@ -10,7 +10,11 @@ import 
io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.samplers.SamplingDecision; @@ -357,6 +361,200 @@ void testProportionalBehavior() { .isCloseTo(targetSpansPerSecondLimit, Percentage.withPercentage(5)); } + @Test + void testUnstableDelegate() { + // Assume there are 10,000 spans/s and the delegate samples 50% of them with probability 100%, + // and unconditionally rejects the rest. + // + // Now, if we do not want to sample more than 1000 spans/s overall, the rate limiting + // sampler should calculate the effective threshold correctly. + + double targetSpansPerSecondLimit = 1000; + double adaptationTimeSeconds = 5; + + Composable delegate = + new CoinFlipSampler(ConsistentSampler.alwaysOff(), ConsistentSampler.alwaysOn()); + + ConsistentSampler sampler = + ConsistentSampler.rateLimited( + delegate, targetSpansPerSecondLimit, adaptationTimeSeconds, nanoTimeSupplier); + + long averageRequestRatePerSecond = 10000; + int numSpans = 1000000; + + List spanSampledNanos = new ArrayList<>(); + + for (int i = 0; i < numSpans; ++i) { + advanceTime(randomInterval(averageRequestRatePerSecond)); + SamplingResult samplingResult = + sampler.shouldSample( + parentContext, + generateRandomTraceId(random), + name, + spanKind, + attributes, + parentLinks); + if (SamplingDecision.RECORD_AND_SAMPLE.equals(samplingResult.getDecision())) { + spanSampledNanos.add(getCurrentTimeNanos()); + } + } + + long timeNow = nanoTime[0]; + long numSampledSpansInLast5Seconds = + spanSampledNanos.stream().filter(x -> x > timeNow - 5000000000L && x <= timeNow).count(); + + assertThat(numSampledSpansInLast5Seconds / 5.) 
+ .isCloseTo(targetSpansPerSecondLimit, Percentage.withPercentage(5)); + } + + @Test + void testLegacyCase() { + // This test makes sure that the issue + // https://github.com/open-telemetry/opentelemetry-java-contrib/issues/2007 + // is resolved. + + long averageRequestRatePerSecond = 10000; + + // Assume the following setup: + // The root span is sampled by the legacy sampler AlwaysOn. + // One of its descendant spans, which we will call "parent" span, is sampled with + // stage1: ConsistentRateLimitingSampler(ConsistentParentBasedSampler, 5000/s). + // This will sample approximately 50% of the spans. + + // Its "child" is similarly sampled by + // stage2: ConsistentRateLimitingSampler(ConsistentParentBasedSampler, 2500/s). + + // This sampler will generate the same output as the root span described above: + // - the threshold will be 0, so all spans will be sampled + // - isAdjustedCountReliable will be false + // - there will be no threshold in TraceState, but the sampling flag will be set + Composable mockRootSampler = new LegacyLikeComposable(ConsistentSampler.alwaysOn()); + + double targetSpansPerSecondLimit = 2500; // for stage2 + double adaptationTimeSeconds = 5; + + // The sampler for "parent" spans + ConsistentSampler stage1 = + ConsistentSampler.rateLimited( + mockRootSampler, + 2 * targetSpansPerSecondLimit, + adaptationTimeSeconds, + nanoTimeSupplier); + + // The sampler for "child" spans (it will never see root spans) + ConsistentSampler stage2 = + ConsistentSampler.rateLimited( + ConsistentSampler.parentBased(ConsistentSampler.alwaysOff()), + targetSpansPerSecondLimit, + adaptationTimeSeconds, + nanoTimeSupplier); + + int numSpans = 1000000; + int stage1SampledCount = 0; + int stage2SampledCount = 0; + + for (int i = 0; i < numSpans; ++i) { + advanceTime(randomInterval(averageRequestRatePerSecond)); + String traceId = generateRandomTraceId(random); + + // Stage 1 sampling, the "parent" + SamplingResult samplingResult1 = + 
stage1.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks); + + boolean isSampled = SamplingDecision.RECORD_AND_SAMPLE.equals(samplingResult1.getDecision()); + if (isSampled) { + stage1SampledCount++; + } + + // Prepare the context for the child span, pass parent's TraceState to the child + Span parentSpan = Span.fromContext(parentContext); + SpanContext parentSpanContext = parentSpan.getSpanContext(); + TraceState parentSamplingTraceState = + samplingResult1.getUpdatedTraceState(parentSpanContext.getTraceState()); + + SpanContext childSpanContext = + SpanContext.create( + traceId, + "1000badbadbad000", + isSampled ? TraceFlags.getSampled() : TraceFlags.getDefault(), + parentSamplingTraceState); + Span childSpan = Span.wrap(childSpanContext); + Context childContext = childSpan.storeInContext(parentContext); + + // Stage 2 sampling, the "child" + SamplingResult samplingResult2 = + stage2.shouldSample(childContext, traceId, name, spanKind, attributes, parentLinks); + + if (SamplingDecision.RECORD_AND_SAMPLE.equals(samplingResult2.getDecision())) { + stage2SampledCount++; + } + } + + long timeNow = nanoTime[0]; + double duration = timeNow / 1000000000.0; // in seconds + assertThat(duration) + .isCloseTo(numSpans / (double) averageRequestRatePerSecond, Percentage.withPercentage(2)); + + assertThat(stage1SampledCount / duration) + .isCloseTo(2 * targetSpansPerSecondLimit, Percentage.withPercentage(2)); + + assertThat(stage2SampledCount / duration) + .isCloseTo(targetSpansPerSecondLimit, Percentage.withPercentage(2)); + } + + /* + * An auxiliary class used to simulate the behavior of a legacy (non-consistent-probability) + * sampler, just for testing a mixed environment. + */ + static class LegacyLikeComposable implements Composable { + + private final Composable delegate; + + public LegacyLikeComposable(Composable delegate) { + this.delegate = delegate; + } + + @Override + public SamplingIntent getSamplingIntent( + Context parentContext, + String 
name, + SpanKind spanKind, + Attributes attributes, + List parentLinks) { + + SamplingIntent delegateIntent = + delegate.getSamplingIntent(parentContext, name, spanKind, attributes, parentLinks); + + return new SamplingIntent() { + @Override + public long getThreshold() { + return delegateIntent.getThreshold(); + } + + @Override + public boolean isAdjustedCountReliable() { + // Forcing "legacy" behavior, no threshold will be put into TraceState + return false; + } + + @Override + public Attributes getAttributes() { + return delegateIntent.getAttributes(); + } + + @Override + public TraceState updateTraceState(TraceState previousState) { + return delegateIntent.updateTraceState(previousState); + } + }; + } + + @Override + public String getDescription() { + return "LegacyLike(" + delegate.getDescription() + ")"; + } + } + @Test void testDescription() {