From 303ba6f8b0e8801fd55a16fea0a236de6385a0a3 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 10 Oct 2025 15:44:45 +0900 Subject: [PATCH 1/2] Wire up composite sampler --- src/elasticotel/distro/__init__.py | 8 ++-- src/elasticotel/distro/config.py | 17 +++---- src/elasticotel/sdk/sampler/__init__.py | 63 +++++++++++++++++++++++++ tests/distro/test_distro.py | 25 ++++------ 4 files changed, 83 insertions(+), 30 deletions(-) create mode 100644 src/elasticotel/sdk/sampler/__init__.py diff --git a/src/elasticotel/distro/__init__.py b/src/elasticotel/distro/__init__.py index 90169f7..ec9f31d 100644 --- a/src/elasticotel/distro/__init__.py +++ b/src/elasticotel/distro/__init__.py @@ -43,8 +43,6 @@ OTEL_EXPERIMENTAL_RESOURCE_DETECTORS, OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, OTEL_EXPORTER_OTLP_PROTOCOL, - OTEL_TRACES_SAMPLER, - OTEL_TRACES_SAMPLER_ARG, ) from opentelemetry.sdk.resources import OTELResourceDetector from opentelemetry.util._importlib_metadata import EntryPoint @@ -58,6 +56,7 @@ ELASTIC_OTEL_SYSTEM_METRICS_ENABLED, ) from elasticotel.distro.resource_detectors import get_cloud_resource_detectors +from elasticotel.sdk.sampler import dynamic_composite_parent_threshold_traceid_ratio_based_sampler from elasticotel.distro.config import opamp_handler, DEFAULT_SAMPLING_RATE, _initialize_config @@ -89,6 +88,9 @@ def _configure(self, **kwargs): HTTPOTLPMetricExporter: otlp_http_exporter_options, HTTPOTLPSpanExporter: otlp_http_exporter_options, } + + kwargs["sampler"] = dynamic_composite_parent_threshold_traceid_ratio_based_sampler() + super()._configure(**kwargs) # set our local config based on environment variables @@ -159,8 +161,6 @@ def _configure(self, **kwargs): os.environ.setdefault(OTEL_METRICS_EXEMPLAR_FILTER, "always_off") # preference to use DELTA temporality as we can handle only this kind of Histograms os.environ.setdefault(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, "DELTA") - os.environ.setdefault(OTEL_TRACES_SAMPLER, "parentbased_traceidratio") - os.environ.setdefault(OTEL_TRACES_SAMPLER_ARG, str(DEFAULT_SAMPLING_RATE)) base_resource_detectors = [ "process_runtime", diff --git a/src/elasticotel/distro/config.py b/src/elasticotel/distro/config.py index 28bc61d..38c5c62 100644 --- a/src/elasticotel/distro/config.py +++ b/src/elasticotel/distro/config.py @@ -21,6 +21,7 @@ from dataclasses import dataclass from elasticotel.distro.sanitization import _sanitize_headers_env_vars +from elasticotel.sdk.sampler import DynamicCompositeParentThresholdTraceIdRatioBasedSampler from opentelemetry import trace from opentelemetry._opamp import messages from opentelemetry._opamp.agent import OpAMPAgent @@ -31,7 +32,6 @@ ) from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2 from opentelemetry.sdk.environment_variables import OTEL_LOG_LEVEL, OTEL_TRACES_SAMPLER_ARG -from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio logger = logging.getLogger(__name__) @@ -161,19 +161,14 @@ def _handle_sampling_rate(remote_config) -> ConfigUpdate: return ConfigUpdate() # FIXME: this needs to be updated for the consistent probability samplers - if not isinstance(sampler, ParentBasedTraceIdRatio): + if not isinstance(sampler, DynamicCompositeParentThresholdTraceIdRatioBasedSampler): logger.warning("Sampler %s is not supported, not applying sampling_rate.", type(sampler)) return ConfigUpdate() - # since sampler is parent based we need to update its root sampler - root_sampler = sampler._root # type: ignore[reportAttributeAccessIssue] - if root_sampler.rate != sampling_rate: # type: ignore[reportAttributeAccessIssue] - # we don't have a proper way to update it :) - root_sampler._rate = sampling_rate # type: ignore[reportAttributeAccessIssue] - root_sampler._bound = root_sampler.get_bound_for_rate(root_sampler._rate) # type: ignore[reportAttributeAccessIssue] - logger.debug("Updated sampler rate to %s", sampling_rate) - if _config: - _config.sampling_rate.update(value=config_sampling_rate) + sampler.set_ratio(sampling_rate) + logger.debug("Updated sampler rate to %s", sampling_rate) + if _config: + _config.sampling_rate.update(value=config_sampling_rate) return ConfigUpdate() diff --git a/src/elasticotel/sdk/sampler/__init__.py b/src/elasticotel/sdk/sampler/__init__.py new file mode 100644 index 0000000..9bef20a --- /dev/null +++ b/src/elasticotel/sdk/sampler/__init__.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +from typing import Sequence + +from opentelemetry.context import Context +from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult +from opentelemetry.trace import Link, SpanKind, TraceState +from opentelemetry.sdk.trace._sampling_experimental import ( + composite_sampler, + composable_parent_threshold, + composable_traceid_ratio_based, +) +from opentelemetry.util.types import Attributes + + +class DynamicCompositeParentThresholdTraceIdRatioBasedSampler(Sampler): + def __init__(self, ratio: float = 1.0): + self._delegate = _new_sampler(ratio) + + def should_sample( + self, + parent_context: Context | None, + trace_id: int, + name: str, + kind: SpanKind | None = None, + attributes: Attributes | None = None, + links: Sequence[Link] | None = None, + trace_state: TraceState | None = None, + ) -> SamplingResult: + return self._delegate.should_sample( + parent_context, + trace_id, + name, + kind, + attributes, + links, + trace_state, + ) + + def set_ratio(self, ratio: float): + self._delegate = _new_sampler(ratio) + + def get_description(self) -> str: + return self._delegate.get_description() + + +def _new_sampler(ratio: float): + return composite_sampler(composable_parent_threshold(composable_traceid_ratio_based(ratio))) + + +def dynamic_composite_parent_threshold_traceid_ratio_based_sampler(ratio: float = 1.0) -> Sampler: + """Returns a new DynamicCompositeParentThresholdTraceIdRatioBasedSampler. + + This sampler behaves like ParentBasedTraceIdRatio, but the sampling rate can be changed + at runtime. + + Args: + ratio: The sampling ratio to use for root spans. Must be between 0.0 and 1.0. + + Returns: + A new DynamicCompositeParentThresholdTraceIdRatioBasedSampler. + """ + return DynamicCompositeParentThresholdTraceIdRatioBasedSampler(ratio) diff --git a/tests/distro/test_distro.py b/tests/distro/test_distro.py index f9801c5..773099a 100644 --- a/tests/distro/test_distro.py +++ b/tests/distro/test_distro.py @@ -22,6 +22,7 @@ from elasticotel.distro import ElasticOpenTelemetryConfigurator, ElasticOpenTelemetryDistro, logger as distro_logger from elasticotel.distro.config import opamp_handler, logger as config_logger, Config from elasticotel.distro.environment_variables import ELASTIC_OTEL_OPAMP_ENDPOINT, ELASTIC_OTEL_SYSTEM_METRICS_ENABLED +from elasticotel.sdk.sampler import DynamicCompositeParentThresholdTraceIdRatioBasedSampler from opentelemetry.environment_variables import ( OTEL_LOGS_EXPORTER, OTEL_METRICS_EXPORTER, @@ -32,9 +33,8 @@ OTEL_EXPERIMENTAL_RESOURCE_DETECTORS, OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, OTEL_EXPORTER_OTLP_PROTOCOL, - OTEL_TRACES_SAMPLER, - OTEL_TRACES_SAMPLER_ARG, ) +from opentelemetry import trace from opentelemetry.sdk.trace import sampling from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2 @@ -54,21 +54,13 @@ def test_default_configuration(self): ) self.assertEqual("always_off", os.environ.get(OTEL_METRICS_EXEMPLAR_FILTER)) self.assertEqual("DELTA", os.environ.get(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE)) - self.assertEqual("parentbased_traceidratio", os.environ.get(OTEL_TRACES_SAMPLER)) - self.assertEqual("1.0", os.environ.get(OTEL_TRACES_SAMPLER_ARG)) @mock.patch.dict("os.environ", {}, clear=True) def test_sampler_configuration(self): - distro = ElasticOpenTelemetryDistro() - distro._configure() - parent_sampler = sampling._get_from_env_or_default() - - assert isinstance(parent_sampler, sampling.ParentBasedTraceIdRatio) - - sampler = parent_sampler._root + ElasticOpenTelemetryConfigurator()._configure() - assert isinstance(sampler, sampling.TraceIdRatioBased) - assert sampler.rate == 1.0 + sampler = getattr(trace.get_tracer_provider(), "sampler", None) + assert isinstance(sampler, DynamicCompositeParentThresholdTraceIdRatioBasedSampler) @mock.patch.dict("os.environ", {}, clear=True) def test_load_instrumentor_call_with_default_kwargs_for_SystemMetricsInstrumentor(self): @@ -488,7 +480,7 @@ def test_warns_if_logging_level_does_not_match_our_map(self, get_logger_mock, ge @mock.patch("opentelemetry.trace.get_tracer_provider") def test_sets_matching_sampling_rate(self, get_tracer_provider_mock, get_config_mock): get_config_mock.return_value = Config() - sampler = sampling.ParentBasedTraceIdRatio(rate=1.0) + sampler = DynamicCompositeParentThresholdTraceIdRatioBasedSampler(1.0) get_tracer_provider_mock.return_value.sampler = sampler agent = mock.Mock() client = mock.Mock() @@ -499,7 +491,10 @@ def test_sets_matching_sampling_rate(self, get_tracer_provider_mock, get_config_ message = opamp_pb2.ServerToAgent(remote_config=remote_config) opamp_handler(agent, client, message) - self.assertEqual(sampler._root.rate, 0.5) + self.assertIn( + "ComposableParentThreshold{root=ComposableTraceIDRatioBased{threshold=8, ratio=0.5}}", + sampler.get_description(), + ) client._update_remote_config_status.assert_called_once_with( remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_APPLIED, error_message="" From aad3847c56deddfdf4cb4d49241150d7a4c0481e Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 10 Oct 2025 15:49:09 +0900 Subject: [PATCH 2/2] Lint --- src/elasticotel/distro/__init__.py | 2 +- src/elasticotel/sdk/sampler/__init__.py | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/elasticotel/distro/__init__.py b/src/elasticotel/distro/__init__.py index ec9f31d..5190238 100644 --- a/src/elasticotel/distro/__init__.py +++ b/src/elasticotel/distro/__init__.py @@ -57,7 +57,7 @@ ) from elasticotel.distro.resource_detectors import get_cloud_resource_detectors from elasticotel.sdk.sampler import dynamic_composite_parent_threshold_traceid_ratio_based_sampler -from elasticotel.distro.config import opamp_handler, DEFAULT_SAMPLING_RATE, _initialize_config +from elasticotel.distro.config import opamp_handler, _initialize_config logger = logging.getLogger(__name__) diff --git a/src/elasticotel/sdk/sampler/__init__.py b/src/elasticotel/sdk/sampler/__init__.py index 9bef20a..3998782 100644 --- a/src/elasticotel/sdk/sampler/__init__.py +++ b/src/elasticotel/sdk/sampler/__init__.py @@ -1,3 +1,19 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from __future__ import annotations from typing import Sequence