Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

- `opentelemetry-sdk-extension-aws` Add caching, matching, and targets logic to complete AWS X-Ray Remote Sampler implementation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opentelemetry-sdk-extension-aws is released on its own and it has its own changelog in sdk-extension/opentelemetry-sdk-extension-aws

([#3761](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3761))

## Version 1.38.0/0.59b0 (2025-10-16)

### Fixed
Expand Down
23 changes: 23 additions & 0 deletions sdk-extension/opentelemetry-sdk-extension-aws/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,29 @@ populate `resource` attributes by creating a `TraceProvider` using the `AwsEc2Re
Refer to each detector's docstring to determine any possible requirements for that
detector.


Usage (AWS X-Ray Remote Sampler)
--------------------------------

To use the AWS X-Ray Remote Sampler, configure it as the sampler of your application's tracer provider:

.. code-block:: python

from opentelemetry.sdk.extension.aws.trace.sampler import AwsXRayRemoteSampler
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.util.types import Attributes

resource = Resource.create(attributes={
ResourceAttributes.SERVICE_NAME: "myService",
ResourceAttributes.CLOUD_PLATFORM: "aws_ec2",
})
xraySampler = AwsXRayRemoteSampler(resource=resource, polling_interval=300)
trace.set_tracer_provider(TracerProvider(sampler=xraySampler))


References
----------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ aws_eks = "opentelemetry.sdk.extension.aws.resource.eks:AwsEksResourceDetector"
aws_elastic_beanstalk = "opentelemetry.sdk.extension.aws.resource.beanstalk:AwsBeanstalkResourceDetector"
aws_lambda = "opentelemetry.sdk.extension.aws.resource._lambda:AwsLambdaResourceDetector"

# TODO: Uncomment this when Sampler implementation is complete
# [project.entry-points.opentelemetry_sampler]
# aws_xray_remote_sampler = "opentelemetry.sdk.extension.aws.trace.sampler:AwsXRayRemoteSampler"
[project.entry-points.opentelemetry_sampler]
aws_xray_remote_sampler = "opentelemetry.sdk.extension.aws.trace.sampler:AwsXRayRemoteSampler"

[project.urls]
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/sdk-extension/opentelemetry-sdk-extension-aws"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

# pylint: disable=no-name-in-module
from opentelemetry.sdk.extension.aws.trace.sampler.aws_xray_remote_sampler import (
_AwsXRayRemoteSampler,
AwsXRayRemoteSampler,
)

__all__ = ["_AwsXRayRemoteSampler"]
__all__ = ["AwsXRayRemoteSampler"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Includes work from:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from typing import Sequence

# pylint: disable=no-name-in-module
from opentelemetry.context import Context
from opentelemetry.sdk.extension.aws.trace.sampler._clock import _Clock
from opentelemetry.sdk.extension.aws.trace.sampler._rate_limiting_sampler import (
_RateLimitingSampler,
)
from opentelemetry.sdk.trace.sampling import (
Decision,
Sampler,
SamplingResult,
TraceIdRatioBased,
)
from opentelemetry.trace import Link, SpanKind
from opentelemetry.trace.span import TraceState
from opentelemetry.util.types import Attributes


class _FallbackSampler(Sampler):
    """Sampler that tries a 1 request/second reservoir, then a 5% fixed rate.

    The rate-limited reservoir is always consulted first. Only when it
    drops the span is the trace-ID-ratio sampler asked for a decision.
    """

    def __init__(self, clock: _Clock):
        """Build the two delegate samplers.

        Args:
            clock: Time source handed to the rate-limiting sampler.
        """
        self.__rate_limiting_sampler = _RateLimitingSampler(1, clock)
        self.__fixed_rate_sampler = TraceIdRatioBased(0.05)

    def should_sample(
        self,
        parent_context: Context | None,
        trace_id: int,
        name: str,
        kind: SpanKind | None = None,
        attributes: Attributes | None = None,
        links: Sequence["Link"] | None = None,
        trace_state: TraceState | None = None,
    ) -> "SamplingResult":
        """Return the reservoir's result unless it is DROP, else the fixed-rate result."""
        # Both delegates receive exactly the same optional arguments.
        optional_args = {
            "kind": kind,
            "attributes": attributes,
            "links": links,
            "trace_state": trace_state,
        }
        reservoir_result = self.__rate_limiting_sampler.should_sample(
            parent_context, trace_id, name, **optional_args
        )
        if reservoir_result.decision is Decision.DROP:
            # Reservoir exhausted: fall through to the fixed-rate sampler.
            return self.__fixed_rate_sampler.should_sample(
                parent_context, trace_id, name, **optional_args
            )
        return reservoir_result

    # pylint: disable=no-self-use
    def get_description(self) -> str:
        """Return the static, human-readable description of this sampler."""
        return "FallbackSampler{fallback sampling with sampling config of 1 req/sec and 5% of additional requests}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Includes work from:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import re

from opentelemetry.semconv.resource import CloudPlatformValues
from opentelemetry.util.types import Attributes, AttributeValue

# Maps the string value of each supported `CloudPlatformValues` member to an
# AWS resource-type identifier of the form "AWS::<Service>::<Resource>".
# NOTE(review): presumably used to compare the resource's cloud platform
# against a sampling rule's service type — confirm against the caller.
cloud_platform_mapping = {
CloudPlatformValues.AWS_LAMBDA.value: "AWS::Lambda::Function",
CloudPlatformValues.AWS_ELASTIC_BEANSTALK.value: "AWS::ElasticBeanstalk::Environment",
CloudPlatformValues.AWS_EC2.value: "AWS::EC2::Instance",
CloudPlatformValues.AWS_ECS.value: "AWS::ECS::Container",
CloudPlatformValues.AWS_EKS.value: "AWS::EKS::Container",
}


class _Matcher:
    """Static helpers for wildcard and attribute matching.

    Patterns support two wildcards: ``*`` (any run of characters, possibly
    empty) and ``?`` (exactly one character). Every other character is
    matched literally.
    """

    @staticmethod
    def wild_card_match(
        text: AttributeValue | None = None, pattern: str | None = None
    ) -> bool:
        """Return True when ``text`` matches the wildcard ``pattern``.

        A pattern of exactly ``"*"`` matches anything, including ``None`` or
        non-string values. Otherwise ``text`` must be a string and
        ``pattern`` must not be ``None``.
        """
        if pattern == "*":
            return True
        if pattern is None or not isinstance(text, str):
            return False
        if len(pattern) == 0:
            # An empty pattern only matches an empty text.
            return len(text) == 0
        if "*" in pattern or "?" in pattern:
            compiled_source = _Matcher.to_regex_pattern(pattern)
            return re.fullmatch(compiled_source, text) is not None
        # No wildcards present: plain equality is sufficient.
        return pattern == text

    @staticmethod
    def to_regex_pattern(rule_pattern: str) -> str:
        """Translate a wildcard pattern into regular-expression source.

        ``*`` becomes ``.*``, ``?`` becomes ``.``, and every other character
        is escaped so that it matches itself.
        """
        wildcard_to_regex = {"*": ".*", "?": "."}
        # re.escape works character by character, so escaping each literal
        # on its own yields the same text as escaping whole literal runs.
        return "".join(
            wildcard_to_regex[symbol]
            if symbol in wildcard_to_regex
            else re.escape(symbol)
            for symbol in rule_pattern
        )

    @staticmethod
    def attribute_match(
        attributes: Attributes | None = None,
        rule_attributes: dict[str, str] | None = None,
    ) -> bool:
        """Return True when every rule attribute is matched by ``attributes``.

        An empty or missing ``rule_attributes`` always matches. Otherwise the
        span attributes must be non-empty, at least as numerous as the rule
        attributes, and each rule key must be present with a value matching
        the rule's wildcard pattern.
        """
        if not rule_attributes:
            return True
        if not attributes or len(rule_attributes) > len(attributes):
            return False

        # Keys absent from the rule yield a None pattern, which
        # wild_card_match rejects, so they contribute nothing to the total.
        satisfied = sum(
            _Matcher.wild_card_match(value, rule_attributes.get(key))
            for key, value in attributes.items()
        )
        return satisfied == len(rule_attributes)
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Includes work from:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from decimal import Decimal
from threading import Lock

# pylint: disable=no-name-in-module
from opentelemetry.sdk.extension.aws.trace.sampler._clock import _Clock


class _RateLimiter:
    """Time-based spending wallet used to cap a rate of events.

    The balance is the time elapsed (in milliseconds) since an internal
    "floor" timestamp, capped at ``MAX_BALANCE_MILLIS``. Spending moves the
    floor forward; a freshly created limiter therefore starts with a zero
    balance.
    """

    def __init__(self, max_balance_in_seconds: int, quota: int, clock: _Clock):
        """Create a limiter.

        Args:
            max_balance_in_seconds: Cap on the accumulated balance, in
                seconds (usually 1).
            quota: Units that may be spent per second.
            clock: Time source; ``clock.now()`` must return an object with a
                ``timestamp()`` method.
        """
        self._clock = clock
        self._quota = Decimal(quota)
        # pylint: disable=invalid-name
        self.MAX_BALANCE_MILLIS = Decimal(max_balance_in_seconds * 1000.0)
        # The current wallet balance is always (ceiling - floor), so starting
        # the floor at "now" means nothing can be spent until time passes.
        self.__wallet_floor_millis = self._now_millis()
        self.__lock = Lock()

    def _now_millis(self) -> Decimal:
        """Return the clock's current time as Decimal milliseconds."""
        return Decimal(self._clock.now().timestamp() * 1000.0)

    def try_spend(self, cost: float) -> bool:
        """Try to spend ``cost`` units; return True when the balance allowed it.

        On failure the wallet state is left untouched. A quota of zero always
        fails.
        """
        if self._quota == 0:
            return False

        # The quota is non-zero here, so the division below is well defined.
        units_per_milli = self._quota / Decimal(1000.0)
        required_millis = Decimal(cost) / units_per_milli

        # The balance check and the floor update happen under the lock.
        with self.__lock:
            ceiling_millis = self._now_millis()
            available_millis = min(
                ceiling_millis - self.__wallet_floor_millis,
                self.MAX_BALANCE_MILLIS,
            )
            leftover_millis = available_millis - required_millis
            if leftover_millis < 0:
                # Not enough balance: leave the wallet as it was.
                return False
            self.__wallet_floor_millis = ceiling_millis - leftover_millis
            return True
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Includes work from:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from typing import Sequence

# pylint: disable=no-name-in-module
from opentelemetry.context import Context
from opentelemetry.sdk.extension.aws.trace.sampler._clock import _Clock
from opentelemetry.sdk.extension.aws.trace.sampler._rate_limiter import (
_RateLimiter,
)
from opentelemetry.sdk.trace.sampling import Decision, Sampler, SamplingResult
from opentelemetry.trace import Link, SpanKind
from opentelemetry.trace.span import TraceState
from opentelemetry.util.types import Attributes


class _RateLimitingSampler(Sampler):
    """Sampler that records spans only while a per-second reservoir has quota."""

    def __init__(self, quota: int, clock: _Clock):
        """Create the sampler.

        Args:
            quota: Number of spans that may be sampled per second.
            clock: Time source forwarded to the underlying rate limiter.
        """
        self.__quota = quota
        self.__reservoir = _RateLimiter(1, quota, clock)

    def should_sample(
        self,
        parent_context: Context | None,
        trace_id: int,
        name: str,
        kind: SpanKind | None = None,
        attributes: Attributes | None = None,
        links: Sequence["Link"] | None = None,
        trace_state: TraceState | None = None,
    ) -> "SamplingResult":
        """Sample the span if one unit can be spent from the reservoir, else drop it."""
        decision = (
            Decision.RECORD_AND_SAMPLE
            if self.__reservoir.try_spend(1)
            else Decision.DROP
        )
        # Attributes and trace state are passed through unchanged either way.
        return SamplingResult(
            decision=decision,
            attributes=attributes,
            trace_state=trace_state,
        )

    def get_description(self) -> str:
        """Return a human-readable description that includes the configured quota."""
        return f"RateLimitingSampler{{rate limiting sampling with sampling config of {self.__quota} req/sec and 0% of additional requests}}"
Loading