Skip to content

Commit e28bbdd

Browse files
anuraagaxrmx
andauthored
Wire up composite sampler (#410)
* Wire up composite sampler * Lint * Make edot sampler default but not forced * Cleanup * Apply suggestions from code review --------- Co-authored-by: Riccardo Magliocchetti <[email protected]>
1 parent 32fbeb9 commit e28bbdd

File tree

5 files changed

+127
-25
lines changed

5 files changed

+127
-25
lines changed

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ telemetry_distro = "elasticotel.sdk.resources:TelemetryDistroResourceDetector"
5555
service_instance = "elasticotel.sdk.resources:ServiceInstanceResourceDetector"
5656
_gcp = "opentelemetry.resourcedetector.gcp_resource_detector._detector:GoogleCloudResourceDetector"
5757

58+
[project.entry-points.opentelemetry_traces_sampler]
59+
experimental_composite_parentbased_traceidratio = "elasticotel.sdk.sampler:DefaultSampler"
60+
5861
[project.scripts]
5962
edot-bootstrap = "elasticotel.instrumentation.bootstrap:run"
6063

src/elasticotel/distro/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
ELASTIC_OTEL_SYSTEM_METRICS_ENABLED,
6363
)
6464
from elasticotel.distro.resource_detectors import get_cloud_resource_detectors
65-
from elasticotel.distro.config import opamp_handler, DEFAULT_SAMPLING_RATE, _initialize_config
65+
from elasticotel.distro.config import opamp_handler, _initialize_config, DEFAULT_SAMPLING_RATE
6666

6767

6868
logger = logging.getLogger(__name__)
@@ -93,6 +93,7 @@ def _configure(self, **kwargs):
9393
HTTPOTLPMetricExporter: otlp_http_exporter_options,
9494
HTTPOTLPSpanExporter: otlp_http_exporter_options,
9595
}
96+
9697
super()._configure(**kwargs)
9798

9899
# set our local config based on environment variables
@@ -171,7 +172,7 @@ def _configure(self, **kwargs):
171172
os.environ.setdefault(OTEL_METRICS_EXEMPLAR_FILTER, "always_off")
172173
# preference to use DELTA temporality as we can handle only this kind of Histograms
173174
os.environ.setdefault(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, "DELTA")
174-
os.environ.setdefault(OTEL_TRACES_SAMPLER, "parentbased_traceidratio")
175+
os.environ.setdefault(OTEL_TRACES_SAMPLER, "experimental_composite_parentbased_traceidratio")
175176
os.environ.setdefault(OTEL_TRACES_SAMPLER_ARG, str(DEFAULT_SAMPLING_RATE))
176177

177178
base_resource_detectors = [

src/elasticotel/distro/config.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from dataclasses import dataclass
2222

2323
from elasticotel.distro.sanitization import _sanitize_headers_env_vars
24+
from elasticotel.sdk.sampler import DefaultSampler
2425
from opentelemetry import trace
2526
from opentelemetry._opamp import messages
2627
from opentelemetry._opamp.agent import OpAMPAgent
@@ -31,7 +32,6 @@
3132
)
3233
from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2
3334
from opentelemetry.sdk.environment_variables import OTEL_LOG_LEVEL, OTEL_TRACES_SAMPLER_ARG
34-
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
3535

3636

3737
logger = logging.getLogger(__name__)
@@ -160,20 +160,14 @@ def _handle_sampling_rate(remote_config) -> ConfigUpdate:
160160
logger.debug("Cannot get sampler from tracer provider.")
161161
return ConfigUpdate()
162162

163-
# FIXME: this needs to be updated for the consistent probability samplers
164-
if not isinstance(sampler, ParentBasedTraceIdRatio):
163+
if not isinstance(sampler, DefaultSampler):
165164
logger.warning("Sampler %s is not supported, not applying sampling_rate.", type(sampler))
166165
return ConfigUpdate()
167166

168-
# since sampler is parent based we need to update its root sampler
169-
root_sampler = sampler._root # type: ignore[reportAttributeAccessIssue]
170-
if root_sampler.rate != sampling_rate: # type: ignore[reportAttributeAccessIssue]
171-
# we don't have a proper way to update it :)
172-
root_sampler._rate = sampling_rate # type: ignore[reportAttributeAccessIssue]
173-
root_sampler._bound = root_sampler.get_bound_for_rate(root_sampler._rate) # type: ignore[reportAttributeAccessIssue]
174-
logger.debug("Updated sampler rate to %s", sampling_rate)
175-
if _config:
176-
_config.sampling_rate.update(value=config_sampling_rate)
167+
sampler.set_ratio(sampling_rate)
168+
logger.debug("Updated sampler rate to %s", sampling_rate)
169+
if _config:
170+
_config.sampling_rate.update(value=config_sampling_rate)
177171
return ConfigUpdate()
178172

179173

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
# or more contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright
4+
# ownership. Elasticsearch B.V. licenses this file to you under
5+
# the Apache License, Version 2.0 (the "License"); you may
6+
# not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
from __future__ import annotations
18+
19+
import logging
20+
from typing import Sequence
21+
22+
from opentelemetry.context import Context
23+
from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult
24+
from opentelemetry.trace import Link, SpanKind, TraceState
25+
from opentelemetry.sdk.trace._sampling_experimental import (
26+
composite_sampler,
27+
composable_parent_threshold,
28+
composable_traceid_ratio_based,
29+
)
30+
from opentelemetry.util.types import Attributes
31+
32+
logger = logging.getLogger(__name__)
33+
34+
35+
class DefaultSampler(Sampler):
36+
"""The default sampler for EDOT, which is a parent-based ratio sampler with the rate
37+
updatable from central config."""
38+
39+
def __init__(self, ratio_str: str):
40+
try:
41+
ratio = float(ratio_str)
42+
except ValueError:
43+
logger.warning("Invalid sampling rate '%s', defaulting to 1.0", ratio_str)
44+
ratio = 1.0
45+
self._delegate = _new_sampler(ratio)
46+
47+
def should_sample(
48+
self,
49+
parent_context: Context | None,
50+
trace_id: int,
51+
name: str,
52+
kind: SpanKind | None = None,
53+
attributes: Attributes | None = None,
54+
links: Sequence[Link] | None = None,
55+
trace_state: TraceState | None = None,
56+
) -> SamplingResult:
57+
return self._delegate.should_sample(
58+
parent_context,
59+
trace_id,
60+
name,
61+
kind,
62+
attributes,
63+
links,
64+
trace_state,
65+
)
66+
67+
def set_ratio(self, ratio: float):
68+
self._delegate = _new_sampler(ratio)
69+
70+
def get_description(self) -> str:
71+
return self._delegate.get_description()
72+
73+
74+
def _new_sampler(ratio: float):
75+
return composite_sampler(composable_parent_threshold(composable_traceid_ratio_based(ratio)))

tests/distro/test_distro.py

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from elasticotel.distro import ElasticOpenTelemetryConfigurator, ElasticOpenTelemetryDistro, logger as distro_logger
2323
from elasticotel.distro.config import opamp_handler, logger as config_logger, Config
2424
from elasticotel.distro.environment_variables import ELASTIC_OTEL_OPAMP_ENDPOINT, ELASTIC_OTEL_SYSTEM_METRICS_ENABLED
25+
from elasticotel.sdk.sampler import DefaultSampler
2526
from opentelemetry.environment_variables import (
2627
OTEL_LOGS_EXPORTER,
2728
OTEL_METRICS_EXPORTER,
@@ -35,11 +36,18 @@
3536
OTEL_TRACES_SAMPLER,
3637
OTEL_TRACES_SAMPLER_ARG,
3738
)
39+
from opentelemetry import trace
3840
from opentelemetry.sdk.trace import sampling
3941
from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2
42+
from opentelemetry.util._once import Once
4043

4144

4245
class TestDistribution(TestCase):
46+
def setUp(self):
47+
# Hackily reset global trace provider to allow tests to initialize it.
48+
trace._TRACER_PROVIDER = None
49+
trace._TRACER_PROVIDER_SET_ONCE = Once()
50+
4351
@mock.patch.dict("os.environ", {}, clear=True)
4452
def test_default_configuration(self):
4553
distro = ElasticOpenTelemetryDistro()
@@ -54,21 +62,39 @@ def test_default_configuration(self):
5462
)
5563
self.assertEqual("always_off", os.environ.get(OTEL_METRICS_EXEMPLAR_FILTER))
5664
self.assertEqual("DELTA", os.environ.get(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE))
57-
self.assertEqual("parentbased_traceidratio", os.environ.get(OTEL_TRACES_SAMPLER))
65+
self.assertEqual("experimental_composite_parentbased_traceidratio", os.environ.get(OTEL_TRACES_SAMPLER))
5866
self.assertEqual("1.0", os.environ.get(OTEL_TRACES_SAMPLER_ARG))
5967

6068
@mock.patch.dict("os.environ", {}, clear=True)
6169
def test_sampler_configuration(self):
62-
distro = ElasticOpenTelemetryDistro()
63-
distro._configure()
64-
parent_sampler = sampling._get_from_env_or_default()
65-
66-
assert isinstance(parent_sampler, sampling.ParentBasedTraceIdRatio)
70+
ElasticOpenTelemetryDistro()._configure()
71+
ElasticOpenTelemetryConfigurator()._configure()
72+
sampler = getattr(trace.get_tracer_provider(), "sampler", None)
73+
self.assertTrue(isinstance(sampler, DefaultSampler))
74+
self.assertIn(
75+
"ComposableParentThreshold{root=ComposableTraceIDRatioBased{threshold=0, ratio=1.0}}",
76+
sampler.get_description(),
77+
)
6778

68-
sampler = parent_sampler._root
79+
@mock.patch.dict("os.environ", {}, clear=True)
80+
def test_sampler_configuration_sampler_arg(self):
81+
os.environ[OTEL_TRACES_SAMPLER_ARG] = "0.0"
82+
ElasticOpenTelemetryDistro()._configure()
83+
ElasticOpenTelemetryConfigurator()._configure()
84+
sampler = getattr(trace.get_tracer_provider(), "sampler", None)
85+
self.assertTrue(isinstance(sampler, DefaultSampler))
86+
self.assertIn(
87+
"ComposableParentThreshold{root=ComposableTraceIDRatioBased{threshold=max, ratio=0.0}}",
88+
sampler.get_description(),
89+
)
6990

70-
assert isinstance(sampler, sampling.TraceIdRatioBased)
71-
assert sampler.rate == 1.0
91+
@mock.patch.dict("os.environ", {}, clear=True)
92+
def test_sampler_configuration_user_configured(self):
93+
os.environ[OTEL_TRACES_SAMPLER] = "always_on"
94+
ElasticOpenTelemetryDistro()._configure()
95+
ElasticOpenTelemetryConfigurator()._configure()
96+
sampler = getattr(trace.get_tracer_provider(), "sampler", None)
97+
self.assertTrue(isinstance(sampler, sampling._AlwaysOn))
7298

7399
@mock.patch.dict("os.environ", {}, clear=True)
74100
def test_load_instrumentor_call_with_default_kwargs_for_SystemMetricsInstrumentor(self):
@@ -517,7 +543,7 @@ def test_warns_if_logging_level_does_not_match_our_map(self, get_logger_mock, ge
517543
@mock.patch("opentelemetry.trace.get_tracer_provider")
518544
def test_sets_matching_sampling_rate(self, get_tracer_provider_mock, get_config_mock):
519545
get_config_mock.return_value = Config()
520-
sampler = sampling.ParentBasedTraceIdRatio(rate=1.0)
546+
sampler = DefaultSampler(1.0)
521547
get_tracer_provider_mock.return_value.sampler = sampler
522548
agent = mock.Mock()
523549
client = mock.Mock()
@@ -528,7 +554,10 @@ def test_sets_matching_sampling_rate(self, get_tracer_provider_mock, get_config_
528554
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
529555
opamp_handler(agent, client, message)
530556

531-
self.assertEqual(sampler._root.rate, 0.5)
557+
self.assertIn(
558+
"ComposableParentThreshold{root=ComposableTraceIDRatioBased{threshold=8, ratio=0.5}}",
559+
sampler.get_description(),
560+
)
532561

533562
client._update_remote_config_status.assert_called_once_with(
534563
remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_APPLIED, error_message=""

0 commit comments

Comments
 (0)