Skip to content

Commit c3dc5cf

Browse files
committed
remote sampling - full implementation
1 parent ad29af3 commit c3dc5cf

37 files changed

+4064
-2
lines changed

.github/component_owners.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ components:
3636
- NathanielRN
3737
- Kausik-A
3838
- srprash
39+
- jj22ee
3940

4041
instrumentation/opentelemetry-instrumentation-tortoiseorm:
4142
- tonybaloney

sdk-extension/opentelemetry-sdk-extension-aws/README.rst

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,30 @@ populate `resource` attributes by creating a `TraceProvider` using the `AwsEc2Re
7474
Refer to each detectors' docstring to determine any possible requirements for that
7575
detector.
7676

77+
78+
Usage (AWS X-Ray Remote Sampler)
79+
----------------------------
80+
81+
82+
Use the provided AWS X-Ray Remote Sampler by setting this sampler in your instrumented application:
83+
84+
.. code-block:: python
85+
86+
from opentelemetry.sdk.extension.aws.trace.sampler import AwsXRayRemoteSampler
87+
from opentelemetry import trace
88+
from opentelemetry.sdk.resources import Resource
89+
from opentelemetry.sdk.trace import TracerProvider
90+
from opentelemetry.semconv.resource import ResourceAttributes
91+
from opentelemetry.util.types import Attributes
92+
93+
resource = Resource.create(attributes={
94+
ResourceAttributes.SERVICE_NAME: "myService",
95+
ResourceAttributes.CLOUD_PLATFORM: "aws_ec2",
96+
})
97+
xraySampler = AwsXRayRemoteSampler(resource=resource, polling_interval=300)
98+
trace.set_tracer_provider(TracerProvider(sampler=xraySampler))
99+
100+
77101
References
78102
----------
79103

sdk-extension/opentelemetry-sdk-extension-aws/pyproject.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ classifiers = [
2626
"Programming Language :: Python :: 3.13",
2727
]
2828
dependencies = [
29-
"opentelemetry-sdk ~= 1.12",
29+
"opentelemetry-api ~= 1.23",
30+
"opentelemetry-sdk ~= 1.23",
31+
"opentelemetry-instrumentation ~= 0.44b0",
32+
"opentelemetry-semantic-conventions ~= 0.44b0",
3033
]
3134

3235
[project.entry-points.opentelemetry_id_generator]
@@ -39,6 +42,9 @@ aws_eks = "opentelemetry.sdk.extension.aws.resource.eks:AwsEksResourceDetector"
3942
aws_elastic_beanstalk = "opentelemetry.sdk.extension.aws.resource.beanstalk:AwsBeanstalkResourceDetector"
4043
aws_lambda = "opentelemetry.sdk.extension.aws.resource._lambda:AwsLambdaResourceDetector"
4144

45+
[project.entry-points.opentelemetry_sampler]
46+
aws_xray_remote_sampler = "opentelemetry.sdk.extension.aws.trace.sampler:AwsXRayRemoteSampler"
47+
4248
[project.urls]
4349
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/sdk-extension/opentelemetry-sdk-extension-aws"
4450
Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib"
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# pylint: disable=no-name-in-module
16+
from opentelemetry.sdk.extension.aws.trace.sampler.aws_xray_remote_sampler import (
17+
AwsXRayRemoteSampler,
18+
)
19+
20+
__all__ = ["AwsXRayRemoteSampler"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# Includes work from:
16+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
17+
# SPDX-License-Identifier: Apache-2.0
18+
19+
import json
20+
from logging import getLogger
21+
from typing import List, Optional
22+
23+
import requests
24+
25+
# pylint: disable=no-name-in-module
26+
from opentelemetry.instrumentation.utils import suppress_instrumentation
27+
from opentelemetry.sdk.extension.aws.trace.sampler._sampling_rule import (
28+
_SamplingRule,
29+
)
30+
from opentelemetry.sdk.extension.aws.trace.sampler._sampling_target import (
31+
_SamplingTargetResponse,
32+
)
33+
34+
_logger = getLogger(__name__)
35+
DEFAULT_SAMPLING_PROXY_ENDPOINT = "http://127.0.0.1:2000"
36+
37+
38+
class _AwsXRaySamplingClient:
39+
def __init__(
40+
self,
41+
endpoint: str = DEFAULT_SAMPLING_PROXY_ENDPOINT,
42+
log_level: Optional[str] = None,
43+
):
44+
# Override default log level
45+
if log_level is not None:
46+
_logger.setLevel(log_level)
47+
48+
self.__get_sampling_rules_endpoint = endpoint + "/GetSamplingRules"
49+
self.__get_sampling_targets_endpoint = endpoint + "/SamplingTargets"
50+
51+
self.__session = requests.Session()
52+
53+
def get_sampling_rules(self) -> List[_SamplingRule]:
54+
sampling_rules: List["_SamplingRule"] = []
55+
headers = {"content-type": "application/json"}
56+
57+
with suppress_instrumentation():
58+
try:
59+
xray_response = self.__session.post(
60+
url=self.__get_sampling_rules_endpoint,
61+
headers=headers,
62+
timeout=20,
63+
)
64+
sampling_rules_response = xray_response.json()
65+
if (
66+
sampling_rules_response is None
67+
or "SamplingRuleRecords" not in sampling_rules_response
68+
):
69+
_logger.error(
70+
"SamplingRuleRecords is missing in getSamplingRules response: %s",
71+
sampling_rules_response,
72+
)
73+
return []
74+
sampling_rules_records = sampling_rules_response[
75+
"SamplingRuleRecords"
76+
]
77+
for record in sampling_rules_records:
78+
if "SamplingRule" not in record:
79+
_logger.error(
80+
"SamplingRule is missing in SamplingRuleRecord"
81+
)
82+
else:
83+
sampling_rules.append(
84+
_SamplingRule(**record["SamplingRule"])
85+
)
86+
87+
except requests.exceptions.RequestException as req_err:
88+
_logger.error("Request error occurred: %s", req_err)
89+
except json.JSONDecodeError as json_err:
90+
_logger.error("Error in decoding JSON response: %s", json_err)
91+
# pylint: disable=broad-exception-caught
92+
except Exception as err:
93+
_logger.error(
94+
"Error occurred when attempting to fetch rules: %s", err
95+
)
96+
97+
return sampling_rules
98+
99+
def get_sampling_targets(
100+
self, statistics: List["dict[str, str | float | int]"]
101+
) -> _SamplingTargetResponse:
102+
sampling_targets_response = _SamplingTargetResponse(
103+
LastRuleModification=None,
104+
SamplingTargetDocuments=None,
105+
UnprocessedStatistics=None,
106+
)
107+
headers = {"content-type": "application/json"}
108+
109+
with suppress_instrumentation():
110+
try:
111+
xray_response = self.__session.post(
112+
url=self.__get_sampling_targets_endpoint,
113+
headers=headers,
114+
timeout=20,
115+
json={"SamplingStatisticsDocuments": statistics},
116+
)
117+
xray_response_json = xray_response.json()
118+
if (
119+
xray_response_json is None
120+
or "SamplingTargetDocuments" not in xray_response_json
121+
or "LastRuleModification" not in xray_response_json
122+
):
123+
_logger.debug(
124+
"getSamplingTargets response is invalid. Unable to update targets."
125+
)
126+
return sampling_targets_response
127+
128+
sampling_targets_response = _SamplingTargetResponse(
129+
**xray_response_json
130+
)
131+
except requests.exceptions.RequestException as req_err:
132+
_logger.debug("Request error occurred: %s", req_err)
133+
except json.JSONDecodeError as json_err:
134+
_logger.debug("Error in decoding JSON response: %s", json_err)
135+
# pylint: disable=broad-exception-caught
136+
except Exception as err:
137+
_logger.debug(
138+
"Error occurred when attempting to fetch targets: %s", err
139+
)
140+
141+
return sampling_targets_response
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# Includes work from:
16+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
17+
# SPDX-License-Identifier: Apache-2.0
18+
19+
import datetime
20+
21+
22+
class _Clock:
23+
def __init__(self):
24+
self.__datetime = datetime.datetime
25+
26+
def now(self) -> datetime.datetime:
27+
return self.__datetime.now()
28+
29+
# pylint: disable=no-self-use
30+
def from_timestamp(self, timestamp: float) -> datetime.datetime:
31+
return datetime.datetime.fromtimestamp(timestamp)
32+
33+
def time_delta(self, seconds: float) -> datetime.timedelta:
34+
return datetime.timedelta(seconds=seconds)
35+
36+
def max(self) -> datetime.datetime:
37+
return datetime.datetime.max
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# Includes work from:
16+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
17+
# SPDX-License-Identifier: Apache-2.0
18+
19+
from typing import Optional, Sequence
20+
21+
# pylint: disable=no-name-in-module
22+
from opentelemetry.context import Context
23+
from opentelemetry.sdk.extension.aws.trace.sampler._clock import _Clock
24+
from opentelemetry.sdk.extension.aws.trace.sampler._rate_limiting_sampler import (
25+
_RateLimitingSampler,
26+
)
27+
from opentelemetry.sdk.trace.sampling import (
28+
Decision,
29+
Sampler,
30+
SamplingResult,
31+
TraceIdRatioBased,
32+
)
33+
from opentelemetry.trace import Link, SpanKind
34+
from opentelemetry.trace.span import TraceState
35+
from opentelemetry.util.types import Attributes
36+
37+
38+
class _FallbackSampler(Sampler):
39+
def __init__(self, clock: _Clock):
40+
self.__rate_limiting_sampler = _RateLimitingSampler(1, clock)
41+
self.__fixed_rate_sampler = TraceIdRatioBased(0.05)
42+
43+
# pylint: disable=no-self-use
44+
def should_sample(
45+
self,
46+
parent_context: Optional["Context"],
47+
trace_id: int,
48+
name: str,
49+
kind: Optional[SpanKind] = None,
50+
attributes: Optional[Attributes] = None,
51+
links: Optional[Sequence["Link"]] = None,
52+
trace_state: Optional["TraceState"] = None,
53+
) -> "SamplingResult":
54+
sampling_result = self.__rate_limiting_sampler.should_sample(
55+
parent_context,
56+
trace_id,
57+
name,
58+
kind=kind,
59+
attributes=attributes,
60+
links=links,
61+
trace_state=trace_state,
62+
)
63+
if sampling_result.decision is not Decision.DROP:
64+
return sampling_result
65+
return self.__fixed_rate_sampler.should_sample(
66+
parent_context,
67+
trace_id,
68+
name,
69+
kind=kind,
70+
attributes=attributes,
71+
links=links,
72+
trace_state=trace_state,
73+
)
74+
75+
# pylint: disable=no-self-use
76+
def get_description(self) -> str:
77+
description = "FallbackSampler{fallback sampling with sampling config of 1 req/sec and 5% of additional requests}"
78+
return description

0 commit comments

Comments
 (0)