Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/elasticotel/distro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
OTEL_EXPERIMENTAL_RESOURCE_DETECTORS,
OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
OTEL_EXPORTER_OTLP_PROTOCOL,
OTEL_TRACES_SAMPLER,
OTEL_TRACES_SAMPLER_ARG,
)
from opentelemetry.sdk.resources import OTELResourceDetector
from opentelemetry.util._importlib_metadata import EntryPoint
Expand All @@ -58,7 +56,8 @@
ELASTIC_OTEL_SYSTEM_METRICS_ENABLED,
)
from elasticotel.distro.resource_detectors import get_cloud_resource_detectors
from elasticotel.distro.config import opamp_handler, DEFAULT_SAMPLING_RATE, _initialize_config
from elasticotel.sdk.sampler import dynamic_composite_parent_threshold_traceid_ratio_based_sampler
from elasticotel.distro.config import opamp_handler, _initialize_config


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -89,6 +88,9 @@ def _configure(self, **kwargs):
HTTPOTLPMetricExporter: otlp_http_exporter_options,
HTTPOTLPSpanExporter: otlp_http_exporter_options,
}

kwargs["sampler"] = dynamic_composite_parent_threshold_traceid_ratio_based_sampler()

super()._configure(**kwargs)

# set our local config based on environment variables
Expand Down Expand Up @@ -159,8 +161,6 @@ def _configure(self, **kwargs):
os.environ.setdefault(OTEL_METRICS_EXEMPLAR_FILTER, "always_off")
# preference to use DELTA temporality as we can handle only this kind of Histograms
os.environ.setdefault(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, "DELTA")
os.environ.setdefault(OTEL_TRACES_SAMPLER, "parentbased_traceidratio")
os.environ.setdefault(OTEL_TRACES_SAMPLER_ARG, str(DEFAULT_SAMPLING_RATE))
Comment on lines -162 to -163
Copy link
Member

@xrmx xrmx Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also remove these from the documentation. Need to check priority of OTEL_TRACES_SAMPLER env var against the configure parameter. An option is to provide our own entry points and just switch to it there?


base_resource_detectors = [
"process_runtime",
Expand Down
17 changes: 6 additions & 11 deletions src/elasticotel/distro/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dataclasses import dataclass

from elasticotel.distro.sanitization import _sanitize_headers_env_vars
from elasticotel.sdk.sampler import DynamicCompositeParentThresholdTraceIdRatioBasedSampler
from opentelemetry import trace
from opentelemetry._opamp import messages
from opentelemetry._opamp.agent import OpAMPAgent
Expand All @@ -31,7 +32,6 @@
)
from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2
from opentelemetry.sdk.environment_variables import OTEL_LOG_LEVEL, OTEL_TRACES_SAMPLER_ARG
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -161,19 +161,14 @@ def _handle_sampling_rate(remote_config) -> ConfigUpdate:
return ConfigUpdate()

# FIXME: this needs to be updated for the consistent probability samplers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# FIXME: this needs to be updated for the consistent probability samplers

if not isinstance(sampler, ParentBasedTraceIdRatio):
if not isinstance(sampler, DynamicCompositeParentThresholdTraceIdRatioBasedSampler):
logger.warning("Sampler %s is not supported, not applying sampling_rate.", type(sampler))
return ConfigUpdate()

# since sampler is parent based we need to update its root sampler
root_sampler = sampler._root # type: ignore[reportAttributeAccessIssue]
if root_sampler.rate != sampling_rate: # type: ignore[reportAttributeAccessIssue]
# we don't have a proper way to update it :)
root_sampler._rate = sampling_rate # type: ignore[reportAttributeAccessIssue]
root_sampler._bound = root_sampler.get_bound_for_rate(root_sampler._rate) # type: ignore[reportAttributeAccessIssue]
logger.debug("Updated sampler rate to %s", sampling_rate)
if _config:
_config.sampling_rate.update(value=config_sampling_rate)
sampler.set_ratio(sampling_rate)
logger.debug("Updated sampler rate to %s", sampling_rate)
if _config:
_config.sampling_rate.update(value=config_sampling_rate)
return ConfigUpdate()


Expand Down
79 changes: 79 additions & 0 deletions src/elasticotel/sdk/sampler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing import Sequence

from opentelemetry.context import Context
from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult
from opentelemetry.trace import Link, SpanKind, TraceState
from opentelemetry.sdk.trace._sampling_experimental import (
composite_sampler,
composable_parent_threshold,
composable_traceid_ratio_based,
)
from opentelemetry.util.types import Attributes


class DynamicCompositeParentThresholdTraceIdRatioBasedSampler(Sampler):
def __init__(self, ratio: float = 1.0):
self._delegate = _new_sampler(ratio)

def should_sample(
self,
parent_context: Context | None,
trace_id: int,
name: str,
kind: SpanKind | None = None,
attributes: Attributes | None = None,
links: Sequence[Link] | None = None,
trace_state: TraceState | None = None,
) -> SamplingResult:
return self._delegate.should_sample(
parent_context,
trace_id,
name,
kind,
attributes,
links,
trace_state,
)

def set_ratio(self, ratio: float):
self._delegate = _new_sampler(ratio)

def get_description(self) -> str:
return self._delegate.get_description()


def _new_sampler(ratio: float):
return composite_sampler(composable_parent_threshold(composable_traceid_ratio_based(ratio)))


def dynamic_composite_parent_threshold_traceid_ratio_based_sampler(ratio: float = 1.0) -> Sampler:
"""Returns a new DynamicCompositeParentThresholdTraceIdRatioBasedSampler.

This sampler behaves like ParentBasedTraceIdRatio, but the sampling rate can be changed
at runtime.

Args:
ratio: The sampling ratio to use for root spans. Must be between 0.0 and 1.0.

Returns:
A new DynamicCompositeParentThresholdTraceIdRatioBasedSampler.
"""
return DynamicCompositeParentThresholdTraceIdRatioBasedSampler(ratio)
25 changes: 10 additions & 15 deletions tests/distro/test_distro.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from elasticotel.distro import ElasticOpenTelemetryConfigurator, ElasticOpenTelemetryDistro, logger as distro_logger
from elasticotel.distro.config import opamp_handler, logger as config_logger, Config
from elasticotel.distro.environment_variables import ELASTIC_OTEL_OPAMP_ENDPOINT, ELASTIC_OTEL_SYSTEM_METRICS_ENABLED
from elasticotel.sdk.sampler import DynamicCompositeParentThresholdTraceIdRatioBasedSampler
from opentelemetry.environment_variables import (
OTEL_LOGS_EXPORTER,
OTEL_METRICS_EXPORTER,
Expand All @@ -32,9 +33,8 @@
OTEL_EXPERIMENTAL_RESOURCE_DETECTORS,
OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
OTEL_EXPORTER_OTLP_PROTOCOL,
OTEL_TRACES_SAMPLER,
OTEL_TRACES_SAMPLER_ARG,
)
from opentelemetry import trace
from opentelemetry.sdk.trace import sampling
from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2

Expand All @@ -54,21 +54,13 @@ def test_default_configuration(self):
)
self.assertEqual("always_off", os.environ.get(OTEL_METRICS_EXEMPLAR_FILTER))
self.assertEqual("DELTA", os.environ.get(OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE))
self.assertEqual("parentbased_traceidratio", os.environ.get(OTEL_TRACES_SAMPLER))
self.assertEqual("1.0", os.environ.get(OTEL_TRACES_SAMPLER_ARG))

@mock.patch.dict("os.environ", {}, clear=True)
def test_sampler_configuration(self):
distro = ElasticOpenTelemetryDistro()
distro._configure()
parent_sampler = sampling._get_from_env_or_default()

assert isinstance(parent_sampler, sampling.ParentBasedTraceIdRatio)

sampler = parent_sampler._root
ElasticOpenTelemetryConfigurator()._configure()

assert isinstance(sampler, sampling.TraceIdRatioBased)
assert sampler.rate == 1.0
sampler = getattr(trace.get_tracer_provider(), "sampler", None)
assert isinstance(sampler, DynamicCompositeParentThresholdTraceIdRatioBasedSampler)

@mock.patch.dict("os.environ", {}, clear=True)
def test_load_instrumentor_call_with_default_kwargs_for_SystemMetricsInstrumentor(self):
Expand Down Expand Up @@ -488,7 +480,7 @@ def test_warns_if_logging_level_does_not_match_our_map(self, get_logger_mock, ge
@mock.patch("opentelemetry.trace.get_tracer_provider")
def test_sets_matching_sampling_rate(self, get_tracer_provider_mock, get_config_mock):
get_config_mock.return_value = Config()
sampler = sampling.ParentBasedTraceIdRatio(rate=1.0)
sampler = DynamicCompositeParentThresholdTraceIdRatioBasedSampler(1.0)
get_tracer_provider_mock.return_value.sampler = sampler
agent = mock.Mock()
client = mock.Mock()
Expand All @@ -499,7 +491,10 @@ def test_sets_matching_sampling_rate(self, get_tracer_provider_mock, get_config_
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
opamp_handler(agent, client, message)

self.assertEqual(sampler._root.rate, 0.5)
self.assertIn(
"ComposableParentThreshold{root=ComposableTraceIDRatioBased{threshold=8, ratio=0.5}}",
sampler.get_description(),
)

client._update_remote_config_status.assert_called_once_with(
remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_APPLIED, error_message=""
Expand Down