diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py new file mode 100644 index 000000000..847f50fb1 --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py @@ -0,0 +1,23 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import sys +from logging import Logger, getLogger + +import pkg_resources + +_logger: Logger = getLogger(__name__) + + +def is_installed(req: str) -> bool: + """Is the given required package installed?""" + + if req in sys.modules and sys.modules[req] is not None: + return True + + try: + pkg_resources.get_distribution(req) + except Exception as exc: # pylint: disable=broad-except + _logger.debug("Skipping instrumentation patch: package %s, exception: %s", req, exc) + return False + return True diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index c9d3680a5..d5cea0989 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 # Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License. import os +import re from logging import Logger, getLogger from typing import ClassVar, Dict, List, Type, Union @@ -19,6 +20,7 @@ AwsMetricAttributesSpanExporterBuilder, ) from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder +from amazon.opentelemetry.distro.otlp_aws_span_exporter import OTLPAwsSpanExporter from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader @@ -81,6 +83,7 @@ OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG = "OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED" SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME = "opentelemetry.instrumentation.system_metrics" OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" +XRAY_OTLP_ENDPOINT_PATTERN = r"https://xray\.([a-z0-9-]+)\.amazonaws\.com/v1/traces$" # UDP package size is not larger than 64KB LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10 @@ -315,6 +318,11 @@ def _customize_exporter(span_exporter: SpanExporter, resource: Resource) -> Span traces_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000") span_exporter = OTLPUdpSpanExporter(endpoint=traces_endpoint) + if isinstance(span_exporter, OTLPSpanExporter) and is_xray_otlp_endpoint( + os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT) + ): + span_exporter = OTLPAwsSpanExporter(endpoint=os.getenv(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)) + if not _is_application_signals_enabled(): return span_exporter @@ -328,6 +336,10 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) -> # Construct and set local and remote attributes span processor provider.add_span_processor(AttributePropagatingSpanProcessorBuilder().build()) + # Do not export Application-Signals metrics if it's XRay OTLP endpoint + if is_xray_otlp_endpoint(): + return + # Export 100% spans and not export Application-Signals metrics if on Lambda. if _is_lambda_environment(): _export_unsampled_span_for_lambda(provider, resource) @@ -437,6 +449,15 @@ def _is_lambda_environment(): return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ +def is_xray_otlp_endpoint(otlp_endpoint: str = None) -> bool: + """Is the given endpoint the XRay OTLP endpoint?""" + + if not otlp_endpoint: + return False + + return bool(re.match(XRAY_OTLP_ENDPOINT_PATTERN, otlp_endpoint.lower())) + + def _get_metric_export_interval(): export_interval_millis = float(os.environ.get(METRIC_EXPORT_INTERVAL_CONFIG, DEFAULT_METRIC_EXPORT_INTERVAL)) _logger.debug("Span Metrics export interval: %s", export_interval_millis) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_aws_span_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_aws_span_exporter.py new file mode 100644 index 000000000..215b99ded --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_aws_span_exporter.py @@ -0,0 +1,96 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +import logging +from typing import Dict, Optional + +import requests + +from amazon.opentelemetry.distro._utils import is_installed +from opentelemetry.exporter.otlp.proto.http import Compression +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + +AWS_SERVICE = "xray" +_logger = logging.getLogger(__name__) + + +class OTLPAwsSpanExporter(OTLPSpanExporter): + """ + This exporter extends the functionality of the OTLPSpanExporter to allow spans to be exported to the + XRay OTLP endpoint https://xray.[AWSRegion].amazonaws.com/v1/traces. Utilizes the botocore + library to sign and directly inject SigV4 Authentication to the exported request's headers. + + https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html + """ + + def __init__( + self, + endpoint: Optional[str] = None, + certificate_file: Optional[str] = None, + client_key_file: Optional[str] = None, + client_certificate_file: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + timeout: Optional[int] = None, + compression: Optional[Compression] = None, + rsession: Optional[requests.Session] = None, + ): + + self._aws_region = None + + # Requires botocore to be installed to sign the headers. However, + # some users might not need to use this exporter. In order not conflict + # with existing behavior, we check for botocore before initializing this exporter. + + if endpoint and is_installed("botocore"): + # pylint: disable=import-outside-toplevel + from botocore import auth, awsrequest, session + + self.boto_auth = auth + self.boto_aws_request = awsrequest + self.boto_session = session.Session() + + # Assumes only valid endpoints passed are of XRay OTLP format. + # The only usecase for this class would be for ADOT Python Auto Instrumentation and that already validates + # the endpoint to be an XRay OTLP endpoint. + self._aws_region = endpoint.split(".")[1] + + else: + _logger.error( + "botocore is required to export traces to %s. Please install it using `pip install botocore`", + endpoint, + ) + + super().__init__( + endpoint=endpoint, + certificate_file=certificate_file, + client_key_file=client_key_file, + client_certificate_file=client_certificate_file, + headers=headers, + timeout=timeout, + compression=compression, + session=rsession, + ) + + # Overrides upstream's private implementation of _export. All behaviors are + # the same except if the endpoint is an XRay OTLP endpoint, we will sign the request + # with SigV4 in headers before sending it to the endpoint. Otherwise, we will skip signing. + def _export(self, serialized_data: bytes): + request = self.boto_aws_request.AWSRequest( + method="POST", + url=self._endpoint, + data=serialized_data, + headers={"Content-Type": "application/x-protobuf"}, + ) + + credentials = self.boto_session.get_credentials() + + if credentials is not None: + signer = self.boto_auth.SigV4Auth(credentials, AWS_SERVICE, self._aws_region) + + try: + signer.add_auth(request) + self._session.headers.update(dict(request.headers)) + + except Exception as signing_error: # pylint: disable=broad-except + _logger.error("Failed to sign request: %s", signing_error) + + return super()._export(serialized_data) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_instrumentation_patch.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_instrumentation_patch.py index fcda07e64..9a5f4974b 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_instrumentation_patch.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_instrumentation_patch.py @@ -2,11 +2,9 @@ # SPDX-License-Identifier: Apache-2.0 # Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License. import os -import sys from logging import Logger, getLogger -import pkg_resources - +from amazon.opentelemetry.distro._utils import is_installed from amazon.opentelemetry.distro.patches._resource_detector_patches import _apply_resource_detector_patches # Env variable for determining whether we want to monkey patch gevent modules. Possible values are 'all', 'none', and @@ -25,7 +23,7 @@ def apply_instrumentation_patches() -> None: Where possible, automated testing should be run to catch upstream changes resulting in broken patches """ - if _is_installed("gevent"): + if is_installed("gevent"): try: gevent_patch_module = os.environ.get(AWS_GEVENT_PATCH_MODULES, "all") @@ -56,7 +54,7 @@ def apply_instrumentation_patches() -> None: except Exception as exc: # pylint: disable=broad-except _logger.info("Failed to monkey patch gevent, exception: %s", exc) - if _is_installed("botocore ~= 1.0"): + if is_installed("botocore ~= 1.0"): # pylint: disable=import-outside-toplevel # Delay import to only occur if patches is safe to apply (e.g. the instrumented library is installed). from amazon.opentelemetry.distro.patches._botocore_patches import _apply_botocore_instrumentation_patches @@ -66,15 +64,3 @@ def apply_instrumentation_patches() -> None: # No need to check if library is installed as this patches opentelemetry.sdk, # which must be installed for the distro to work at all. _apply_resource_detector_patches() - - -def _is_installed(req: str) -> bool: - if req in sys.modules: - return True - - try: - pkg_resources.get_distribution(req) - except Exception as exc: # pylint: disable=broad-except - _logger.debug("Skipping instrumentation patch: package %s, exception: %s", req, exc) - return False - return True diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py index 209c5d1bd..87e6c4810 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py @@ -38,9 +38,7 @@ _LAMBDA_SOURCE_MAPPING_ID: str = "lambdaEventSourceMappingID" # Patch names -GET_DISTRIBUTION_PATCH: str = ( - "amazon.opentelemetry.distro.patches._instrumentation_patch.pkg_resources.get_distribution" -) +GET_DISTRIBUTION_PATCH: str = "amazon.opentelemetry.distro._utils.pkg_resources.get_distribution" class TestInstrumentationPatch(TestCase): diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_aws_span_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_aws_span_exporter.py new file mode 100644 index 000000000..b0222bb7c --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_aws_span_exporter.py @@ -0,0 +1,211 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +import os +from unittest import TestCase +from unittest.mock import ANY, MagicMock, PropertyMock, patch + +import requests +from botocore.credentials import Credentials + +from amazon.opentelemetry.distro.aws_opentelemetry_configurator import OTLPAwsSpanExporter +from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + DEFAULT_COMPRESSION, + DEFAULT_ENDPOINT, + DEFAULT_TIMEOUT, + DEFAULT_TRACES_EXPORT_PATH, + OTLPSpanExporter, +) +from opentelemetry.exporter.otlp.proto.http.version import __version__ +from opentelemetry.sdk.environment_variables import OTEL_EXPORTER_OTLP_TRACES_ENDPOINT +from opentelemetry.sdk.trace import SpanContext, _Span +from opentelemetry.trace import SpanKind, TraceFlags + +OTLP_XRAY_ENDPOINT = "https://xray.us-east-1.amazonaws.com/v1/traces" +USER_AGENT = "OTel-OTLP-Exporter-Python/" + __version__ +CONTENT_TYPE = "application/x-protobuf" +AUTHORIZATION_HEADER = "Authorization" +X_AMZ_DATE_HEADER = "X-Amz-Date" +X_AMZ_SECURITY_TOKEN_HEADER = "X-Amz-Security-Token" + + +class TestAwsSpanExporter(TestCase): + def setUp(self): + self.testing_spans = [ + self.create_span("test_span1", SpanKind.INTERNAL), + self.create_span("test_span2", SpanKind.SERVER), + self.create_span("test_span3", SpanKind.CLIENT), + self.create_span("test_span4", SpanKind.PRODUCER), + self.create_span("test_span5", SpanKind.CONSUMER), + ] + + self.expected_auth_header = "AWS4-HMAC-SHA256 Credential=test_key/some_date/us-east-1/xray/aws4_request" + self.expected_auth_x_amz_date = "some_date" + self.expected_auth_security_token = "test_token" + + @patch.dict(os.environ, {}, clear=True) + def test_sigv4_exporter_init_default(self): + """Tests that the default exporter is OTLP protobuf/http Span Exporter if no endpoint is set""" + + exporter = OTLPAwsSpanExporter() + self.validate_exporter_extends_http_span_exporter(exporter, DEFAULT_ENDPOINT + DEFAULT_TRACES_EXPORT_PATH) + self.assertIsNone(exporter._aws_region) + self.assertIsInstance(exporter._session, requests.Session) + + @patch.dict("sys.modules", {"botocore": None}, clear=False) + @patch("pkg_resources.get_distribution") + def test_no_botocore_valid_xray_endpoint(self, mock_get_distribution): + """Test that exporter defaults when using OTLP CW endpoint without botocore""" + + def throw_exception(): + raise ImportError("test error") + + mock_get_distribution.side_effect = throw_exception + + exporter = OTLPAwsSpanExporter(endpoint=OTLP_XRAY_ENDPOINT) + self.validate_exporter_extends_http_span_exporter(exporter, OTLP_XRAY_ENDPOINT) + self.assertIsNone(exporter._aws_region) + + @patch.dict(os.environ, {OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: OTLP_XRAY_ENDPOINT}, clear=True) + @patch("botocore.session.Session") + def test_sigv4_exporter_init_valid_cw_otlp_endpoint(self, session_mock): + """Tests that the endpoint is validated and sets the aws_region but still uses the OTLP protobuf/http + Span Exporter exporter constructor behavior if a valid OTLP CloudWatch endpoint is set.""" + + mock_session = MagicMock() + session_mock.return_value = mock_session + + exporter = OTLPAwsSpanExporter(endpoint=OTLP_XRAY_ENDPOINT) + + self.assertEqual(exporter._aws_region, "us-east-1") + self.validate_exporter_extends_http_span_exporter(exporter, OTLP_XRAY_ENDPOINT) + + @patch("botocore.session.Session") + @patch("requests.Session") + @patch("botocore.auth.SigV4Auth.add_auth") + @patch.dict(os.environ, {OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: OTLP_XRAY_ENDPOINT}) + def test_sigv4_exporter_export_does_not_add_sigv4_if_not_valid_credentials( + self, mock_sigv4_auth, requests_posts_mock, botocore_mock + ): + """Tests that if the OTLP endpoint is a valid CW endpoint but no credentials are returned, + SigV4 authentication method is NOT called and is NOT injected into the existing + Session headers.""" + # Setting the exporter response + mock_response = MagicMock() + mock_response.status_code = 200 + type(mock_response).ok = PropertyMock(return_value=True) + + # Setting the request session headers to make the call to endpoint + mock_session = MagicMock() + mock_session.headers = {"User-Agent": USER_AGENT, "Content-Type": CONTENT_TYPE} + requests_posts_mock.return_value = mock_session + mock_session.post.return_value = mock_response + + mock_botocore_session = MagicMock() + botocore_mock.return_value = mock_botocore_session + + # Test case, return None for get credentials + mock_botocore_session.get_credentials.return_value = None + + # Initialize and call exporter + exporter = OTLPAwsSpanExporter(endpoint=OTLP_XRAY_ENDPOINT) + + # Validate that the region is valid + self.assertEqual(exporter._aws_region, "us-east-1") + + exporter.export(self.testing_spans) + + # Verify SigV4 auth was not called + mock_sigv4_auth.assert_not_called() + + # Verify that SigV4 request headers were properly injected + actual_headers = mock_session.headers + self.assertNotIn(AUTHORIZATION_HEADER, actual_headers) + self.assertNotIn(X_AMZ_DATE_HEADER, actual_headers) + self.assertNotIn(X_AMZ_SECURITY_TOKEN_HEADER, actual_headers) + + @patch("botocore.session.Session") + @patch("requests.Session") + @patch("botocore.auth.SigV4Auth.add_auth") + @patch.dict(os.environ, {OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: OTLP_XRAY_ENDPOINT}) + def test_sigv4_exporter_export_adds_sigv4_authentication_if_valid_cw_endpoint( + self, mock_sigv4_auth, requests_posts_mock, botocore_mock + ): + """Tests that if the OTLP endpoint is valid and credentials are valid, + SigV4 authentication method is called and is + injected into the existing Session headers.""" + + # Setting the exporter response + mock_response = MagicMock() + mock_response.status_code = 200 + type(mock_response).ok = PropertyMock(return_value=True) + + # Setting the request session headers to make the call to endpoint + mock_session = MagicMock() + mock_session.headers = {"User-Agent": USER_AGENT, "Content-Type": CONTENT_TYPE} + requests_posts_mock.return_value = mock_session + mock_session.post.return_value = mock_response + + mock_botocore_session = MagicMock() + botocore_mock.return_value = mock_botocore_session + mock_botocore_session.get_credentials.return_value = Credentials( + access_key="test_key", secret_key="test_secret", token="test_token" + ) + + # SigV4 mock authentication injection + mock_sigv4_auth.side_effect = self.mock_add_auth + + # Initialize and call exporter + exporter = OTLPAwsSpanExporter(endpoint=OTLP_XRAY_ENDPOINT) + exporter.export(self.testing_spans) + + # Verify SigV4 auth was called + mock_sigv4_auth.assert_called_once_with(ANY) + + # Verify that SigV4 request headers were properly injected + actual_headers = mock_session.headers + self.assertIn("Authorization", actual_headers) + self.assertIn("X-Amz-Date", actual_headers) + self.assertIn("X-Amz-Security-Token", actual_headers) + + self.assertEqual(actual_headers[AUTHORIZATION_HEADER], self.expected_auth_header) + self.assertEqual(actual_headers[X_AMZ_DATE_HEADER], self.expected_auth_x_amz_date) + self.assertEqual(actual_headers[X_AMZ_SECURITY_TOKEN_HEADER], self.expected_auth_security_token) + + def validate_exporter_extends_http_span_exporter(self, exporter, endpoint): + self.assertIsInstance(exporter, OTLPSpanExporter) + self.assertEqual(exporter._endpoint, endpoint) + self.assertEqual(exporter._certificate_file, True) + self.assertEqual(exporter._client_certificate_file, None) + self.assertEqual(exporter._client_key_file, None) + self.assertEqual(exporter._timeout, DEFAULT_TIMEOUT) + self.assertIs(exporter._compression, DEFAULT_COMPRESSION) + self.assertEqual(exporter._headers, {}) + self.assertIn("User-Agent", exporter._session.headers) + self.assertEqual( + exporter._session.headers.get("Content-Type"), + CONTENT_TYPE, + ) + self.assertEqual(exporter._session.headers.get("User-Agent"), USER_AGENT) + + @staticmethod + def create_span(name="test_span", kind=SpanKind.INTERNAL): + span = _Span( + name=name, + context=SpanContext( + trace_id=0x1234567890ABCDEF, + span_id=0x9876543210, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + ), + kind=kind, + ) + return span + + def mock_add_auth(self, request): + request.headers._headers.extend( + [ + (AUTHORIZATION_HEADER, self.expected_auth_header), + (X_AMZ_DATE_HEADER, self.expected_auth_x_amz_date), + (X_AMZ_SECURITY_TOKEN_HEADER, self.expected_auth_security_token), + ] + )