Skip to content

Commit 754211b

Browse files
2007_ConsistentRateLimitingSampler (#2022)
Co-authored-by: jason plumb <[email protected]>
1 parent e81ef92 commit 754211b

File tree

2 files changed

+210
-3
lines changed

2 files changed

+210
-3
lines changed

consistent-sampling/src/main/java/io/opentelemetry/contrib/sampler/consistent56/ConsistentSampler.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,10 +186,19 @@ public final SamplingResult shouldSample(
186186
boolean isSampled;
187187
boolean isAdjustedCountCorrect;
188188
if (isValidThreshold(threshold)) {
189-
long randomness = getRandomness(otelTraceState, traceId);
190-
isSampled = threshold <= randomness;
191189
isAdjustedCountCorrect = intent.isAdjustedCountReliable();
192-
} else { // DROP
190+
// determine the randomness value to use
191+
long randomness;
192+
if (isAdjustedCountCorrect) {
193+
randomness = getRandomness(otelTraceState, traceId);
194+
} else {
195+
// We cannot assume any particular distribution of the provided trace randomness,
196+
// because the sampling decision may depend directly or indirectly on the randomness value;
197+
// however, we still want to sample with probability corresponding to the obtained threshold
198+
randomness = RandomValueGenerators.getDefault().generate(traceId);
199+
}
200+
isSampled = threshold <= randomness;
201+
} else { // invalid threshold, DROP
193202
isSampled = false;
194203
isAdjustedCountCorrect = false;
195204
}

consistent-sampling/src/test/java/io/opentelemetry/contrib/sampler/consistent56/ConsistentRateLimitingSamplerTest.java

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@
1010

1111
import io.opentelemetry.api.common.AttributeKey;
1212
import io.opentelemetry.api.common.Attributes;
13+
import io.opentelemetry.api.trace.Span;
14+
import io.opentelemetry.api.trace.SpanContext;
1315
import io.opentelemetry.api.trace.SpanKind;
16+
import io.opentelemetry.api.trace.TraceFlags;
17+
import io.opentelemetry.api.trace.TraceState;
1418
import io.opentelemetry.context.Context;
1519
import io.opentelemetry.sdk.trace.data.LinkData;
1620
import io.opentelemetry.sdk.trace.samplers.SamplingDecision;
@@ -357,6 +361,200 @@ void testProportionalBehavior() {
357361
.isCloseTo(targetSpansPerSecondLimit, Percentage.withPercentage(5));
358362
}
359363

364+
@Test
365+
void testUnstableDelegate() {
366+
// Assume there are 10,000 spans/s and the delegate samples 50% of them with probability 100%,
367+
// and unconditionally rejects the rest.
368+
//
369+
// Now, if we do not want to sample more than 1000 spans/s overall, the rate limiting
370+
// sampler should calculate the effective threshold correctly.
371+
372+
double targetSpansPerSecondLimit = 1000;
373+
double adaptationTimeSeconds = 5;
374+
375+
Composable delegate =
376+
new CoinFlipSampler(ConsistentSampler.alwaysOff(), ConsistentSampler.alwaysOn());
377+
378+
ConsistentSampler sampler =
379+
ConsistentSampler.rateLimited(
380+
delegate, targetSpansPerSecondLimit, adaptationTimeSeconds, nanoTimeSupplier);
381+
382+
long averageRequestRatePerSecond = 10000;
383+
int numSpans = 1000000;
384+
385+
List<Long> spanSampledNanos = new ArrayList<>();
386+
387+
for (int i = 0; i < numSpans; ++i) {
388+
advanceTime(randomInterval(averageRequestRatePerSecond));
389+
SamplingResult samplingResult =
390+
sampler.shouldSample(
391+
parentContext,
392+
generateRandomTraceId(random),
393+
name,
394+
spanKind,
395+
attributes,
396+
parentLinks);
397+
if (SamplingDecision.RECORD_AND_SAMPLE.equals(samplingResult.getDecision())) {
398+
spanSampledNanos.add(getCurrentTimeNanos());
399+
}
400+
}
401+
402+
long timeNow = nanoTime[0];
403+
long numSampledSpansInLast5Seconds =
404+
spanSampledNanos.stream().filter(x -> x > timeNow - 5000000000L && x <= timeNow).count();
405+
406+
assertThat(numSampledSpansInLast5Seconds / 5.)
407+
.isCloseTo(targetSpansPerSecondLimit, Percentage.withPercentage(5));
408+
}
409+
410+
@Test
411+
void testLegacyCase() {
412+
// This test makes sure that the issue
413+
// https://github.com/open-telemetry/opentelemetry-java-contrib/issues/2007
414+
// is resolved.
415+
416+
long averageRequestRatePerSecond = 10000;
417+
418+
// Assume the following setup:
419+
// The root span is sampled by the legacy sampler AlwaysOn.
420+
// One of its descendant spans, which we will call "parent" span, is sampled with
421+
// stage1: ConsistentRateLimitingSampler(ConsistentParentBasedSampler, 5000/s).
422+
// This will sample approximately 50% of the spans.
423+
424+
// Its "child" is similarly sampled by
425+
// stage2: ConsistentRateLimitingSampler(ConsistentParentBasedSampler, 2500/s).
426+
427+
// This sampler will generate the same output as the root span described above:
428+
// - the threshold will be 0, so all spans will be sampled
429+
// - isAdjustedCountReliable will be false
430+
// - there will be no threshold in TraceState, but the sampling flag will be set
431+
Composable mockRootSampler = new LegacyLikeComposable(ConsistentSampler.alwaysOn());
432+
433+
double targetSpansPerSecondLimit = 2500; // for stage2
434+
double adaptationTimeSeconds = 5;
435+
436+
// The sampler for "parent" spans
437+
ConsistentSampler stage1 =
438+
ConsistentSampler.rateLimited(
439+
mockRootSampler,
440+
2 * targetSpansPerSecondLimit,
441+
adaptationTimeSeconds,
442+
nanoTimeSupplier);
443+
444+
// The sampler for "child" spans (it will never see root spans)
445+
ConsistentSampler stage2 =
446+
ConsistentSampler.rateLimited(
447+
ConsistentSampler.parentBased(ConsistentSampler.alwaysOff()),
448+
targetSpansPerSecondLimit,
449+
adaptationTimeSeconds,
450+
nanoTimeSupplier);
451+
452+
int numSpans = 1000000;
453+
int stage1SampledCount = 0;
454+
int stage2SampledCount = 0;
455+
456+
for (int i = 0; i < numSpans; ++i) {
457+
advanceTime(randomInterval(averageRequestRatePerSecond));
458+
String traceId = generateRandomTraceId(random);
459+
460+
// Stage 1 sampling, the "parent"
461+
SamplingResult samplingResult1 =
462+
stage1.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
463+
464+
boolean isSampled = SamplingDecision.RECORD_AND_SAMPLE.equals(samplingResult1.getDecision());
465+
if (isSampled) {
466+
stage1SampledCount++;
467+
}
468+
469+
// Prepare the context for the child span, pass parent's TraceState to the child
470+
Span parentSpan = Span.fromContext(parentContext);
471+
SpanContext parentSpanContext = parentSpan.getSpanContext();
472+
TraceState parentSamplingTraceState =
473+
samplingResult1.getUpdatedTraceState(parentSpanContext.getTraceState());
474+
475+
SpanContext childSpanContext =
476+
SpanContext.create(
477+
traceId,
478+
"1000badbadbad000",
479+
isSampled ? TraceFlags.getSampled() : TraceFlags.getDefault(),
480+
parentSamplingTraceState);
481+
Span childSpan = Span.wrap(childSpanContext);
482+
Context childContext = childSpan.storeInContext(parentContext);
483+
484+
// Stage 2 sampling, the "child"
485+
SamplingResult samplingResult2 =
486+
stage2.shouldSample(childContext, traceId, name, spanKind, attributes, parentLinks);
487+
488+
if (SamplingDecision.RECORD_AND_SAMPLE.equals(samplingResult2.getDecision())) {
489+
stage2SampledCount++;
490+
}
491+
}
492+
493+
long timeNow = nanoTime[0];
494+
double duration = timeNow / 1000000000.0; // in seconds
495+
assertThat(duration)
496+
.isCloseTo(numSpans / (double) averageRequestRatePerSecond, Percentage.withPercentage(2));
497+
498+
assertThat(stage1SampledCount / duration)
499+
.isCloseTo(2 * targetSpansPerSecondLimit, Percentage.withPercentage(2));
500+
501+
assertThat(stage2SampledCount / duration)
502+
.isCloseTo(targetSpansPerSecondLimit, Percentage.withPercentage(2));
503+
}
504+
505+
/*
506+
* An auxiliary class used to simulate the behavior of a legacy (non consistent-probability)
507+
* sampler, just for testing mixed environment
508+
*/
509+
static class LegacyLikeComposable implements Composable {
510+
511+
private final Composable delegate;
512+
513+
public LegacyLikeComposable(Composable delegate) {
514+
this.delegate = delegate;
515+
}
516+
517+
@Override
518+
public SamplingIntent getSamplingIntent(
519+
Context parentContext,
520+
String name,
521+
SpanKind spanKind,
522+
Attributes attributes,
523+
List<LinkData> parentLinks) {
524+
525+
SamplingIntent delegateIntent =
526+
delegate.getSamplingIntent(parentContext, name, spanKind, attributes, parentLinks);
527+
528+
return new SamplingIntent() {
529+
@Override
530+
public long getThreshold() {
531+
return delegateIntent.getThreshold();
532+
}
533+
534+
@Override
535+
public boolean isAdjustedCountReliable() {
536+
// Forcing "legacy" behavior, no threshold will be put into TraceState
537+
return false;
538+
}
539+
540+
@Override
541+
public Attributes getAttributes() {
542+
return delegateIntent.getAttributes();
543+
}
544+
545+
@Override
546+
public TraceState updateTraceState(TraceState previousState) {
547+
return delegateIntent.updateTraceState(previousState);
548+
}
549+
};
550+
}
551+
552+
@Override
553+
public String getDescription() {
554+
return "LegacyLike(" + delegate.getDescription() + ")";
555+
}
556+
}
557+
360558
@Test
361559
void testDescription() {
362560

0 commit comments

Comments
 (0)