Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

- `opentelemetry-sdk-extension-aws` Add caching, matching, and targets logic to complete AWS X-Ray Remote Sampler implementation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opentelemetry-sdk-extension-aws is released on its own and it has its own changelog in sdk-extension/opentelemetry-sdk-extension-aws

([#3761](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3761))

## Version 1.37.0/0.58b0 (2025-09-11)

### Fixed
Expand Down
23 changes: 23 additions & 0 deletions sdk-extension/opentelemetry-sdk-extension-aws/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,29 @@ populate `resource` attributes by creating a `TraceProvider` using the `AwsEc2Re
Refer to each detectors' docstring to determine any possible requirements for that
detector.


Usage (AWS X-Ray Remote Sampler)
--------------------------------

Use the provided AWS X-Ray Remote Sampler by setting this sampler in your instrumented application:

.. code-block:: python

    from opentelemetry.sdk.extension.aws.trace.sampler import AwsXRayRemoteSampler
    from opentelemetry import trace
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.semconv.resource import ResourceAttributes
    from opentelemetry.util.types import Attributes

    resource = Resource.create(attributes={
        ResourceAttributes.SERVICE_NAME: "myService",
        ResourceAttributes.CLOUD_PLATFORM: "aws_ec2",
    })
    xraySampler = AwsXRayRemoteSampler(resource=resource, polling_interval=300)
    trace.set_tracer_provider(TracerProvider(sampler=xraySampler))

References
----------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ aws_eks = "opentelemetry.sdk.extension.aws.resource.eks:AwsEksResourceDetector"
aws_elastic_beanstalk = "opentelemetry.sdk.extension.aws.resource.beanstalk:AwsBeanstalkResourceDetector"
aws_lambda = "opentelemetry.sdk.extension.aws.resource._lambda:AwsLambdaResourceDetector"

# TODO: Uncomment this when Sampler implementation is complete
# [project.entry-points.opentelemetry_sampler]
# aws_xray_remote_sampler = "opentelemetry.sdk.extension.aws.trace.sampler:AwsXRayRemoteSampler"
[project.entry-points.opentelemetry_sampler]
aws_xray_remote_sampler = "opentelemetry.sdk.extension.aws.trace.sampler:AwsXRayRemoteSampler"

[project.urls]
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/sdk-extension/opentelemetry-sdk-extension-aws"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

# pylint: disable=no-name-in-module
from opentelemetry.sdk.extension.aws.trace.sampler.aws_xray_remote_sampler import (
_AwsXRayRemoteSampler,
AwsXRayRemoteSampler,
)

__all__ = ["_AwsXRayRemoteSampler"]
__all__ = ["AwsXRayRemoteSampler"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Includes work from:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from typing import Sequence

# pylint: disable=no-name-in-module
from opentelemetry.context import Context
from opentelemetry.sdk.extension.aws.trace.sampler._clock import _Clock
from opentelemetry.sdk.extension.aws.trace.sampler._rate_limiting_sampler import (
_RateLimitingSampler,
)
from opentelemetry.sdk.trace.sampling import (
Decision,
Sampler,
SamplingResult,
TraceIdRatioBased,
)
from opentelemetry.trace import Link, SpanKind
from opentelemetry.trace.span import TraceState
from opentelemetry.util.types import Attributes


class _FallbackSampler(Sampler):
    """Sampler combining a 1 req/sec rate limit with a 5% fixed-rate sampler.

    The rate-limiting sampler is consulted first; the fixed-rate sampler is
    only asked when the rate limiter's decision is ``Decision.DROP``.
    """

    def __init__(self, clock: _Clock):
        """Build the two delegate samplers.

        Args:
            clock: Clock handed to the rate-limiting sampler.
        """
        self.__rate_limiting_sampler = _RateLimitingSampler(1, clock)
        self.__fixed_rate_sampler = TraceIdRatioBased(0.05)

    def should_sample(
        self,
        parent_context: Context | None,
        trace_id: int,
        name: str,
        kind: SpanKind | None = None,
        attributes: Attributes | None = None,
        links: Sequence["Link"] | None = None,
        trace_state: TraceState | None = None,
    ) -> "SamplingResult":
        """Return the rate limiter's result unless it drops, else the fixed-rate result.

        All arguments are forwarded unchanged to both delegate samplers.
        """
        # Both delegates receive exactly the same optional arguments.
        optional_args = {
            "kind": kind,
            "attributes": attributes,
            "links": links,
            "trace_state": trace_state,
        }
        reservoir_result = self.__rate_limiting_sampler.should_sample(
            parent_context, trace_id, name, **optional_args
        )
        if reservoir_result.decision is Decision.DROP:
            # Rate limiter dropped the span: defer to the fixed-rate sampler.
            return self.__fixed_rate_sampler.should_sample(
                parent_context, trace_id, name, **optional_args
            )
        return reservoir_result

    # pylint: disable=no-self-use
    def get_description(self) -> str:
        """Return a fixed, human-readable description of this sampler."""
        return "FallbackSampler{fallback sampling with sampling config of 1 req/sec and 5% of additional requests}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Includes work from:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import re

from opentelemetry.semconv.resource import CloudPlatformValues
from opentelemetry.util.types import Attributes, AttributeValue

# Lookup from OpenTelemetry `CloudPlatformValues` string values to
# "AWS::<Service>::<Resource>"-style type names.
# NOTE(review): presumably used as the service type when matching X-Ray
# sampling rules -- confirm against the caller, which is not in this module.
cloud_platform_mapping = {
CloudPlatformValues.AWS_LAMBDA.value: "AWS::Lambda::Function",
CloudPlatformValues.AWS_ELASTIC_BEANSTALK.value: "AWS::ElasticBeanstalk::Environment",
CloudPlatformValues.AWS_EC2.value: "AWS::EC2::Instance",
CloudPlatformValues.AWS_ECS.value: "AWS::ECS::Container",
CloudPlatformValues.AWS_EKS.value: "AWS::EKS::Container",
}


class _Matcher:
    """Static helpers for matching values against ``*`` / ``?`` wildcard patterns."""

    @staticmethod
    def wild_card_match(
        text: AttributeValue | None = None, pattern: str | None = None
    ) -> bool:
        """Return whether ``text`` matches the wildcard ``pattern``.

        ``*`` stands for any run of characters and ``?`` for exactly one
        character. The lone pattern ``"*"`` matches everything, including
        ``None`` and non-string values; otherwise only ``str`` text can match.
        """
        if pattern == "*":
            return True
        if pattern is None or not isinstance(text, str):
            return False
        if not pattern:
            # An empty pattern only matches empty text.
            return not text
        if "*" in pattern or "?" in pattern:
            compiled_source = _Matcher.to_regex_pattern(pattern)
            return re.fullmatch(compiled_source, text) is not None
        # No wildcards present: plain equality.
        return pattern == text

    @staticmethod
    def to_regex_pattern(rule_pattern: str) -> str:
        """Translate a wildcard pattern into regular-expression source.

        ``*`` becomes ``.*``, ``?`` becomes ``.`` and every other character is
        escaped so that it is matched literally.
        """
        wildcard_regex = {"*": ".*", "?": "."}
        # re.escape works character by character, so escaping each literal
        # on its own yields the same text as escaping whole literal runs.
        return "".join(
            wildcard_regex[symbol]
            if symbol in wildcard_regex
            else re.escape(symbol)
            for symbol in rule_pattern
        )

    @staticmethod
    def attribute_match(
        attributes: Attributes | None = None,
        rule_attributes: dict[str, str] | None = None,
    ) -> bool:
        """Return whether every rule attribute is satisfied by ``attributes``.

        An empty or missing ``rule_attributes`` always matches. Otherwise each
        rule key must be present in ``attributes`` with a value that matches
        the rule's wildcard pattern.
        """
        if not rule_attributes:
            return True
        if not attributes or len(rule_attributes) > len(attributes):
            # Fewer attributes than rules means some rule cannot be satisfied.
            return False

        satisfied = 0
        for key, value in attributes.items():
            pattern = rule_attributes.get(key)
            if pattern is not None and _Matcher.wild_card_match(
                value, pattern
            ):
                satisfied += 1
        return satisfied == len(rule_attributes)
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Includes work from:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from decimal import Decimal
from threading import Lock

# pylint: disable=no-name-in-module
from opentelemetry.sdk.extension.aws.trace.sampler._clock import _Clock


class _RateLimiter:
    """Clock-driven spending limiter whose balance is measured in milliseconds.

    The spendable balance is the time elapsed between an internal "floor"
    timestamp and the current clock reading, capped at ``MAX_BALANCE_MILLIS``.
    A successful spend moves the floor forward so less balance remains.
    """

    def __init__(self, max_balance_in_seconds: int, quota: int, clock: _Clock):
        """Start with an empty balance: the floor is set to the current time.

        Args:
            max_balance_in_seconds: Cap on accumulated balance (usually 1).
            quota: Units that may be spent per second; 0 disables spending.
            clock: Object whose ``now()`` result provides ``timestamp()``.
        """
        self._clock = clock
        self._quota = Decimal(quota)
        # pylint: disable=invalid-name
        self.MAX_BALANCE_MILLIS = Decimal(max_balance_in_seconds * 1000.0)
        # Current balance is always (ceiling - floor), where ceiling is "now".
        self.__wallet_floor_millis = self.__now_millis()
        self.__lock = Lock()

    def __now_millis(self) -> Decimal:
        """Return the clock's current timestamp in milliseconds."""
        return Decimal(self._clock.now().timestamp() * 1000.0)

    def try_spend(self, cost: float) -> bool:
        """Try to deduct ``cost`` units; return True only if the balance covers it.

        A failed attempt leaves the limiter's state untouched.
        """
        if self._quota == 0:
            return False

        # Quota is non-zero here, so the division below is well defined.
        millis_per_unit_inverse = self._quota / Decimal(1000.0)
        price_millis = Decimal(cost) / millis_per_unit_inverse

        with self.__lock:
            ceiling_millis = self.__now_millis()
            available_millis = min(
                ceiling_millis - self.__wallet_floor_millis,
                self.MAX_BALANCE_MILLIS,
            )
            leftover_millis = available_millis - price_millis
            if leftover_millis < 0:
                # Not enough balance: leave the floor where it is.
                return False
            self.__wallet_floor_millis = ceiling_millis - leftover_millis
            return True
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Includes work from:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from typing import Sequence

# pylint: disable=no-name-in-module
from opentelemetry.context import Context
from opentelemetry.sdk.extension.aws.trace.sampler._clock import _Clock
from opentelemetry.sdk.extension.aws.trace.sampler._rate_limiter import (
_RateLimiter,
)
from opentelemetry.sdk.trace.sampling import Decision, Sampler, SamplingResult
from opentelemetry.trace import Link, SpanKind
from opentelemetry.trace.span import TraceState
from opentelemetry.util.types import Attributes


class _RateLimitingSampler(Sampler):
    """Sampler that records spans only while its rate limiter has balance."""

    def __init__(self, quota: int, clock: _Clock):
        """Create the sampler.

        Args:
            quota: Requests per second passed to the underlying rate limiter.
            clock: Clock passed to the underlying rate limiter.
        """
        self.__quota = quota
        self.__reservoir = _RateLimiter(1, quota, clock)

    def should_sample(
        self,
        parent_context: Context | None,
        trace_id: int,
        name: str,
        kind: SpanKind | None = None,
        attributes: Attributes | None = None,
        links: Sequence["Link"] | None = None,
        trace_state: TraceState | None = None,
    ) -> "SamplingResult":
        """Spend one unit from the limiter and sample only if that succeeds.

        The given ``attributes`` and ``trace_state`` are passed through to the
        result in both outcomes.
        """
        decision = (
            Decision.RECORD_AND_SAMPLE
            if self.__reservoir.try_spend(1)
            else Decision.DROP
        )
        return SamplingResult(
            decision=decision,
            attributes=attributes,
            trace_state=trace_state,
        )

    def get_description(self) -> str:
        """Return a description that includes the configured quota."""
        return f"RateLimitingSampler{{rate limiting sampling with sampling config of {self.__quota} req/sec and 0% of additional requests}}"
Loading