From bfb8956ea68c73a5893286aa1f2060501ba05017 Mon Sep 17 00:00:00 2001 From: "mxiamxia@gmail.com" Date: Wed, 25 Jun 2025 16:48:58 -0700 Subject: [PATCH] Add CloudWatch EMF exporter integration to AWS OpenTelemetry configurator --- .../distro/aws_opentelemetry_configurator.py | 126 ++++++++++-- .../test_aws_opentelementry_configurator.py | 179 ++++++++++++++++++ 2 files changed, 293 insertions(+), 12 deletions(-) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index af90d15a7..fcb9a7e50 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -4,14 +4,14 @@ import os import re from logging import NOTSET, Logger, getLogger -from typing import ClassVar, Dict, List, Type, Union +from typing import ClassVar, Dict, List, NamedTuple, Optional, Type, Union from importlib_metadata import version from typing_extensions import override from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute -from amazon.opentelemetry.distro._utils import is_agent_observability_enabled +from amazon.opentelemetry.distro._utils import is_agent_observability_enabled, is_installed from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import ( AttributePropagatingSpanProcessorBuilder, @@ -98,6 +98,7 @@ AWS_OTLP_LOGS_GROUP_HEADER = "x-aws-log-group" AWS_OTLP_LOGS_STREAM_HEADER = "x-aws-log-stream" +AWS_EMF_METRICS_NAMESPACE = "x-aws-metric-namespace" # UDP package size is not larger than 64KB LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10 @@ -113,6 +114,13 @@ _logger: Logger = getLogger(__name__) +class OtlpLogHeaderSetting(NamedTuple): + log_group: Optional[str] + log_stream: Optional[str] + namespace: Optional[str] + is_valid: bool + + class AwsOpenTelemetryConfigurator(_OTelSDKConfigurator): """ This AwsOpenTelemetryConfigurator extend _OTelSDKConfigurator configuration with the following change: @@ -141,6 +149,11 @@ def _configure(self, **kwargs): # Long term, we wish to contribute this to upstream to improve initialization customizability and reduce dependency on # internal logic. def _initialize_components(): + # Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors + # from _import_exporters in OTel dependencies which would try to load exporters + # We will contribute emf exporter to upstream for supporting OTel metrics in SDK + is_emf_enabled = _check_emf_exporter_enabled() + trace_exporters, metric_exporters, log_exporters = _import_exporters( _get_exporter_names("traces"), _get_exporter_names("metrics"), @@ -176,7 +189,8 @@ def _initialize_components(): sampler=sampler, resource=resource, ) - _init_metrics(metric_exporters, resource) + + _init_metrics(metric_exporters, resource, is_emf_enabled) logging_enabled = os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false") if logging_enabled.strip().lower() == "true": _init_logging(log_exporters, resource) @@ -235,6 +249,7 @@ def _init_tracing( def _init_metrics( exporters_or_readers: Dict[str, Union[Type[MetricExporter], Type[MetricReader]]], resource: Resource = None, + is_emf_enabled: bool = False, ): metric_readers = [] views = [] @@ -247,7 +262,7 @@ def _init_metrics( else: metric_readers.append(PeriodicExportingMetricReader(exporter_or_reader_class(**exporter_args))) - _customize_metric_exporters(metric_readers, views) + _customize_metric_exporters(metric_readers, views, is_emf_enabled) provider = MeterProvider(resource=resource, metric_readers=metric_readers, views=views) set_meter_provider(provider) @@ -397,7 +412,7 @@ def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> L if _is_aws_otlp_endpoint(logs_endpoint, "logs"): _logger.info("Detected using AWS OTLP Logs Endpoint.") - if isinstance(log_exporter, OTLPLogExporter) and _validate_logs_headers(): + if isinstance(log_exporter, OTLPLogExporter) and _validate_and_fetch_logs_header().is_valid: # Setting default compression mode to Gzip as this is the behavior in upstream's # collector otlp http exporter: # https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter @@ -450,7 +465,9 @@ def session_id_predicate(baggage_key: str) -> bool: return -def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[View]) -> None: +def _customize_metric_exporters( + metric_readers: List[MetricReader], views: List[View], is_emf_enabled: bool = False +) -> None: if _is_application_signals_runtime_enabled(): _get_runtime_metric_views(views, 0 == len(metric_readers)) @@ -462,6 +479,11 @@ def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[ ) metric_readers.append(scope_based_periodic_exporting_metric_reader) + if is_emf_enabled: + emf_exporter = create_emf_exporter() + if emf_exporter: + metric_readers.append(PeriodicExportingMetricReader(emf_exporter)) + def _get_runtime_metric_views(views: List[View], retain_runtime_only: bool) -> None: runtime_metrics_scope_name = SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME @@ -551,7 +573,7 @@ def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> b return bool(re.match(pattern, otlp_endpoint.lower())) -def _validate_logs_headers() -> bool: +def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting: """Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to AWS OTLP Logs endpoint.""" @@ -562,8 +584,11 @@ def _validate_logs_headers() -> bool: "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS " "to include x-aws-log-group and x-aws-log-stream" ) - return False + return OtlpLogHeaderSetting(None, None, None, False) + log_group = None + log_stream = None + namespace = None filtered_log_headers_count = 0 for pair in logs_headers.split(","): @@ -571,17 +596,24 @@ def _validate_logs_headers() -> bool: split = pair.split("=", 1) key = split[0] value = split[1] - if key in (AWS_OTLP_LOGS_GROUP_HEADER, AWS_OTLP_LOGS_STREAM_HEADER) and value: + if key == AWS_OTLP_LOGS_GROUP_HEADER and value: + log_group = value + filtered_log_headers_count += 1 + elif key == AWS_OTLP_LOGS_STREAM_HEADER and value: + log_stream = value filtered_log_headers_count += 1 + elif key == AWS_EMF_METRICS_NAMESPACE and value: + namespace = value - if filtered_log_headers_count != 2: + is_valid = filtered_log_headers_count == 2 and log_group is not None and log_stream is not None + + if not is_valid: _logger.warning( "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS " "to have values for x-aws-log-group and x-aws-log-stream" ) - return False - return True + return OtlpLogHeaderSetting(log_group, log_stream, namespace, is_valid) def _get_metric_export_interval(): @@ -652,3 +684,73 @@ def create_exporter(self): ) raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ") + + +def _check_emf_exporter_enabled() -> bool: + """ + Checks if OTEL_METRICS_EXPORTER contains "awsemf", removes it if present, + and updates the environment variable. + + Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors + from _import_exporters in OTel dependencies which would try to load exporters + We will contribute emf exporter to upstream for supporting OTel metrics in SDK + + Returns: + bool: True if "awsemf" was found and removed, False otherwise. + """ + # Get the current exporter value + exporter_value = os.environ.get("OTEL_METRICS_EXPORTER", "") + + # Check if it's empty + if not exporter_value: + return False + + # Split by comma and convert to list + exporters = [exp.strip() for exp in exporter_value.split(",")] + + # Check if awsemf is in the list + if "awsemf" not in exporters: + return False + + # Remove awsemf from the list + exporters.remove("awsemf") + + # Join the remaining exporters and update the environment variable + new_value = ",".join(exporters) if exporters else "" + + # Set the new value (or unset if empty) + if new_value: + os.environ["OTEL_METRICS_EXPORTER"] = new_value + elif "OTEL_METRICS_EXPORTER" in os.environ: + del os.environ["OTEL_METRICS_EXPORTER"] + + return True + + +def create_emf_exporter(): + """Create and configure the CloudWatch EMF exporter.""" + try: + # Check if botocore is available before importing the EMF exporter + if not is_installed("botocore"): + _logger.warning("botocore is not installed. EMF exporter requires botocore") + return None + + # pylint: disable=import-outside-toplevel + from amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter import ( + AwsCloudWatchEmfExporter, + ) + + log_header_setting = _validate_and_fetch_logs_header() + + if not log_header_setting.is_valid: + return None + + return AwsCloudWatchEmfExporter( + namespace=log_header_setting.namespace, + log_group_name=log_header_setting.log_group, + log_stream_name=log_header_setting.log_stream, + ) + # pylint: disable=broad-exception-caught + except Exception as errors: + _logger.error("Failed to create EMF exporter: %s", errors) + return None diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index dbaee3c33..7fe02d19f 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -1,5 +1,8 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 + +# pylint: disable=too-many-lines + import os import time from unittest import TestCase @@ -19,6 +22,8 @@ OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, ApplicationSignalsExporterProvider, AwsOpenTelemetryConfigurator, + OtlpLogHeaderSetting, + _check_emf_exporter_enabled, _custom_import_sampler, _customize_logs_exporter, _customize_metric_exporters, @@ -31,9 +36,12 @@ _is_application_signals_runtime_enabled, _is_defer_to_workers_enabled, _is_wsgi_master_process, + _validate_and_fetch_logs_header, + create_emf_exporter, ) from amazon.opentelemetry.distro.aws_opentelemetry_distro import AwsOpenTelemetryDistro from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor +from amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter import AwsCloudWatchEmfExporter from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter @@ -845,6 +853,177 @@ def customize_exporter_test( for key in config.keys(): os.environ.pop(key, None) + def test_check_emf_exporter_enabled(self): + # Test when OTEL_METRICS_EXPORTER is not set + os.environ.pop("OTEL_METRICS_EXPORTER", None) + self.assertFalse(_check_emf_exporter_enabled()) + + # Test when OTEL_METRICS_EXPORTER is empty + os.environ["OTEL_METRICS_EXPORTER"] = "" + self.assertFalse(_check_emf_exporter_enabled()) + + # Test when awsemf is not in the list + os.environ["OTEL_METRICS_EXPORTER"] = "console,otlp" + self.assertFalse(_check_emf_exporter_enabled()) + + # Test when awsemf is in the list + os.environ["OTEL_METRICS_EXPORTER"] = "console,awsemf,otlp" + self.assertTrue(_check_emf_exporter_enabled()) + # Should remove awsemf from the list + self.assertEqual(os.environ["OTEL_METRICS_EXPORTER"], "console,otlp") + + # Test when awsemf is the only exporter + os.environ["OTEL_METRICS_EXPORTER"] = "awsemf" + self.assertTrue(_check_emf_exporter_enabled()) + # Should remove the environment variable entirely + self.assertNotIn("OTEL_METRICS_EXPORTER", os.environ) + + # Test with spaces in the list + os.environ["OTEL_METRICS_EXPORTER"] = " console , awsemf , otlp " + self.assertTrue(_check_emf_exporter_enabled()) + self.assertEqual(os.environ["OTEL_METRICS_EXPORTER"], "console,otlp") + + # Clean up + os.environ.pop("OTEL_METRICS_EXPORTER", None) + + def test_validate_and_fetch_logs_header(self): + # Test when headers are not set + os.environ.pop(OTEL_EXPORTER_OTLP_LOGS_HEADERS, None) + result = _validate_and_fetch_logs_header() + self.assertIsInstance(result, OtlpLogHeaderSetting) + self.assertIsNone(result.log_group) + self.assertIsNone(result.log_stream) + self.assertIsNone(result.namespace) + self.assertFalse(result.is_valid) + + # Test with valid headers + os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=test-group,x-aws-log-stream=test-stream" + result = _validate_and_fetch_logs_header() + self.assertEqual(result.log_group, "test-group") + self.assertEqual(result.log_stream, "test-stream") + self.assertIsNone(result.namespace) + self.assertTrue(result.is_valid) + + # Test with valid headers including namespace + os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = ( + "x-aws-log-group=test-group,x-aws-log-stream=test-stream,x-aws-metric-namespace=test-namespace" + ) + result = _validate_and_fetch_logs_header() + self.assertEqual(result.log_group, "test-group") + self.assertEqual(result.log_stream, "test-stream") + self.assertEqual(result.namespace, "test-namespace") + self.assertTrue(result.is_valid) + + # Test with missing log group + os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-stream=test-stream" + result = _validate_and_fetch_logs_header() + self.assertIsNone(result.log_group) + self.assertEqual(result.log_stream, "test-stream") + self.assertFalse(result.is_valid) + + # Test with missing log stream + os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=test-group" + result = _validate_and_fetch_logs_header() + self.assertEqual(result.log_group, "test-group") + self.assertIsNone(result.log_stream) + self.assertFalse(result.is_valid) + + # Test with empty value in log group + os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=,x-aws-log-stream=test-stream" + result = _validate_and_fetch_logs_header() + self.assertIsNone(result.log_group) + self.assertEqual(result.log_stream, "test-stream") + self.assertFalse(result.is_valid) + + # Test with empty value in log stream + os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=test-group,x-aws-log-stream=" + result = _validate_and_fetch_logs_header() + self.assertEqual(result.log_group, "test-group") + self.assertIsNone(result.log_stream) + self.assertFalse(result.is_valid) + + # Clean up + os.environ.pop(OTEL_EXPORTER_OTLP_LOGS_HEADERS, None) + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._validate_and_fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_installed") + def test_create_emf_exporter(self, mock_is_installed, mock_validate): + # Test when botocore is not installed + mock_is_installed.return_value = False + result = create_emf_exporter() + self.assertIsNone(result) + mock_is_installed.assert_called_with("botocore") + + # Reset mock for subsequent tests + mock_is_installed.reset_mock() + mock_is_installed.return_value = True + + # Mock the EMF exporter class import by patching the module import + with patch( + "amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter.AwsCloudWatchEmfExporter" + ) as mock_emf_exporter_class: + mock_exporter_instance = MagicMock() + mock_exporter_instance.namespace = "default" + mock_exporter_instance.log_group_name = "test-group" + mock_emf_exporter_class.return_value = mock_exporter_instance + + # Test when headers are invalid + mock_validate.return_value = OtlpLogHeaderSetting(None, None, None, False) + result = create_emf_exporter() + self.assertIsNone(result) + + # Test when namespace is missing (should still create exporter with default namespace) + mock_validate.return_value = OtlpLogHeaderSetting("test-group", "test-stream", None, True) + result = create_emf_exporter() + self.assertIsNotNone(result) + self.assertEqual(result, mock_exporter_instance) + # Verify that the EMF exporter was called with correct parameters + mock_emf_exporter_class.assert_called_with( + namespace=None, log_group_name="test-group", log_stream_name="test-stream" + ) + + # Test with valid configuration + mock_validate.return_value = OtlpLogHeaderSetting("test-group", "test-stream", "test-namespace", True) + result = create_emf_exporter() + self.assertIsNotNone(result) + self.assertEqual(result, mock_exporter_instance) + # Verify that the EMF exporter was called with correct parameters + mock_emf_exporter_class.assert_called_with( + namespace="test-namespace", log_group_name="test-group", log_stream_name="test-stream" + ) + + # Test exception handling + mock_validate.side_effect = Exception("Test exception") + result = create_emf_exporter() + self.assertIsNone(result) + + def test_customize_metric_exporters_with_emf(self): + metric_readers = [] + views = [] + + # Test with EMF disabled + _customize_metric_exporters(metric_readers, views, is_emf_enabled=False) + self.assertEqual(len(metric_readers), 0) + + # Test with EMF enabled but create_emf_exporter returns None + with patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.create_emf_exporter", return_value=None): + _customize_metric_exporters(metric_readers, views, is_emf_enabled=True) + self.assertEqual(len(metric_readers), 0) + + # Test with EMF enabled and valid exporter + mock_emf_exporter = MagicMock(spec=AwsCloudWatchEmfExporter) + # Add the required attributes that PeriodicExportingMetricReader expects + mock_emf_exporter._preferred_temporality = {} + mock_emf_exporter._preferred_aggregation = {} + + with patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator.create_emf_exporter", + return_value=mock_emf_exporter, + ): + _customize_metric_exporters(metric_readers, views, is_emf_enabled=True) + self.assertEqual(len(metric_readers), 1) + self.assertIsInstance(metric_readers[0], PeriodicExportingMetricReader) + def validate_distro_environ(): tc: TestCase = TestCase()