Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,19 @@ public final SamplingResult shouldSample(
boolean isSampled;
boolean isAdjustedCountCorrect;
if (isValidThreshold(threshold)) {
long randomness = getRandomness(otelTraceState, traceId);
isSampled = threshold <= randomness;
isAdjustedCountCorrect = intent.isAdjustedCountReliable();
} else { // DROP
// determine the randomness value to use
long randomness;
if (isAdjustedCountCorrect) {
randomness = getRandomness(otelTraceState, traceId);
} else {
// We cannot assume any particular distribution of the provided trace randomness,
// because the sampling decision may depend directly or indirectly on the randomness value;
// however, we still want to sample with probability corresponding to the obtained threshold
randomness = RandomValueGenerators.getDefault().generate(traceId);
}
isSampled = threshold <= randomness;
} else { // invalid threshold, DROP
isSampled = false;
isAdjustedCountCorrect = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.SamplingDecision;
Expand Down Expand Up @@ -357,6 +361,200 @@ void testProportionalBehavior() {
.isCloseTo(targetSpansPerSecondLimit, Percentage.withPercentage(5));
}

@Test
void testUnstableDelegate() {
// Assume there are 10,000 spans/s and the delegate samples 50% of them with probability 100%,
// and unconditionally rejects the rest.
//
// Now, if we do not want to sample more than 1000 spans/s overall, the rate limiting
// sampler should calculate the effective threshold correctly.

double targetSpansPerSecondLimit = 1000;
double adaptationTimeSeconds = 5;

Composable delegate =
new CoinFlipSampler(ConsistentSampler.alwaysOff(), ConsistentSampler.alwaysOn());

ConsistentSampler sampler =
ConsistentSampler.rateLimited(
delegate, targetSpansPerSecondLimit, adaptationTimeSeconds, nanoTimeSupplier);

long averageRequestRatePerSecond = 10000;
int numSpans = 1000000;

List<Long> spanSampledNanos = new ArrayList<>();

for (int i = 0; i < numSpans; ++i) {
advanceTime(randomInterval(averageRequestRatePerSecond));
SamplingResult samplingResult =
sampler.shouldSample(
parentContext,
generateRandomTraceId(random),
name,
spanKind,
attributes,
parentLinks);
if (SamplingDecision.RECORD_AND_SAMPLE.equals(samplingResult.getDecision())) {
spanSampledNanos.add(getCurrentTimeNanos());
}
}

long timeNow = nanoTime[0];
long numSampledSpansInLast5Seconds =
spanSampledNanos.stream().filter(x -> x > timeNow - 5000000000L && x <= timeNow).count();

assertThat(numSampledSpansInLast5Seconds / 5.)
.isCloseTo(targetSpansPerSecondLimit, Percentage.withPercentage(5));
}

@Test
void testLegacyCase() {
// This test makes sure that the issue
// https://github.com/open-telemetry/opentelemetry-java-contrib/issues/2007
// is resolved.

long averageRequestRatePerSecond = 10000;

// Assume the following setup:
// The root span is sampled by the legacy sampler AlwaysOn.
// One of its descendant spans, which we will call "parent" span, is sampled with
// stage1: ConsistentRateLimitingSampler(ConsistentParentBasedSampler, 5000/s).
// This will sample approximately 50% of the spans.

// Its "child" is similarly sampled by
// stage2: ConsistentRateLimitingSampler(ConsistentParentBasedSampler, 2500/s).

// This sampler will generate the same output as the root span described above:
// - the threshold will be 0, so all spans will be sampled
// - isAdjustedCountReliable will be false
// - there will be no threshold in TraceState, but the sampling flag will be set
Composable mockRootSampler = new LegacyLikeComposable(ConsistentSampler.alwaysOn());

double targetSpansPerSecondLimit = 2500; // for stage2
double adaptationTimeSeconds = 5;

// The sampler for "parent" spans
ConsistentSampler stage1 =
ConsistentSampler.rateLimited(
mockRootSampler,
2 * targetSpansPerSecondLimit,
adaptationTimeSeconds,
nanoTimeSupplier);

// The sampler for "child" spans (it will never see root spans)
ConsistentSampler stage2 =
ConsistentSampler.rateLimited(
ConsistentSampler.parentBased(ConsistentSampler.alwaysOff()),
targetSpansPerSecondLimit,
adaptationTimeSeconds,
nanoTimeSupplier);

int numSpans = 1000000;
int stage1SampledCount = 0;
int stage2SampledCount = 0;

for (int i = 0; i < numSpans; ++i) {
advanceTime(randomInterval(averageRequestRatePerSecond));
String traceId = generateRandomTraceId(random);

// Stage 1 sampling, the "parent"
SamplingResult samplingResult1 =
stage1.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);

boolean isSampled = SamplingDecision.RECORD_AND_SAMPLE.equals(samplingResult1.getDecision());
if (isSampled) {
stage1SampledCount++;
}

// Prepare the context for the child span, pass parent's TraceState to the child
Span parentSpan = Span.fromContext(parentContext);
SpanContext parentSpanContext = parentSpan.getSpanContext();
TraceState parentSamplingTraceState =
samplingResult1.getUpdatedTraceState(parentSpanContext.getTraceState());

SpanContext childSpanContext =
SpanContext.create(
traceId,
"1000badbadbad000",
isSampled ? TraceFlags.getSampled() : TraceFlags.getDefault(),
parentSamplingTraceState);
Span childSpan = Span.wrap(childSpanContext);
Context childContext = childSpan.storeInContext(parentContext);

// Stage 2 sampling, the "child"
SamplingResult samplingResult2 =
stage2.shouldSample(childContext, traceId, name, spanKind, attributes, parentLinks);

if (SamplingDecision.RECORD_AND_SAMPLE.equals(samplingResult2.getDecision())) {
stage2SampledCount++;
}
}

long timeNow = nanoTime[0];
double duration = timeNow / 1000000000.0; // in seconds
assertThat(duration)
.isCloseTo(numSpans / (double) averageRequestRatePerSecond, Percentage.withPercentage(2));

assertThat(stage1SampledCount / duration)
.isCloseTo(2 * targetSpansPerSecondLimit, Percentage.withPercentage(2));

assertThat(stage2SampledCount / duration)
.isCloseTo(targetSpansPerSecondLimit, Percentage.withPercentage(2));
}

/*
* An auxiliary class used to simulate the behavior of a legacy (non consistent-probability)
* sampler, just for testing mixed environment
*/
static class LegacyLikeComposable implements Composable {

private final Composable delegate;

public LegacyLikeComposable(Composable delegate) {
this.delegate = delegate;
}

@Override
public SamplingIntent getSamplingIntent(
Context parentContext,
String name,
SpanKind spanKind,
Attributes attributes,
List<LinkData> parentLinks) {

SamplingIntent delegateIntent =
delegate.getSamplingIntent(parentContext, name, spanKind, attributes, parentLinks);

return new SamplingIntent() {
@Override
public long getThreshold() {
return delegateIntent.getThreshold();
}

@Override
public boolean isAdjustedCountReliable() {
// Forcing "legacy" behavior, no threshold will be put into TraceState
return false;
}

@Override
public Attributes getAttributes() {
return delegateIntent.getAttributes();
}

@Override
public TraceState updateTraceState(TraceState previousState) {
return delegateIntent.updateTraceState(previousState);
}
};
}

@Override
public String getDescription() {
return "LegacyLike(" + delegate.getDescription() + ")";
}
}

@Test
void testDescription() {

Expand Down
Loading