Skip to content

Commit 49a5351

Browse files
committed
Add experimental consistent sampler
1 parent cf31a69 commit 49a5351

File tree

16 files changed

+826
-0
lines changed

16 files changed

+826
-0
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
__all__ = [
2+
"ComposableSampler",
3+
"ConsistentSampler",
4+
"SamplingIntent",
5+
"consistent_always_off",
6+
"consistent_always_on",
7+
"consistent_parent_based",
8+
"consistent_probability_based",
9+
]
10+
11+
12+
from ._always_off import consistent_always_off
13+
from ._always_on import consistent_always_on
14+
from ._composable import ComposableSampler, SamplingIntent
15+
from ._fixed_threshold import consistent_probability_based
16+
from ._parent_based import consistent_parent_based
17+
from ._sampler import ConsistentSampler
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from typing import Optional, Sequence
2+
3+
from opentelemetry.context import Context
4+
from opentelemetry.trace import Link, SpanKind, TraceState
5+
from opentelemetry.util.types import Attributes
6+
7+
from ._composable import ComposableSampler, SamplingIntent
8+
from ._sampler import ConsistentSampler
9+
from ._util import INVALID_THRESHOLD
10+
11+
_intent = SamplingIntent(
12+
threshold=INVALID_THRESHOLD, adjusted_count_reliable=False
13+
)
14+
15+
16+
class ConsistentAlwaysOffSampler(ComposableSampler):
17+
def sampling_intent(
18+
self,
19+
parent_ctx: Optional[Context],
20+
name: str,
21+
span_kind: Optional[SpanKind],
22+
attributes: Attributes,
23+
links: Optional[Sequence[Link]],
24+
trace_state: Optional[TraceState] = None,
25+
) -> SamplingIntent:
26+
return _intent
27+
28+
def get_description(self) -> str:
29+
return "ConsistentAlwaysOffSampler"
30+
31+
32+
_always_off = ConsistentSampler(ConsistentAlwaysOffSampler())
33+
34+
35+
def consistent_always_off() -> ConsistentSampler:
36+
"""Returns a consistent sampler that does not sample any span."""
37+
return _always_off
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from typing import Optional, Sequence
2+
3+
from opentelemetry.context import Context
4+
from opentelemetry.trace import Link, SpanKind, TraceState
5+
from opentelemetry.util.types import Attributes
6+
7+
from ._composable import ComposableSampler, SamplingIntent
8+
from ._sampler import ConsistentSampler
9+
from ._util import MIN_THRESHOLD
10+
11+
_intent = SamplingIntent(threshold=MIN_THRESHOLD)
12+
13+
14+
class ConsistentAlwaysOnSampler(ComposableSampler):
15+
def sampling_intent(
16+
self,
17+
parent_ctx: Optional[Context],
18+
name: str,
19+
span_kind: Optional[SpanKind],
20+
attributes: Attributes,
21+
links: Optional[Sequence[Link]],
22+
trace_state: Optional[TraceState] = None,
23+
) -> SamplingIntent:
24+
return _intent
25+
26+
def get_description(self) -> str:
27+
return "ConsistentAlwaysOnSampler"
28+
29+
30+
_always_on = ConsistentSampler(ConsistentAlwaysOnSampler())
31+
32+
33+
def consistent_always_on() -> ConsistentSampler:
34+
"""Returns a consistent sampler that samples all spans."""
35+
return _always_on
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from dataclasses import dataclass, field
2+
from typing import Callable, Optional, Protocol, Sequence
3+
4+
from opentelemetry.context import Context
5+
from opentelemetry.trace import Link, SpanKind, TraceState
6+
from opentelemetry.util.types import Attributes
7+
8+
9+
@dataclass(frozen=True)
10+
class SamplingIntent:
11+
"""Information to make a consistent sampling decision."""
12+
13+
threshold: int
14+
adjusted_count_reliable: bool = field(default=True)
15+
attributes: Attributes = field(default=None)
16+
update_trace_state: Callable[[TraceState], TraceState] = field(
17+
default=lambda ts: ts
18+
)
19+
20+
21+
class ComposableSampler(Protocol):
22+
"""A sampler that can be composed to make a final consistent sampling decision."""
23+
24+
def sampling_intent(
25+
self,
26+
parent_ctx: Optional[Context],
27+
name: str,
28+
span_kind: Optional[SpanKind],
29+
attributes: Attributes,
30+
links: Optional[Sequence[Link]],
31+
trace_state: Optional[TraceState],
32+
) -> SamplingIntent:
33+
"""Returns information to make a consistent sampling decision."""
34+
...
35+
36+
def get_description(self) -> str:
37+
"""Returns a description of the sampler."""
38+
...
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
from typing import Optional, Sequence
2+
3+
from opentelemetry.context import Context
4+
from opentelemetry.trace import Link, SpanKind, TraceState
5+
from opentelemetry.util.types import Attributes
6+
7+
from ._composable import ComposableSampler, SamplingIntent
8+
from ._sampler import ConsistentSampler
9+
from ._trace_state import serialize_th
10+
from ._util import INVALID_THRESHOLD, MAX_THRESHOLD, calculate_threshold
11+
12+
13+
class ConsistentFixedThresholdSampler(ComposableSampler):
14+
_threshold: int
15+
_description: str
16+
17+
def __init__(self, sampling_probability: float):
18+
threshold = calculate_threshold(sampling_probability)
19+
if threshold == MAX_THRESHOLD:
20+
threshold_str = "max"
21+
else:
22+
threshold_str = serialize_th(threshold)
23+
threshold = (
24+
INVALID_THRESHOLD if threshold == MAX_THRESHOLD else threshold
25+
)
26+
self._intent = SamplingIntent(threshold=threshold)
27+
self._description = f"ConsistentFixedThresholdSampler{{threshold={threshold_str}, sampling probability={sampling_probability}}}"
28+
29+
def sampling_intent(
30+
self,
31+
parent_ctx: Optional[Context],
32+
name: str,
33+
span_kind: Optional[SpanKind],
34+
attributes: Attributes,
35+
links: Optional[Sequence[Link]],
36+
trace_state: Optional[TraceState] = None,
37+
) -> SamplingIntent:
38+
return self._intent
39+
40+
def get_description(self) -> str:
41+
return self._description
42+
43+
44+
def consistent_probability_based(
45+
sampling_probability: float,
46+
) -> ConsistentSampler:
47+
"""Returns a consistent sampler that samples each span with a fixed probability."""
48+
if not (0.0 <= sampling_probability <= 1.0):
49+
raise ValueError("Sampling probability must be between 0.0 and 1.0")
50+
51+
return ConsistentSampler(
52+
ConsistentFixedThresholdSampler(sampling_probability)
53+
)
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from typing import Optional, Sequence
2+
3+
from opentelemetry.context import Context
4+
from opentelemetry.trace import Link, SpanKind, TraceState, get_current_span
5+
from opentelemetry.util.types import Attributes
6+
7+
from ._composable import ComposableSampler, SamplingIntent
8+
from ._sampler import ConsistentSampler
9+
from ._trace_state import OtelTraceState
10+
from ._util import (
11+
INVALID_THRESHOLD,
12+
MIN_THRESHOLD,
13+
is_valid_threshold,
14+
)
15+
16+
17+
class ConsistentParentBasedSampler(ComposableSampler):
18+
def __init__(self, root_sampler: ComposableSampler):
19+
self._root_sampler = root_sampler
20+
self._description = f"ConsistentParentBasedSampler{{root_sampler={root_sampler.get_description()}}}"
21+
22+
def sampling_intent(
23+
self,
24+
parent_ctx: Optional[Context],
25+
name: str,
26+
span_kind: Optional[SpanKind],
27+
attributes: Attributes,
28+
links: Optional[Sequence[Link]],
29+
trace_state: Optional[TraceState] = None,
30+
) -> SamplingIntent:
31+
parent_span = get_current_span(parent_ctx)
32+
parent_span_ctx = parent_span.get_span_context()
33+
is_root = not parent_span_ctx.is_valid
34+
if is_root:
35+
return self._root_sampler.sampling_intent(
36+
parent_ctx, name, span_kind, attributes, links, trace_state
37+
)
38+
39+
ot_trace_state = OtelTraceState.parse(trace_state)
40+
41+
if is_valid_threshold(ot_trace_state.threshold):
42+
return SamplingIntent(
43+
threshold=ot_trace_state.threshold,
44+
adjusted_count_reliable=True,
45+
)
46+
else:
47+
threshold = (
48+
MIN_THRESHOLD
49+
if parent_span_ctx.trace_flags.sampled
50+
else INVALID_THRESHOLD
51+
)
52+
return SamplingIntent(
53+
threshold=threshold, adjusted_count_reliable=False
54+
)
55+
56+
def get_description(self) -> str:
57+
return self._description
58+
59+
60+
def consistent_parent_based(
61+
root_sampler: ComposableSampler,
62+
) -> ConsistentSampler:
63+
"""Returns a consistent sampler that respects the sampling decision of
64+
the parent span or falls-back to the given sampler if it is a root span."""
65+
return ConsistentSampler(ConsistentParentBasedSampler(root_sampler))
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
from typing import Optional, Sequence
2+
3+
from opentelemetry.context import Context
4+
from opentelemetry.sdk.trace.sampling import Decision, Sampler, SamplingResult
5+
from opentelemetry.trace import Link, SpanKind, TraceState
6+
from opentelemetry.util.types import Attributes
7+
8+
from ._composable import ComposableSampler, SamplingIntent
9+
from ._trace_state import OTEL_TRACE_STATE_KEY, OtelTraceState
10+
from ._util import INVALID_THRESHOLD, is_valid_random_value, is_valid_threshold
11+
12+
13+
class ConsistentSampler(Sampler, ComposableSampler):
14+
"""A sampler that uses a consistent sampling strategy based on a delegate sampler."""
15+
16+
def __init__(self, delegate: ComposableSampler):
17+
self._delegate = delegate
18+
19+
def should_sample(
20+
self,
21+
parent_context: Optional[Context],
22+
trace_id: int,
23+
name: str,
24+
kind: Optional[SpanKind] = None,
25+
attributes: Attributes = None,
26+
links: Optional[Sequence[Link]] = None,
27+
trace_state: Optional[TraceState] = None,
28+
) -> SamplingResult:
29+
ot_trace_state = OtelTraceState.parse(trace_state)
30+
31+
intent = self._delegate.sampling_intent(
32+
parent_context, name, kind, attributes, links, trace_state
33+
)
34+
threshold = intent.threshold
35+
36+
if is_valid_threshold(threshold):
37+
adjusted_count_correct = intent.adjusted_count_reliable
38+
if is_valid_random_value(ot_trace_state.random_value):
39+
randomness = ot_trace_state.random_value
40+
else:
41+
# Use last 56 bits of trace_id as randomness
42+
randomness = trace_id & 0x00FFFFFFFFFFFFFF
43+
sampled = threshold <= randomness
44+
else:
45+
sampled = False
46+
adjusted_count_correct = False
47+
48+
decision = Decision.RECORD_AND_SAMPLE if sampled else Decision.DROP
49+
if sampled and adjusted_count_correct:
50+
ot_trace_state.threshold = threshold
51+
else:
52+
ot_trace_state.threshold = INVALID_THRESHOLD
53+
54+
otts = ot_trace_state.serialize()
55+
if not trace_state:
56+
if otts:
57+
new_trace_state = TraceState(((OTEL_TRACE_STATE_KEY, otts),))
58+
else:
59+
new_trace_state = None
60+
else:
61+
new_trace_state = intent.update_trace_state(trace_state)
62+
if otts:
63+
new_trace_state = new_trace_state.update(
64+
OTEL_TRACE_STATE_KEY, otts
65+
)
66+
67+
return SamplingResult(decision, intent.attributes, new_trace_state)
68+
69+
def sampling_intent(
70+
self,
71+
parent_ctx: Optional[Context],
72+
name: str,
73+
span_kind: Optional[SpanKind],
74+
attributes: Attributes,
75+
links: Optional[Sequence[Link]],
76+
trace_state: Optional[TraceState],
77+
) -> SamplingIntent:
78+
return self._delegate.sampling_intent(
79+
parent_ctx, name, span_kind, attributes, links, trace_state
80+
)
81+
82+
def get_description(self) -> str:
83+
return self._delegate.get_description()

0 commit comments

Comments
 (0)