44import os
55import re
66from logging import NOTSET , Logger , getLogger
7- from typing import ClassVar , Dict , List , Type , Union
7+ from typing import ClassVar , Dict , List , NamedTuple , Optional , Type , Union
88
99from importlib_metadata import version
1010from typing_extensions import override
1111
1212from amazon .opentelemetry .distro ._aws_attribute_keys import AWS_LOCAL_SERVICE
1313from amazon .opentelemetry .distro ._aws_resource_attribute_configurator import get_service_attribute
14- from amazon .opentelemetry .distro ._utils import is_agent_observability_enabled
14+ from amazon .opentelemetry .distro ._utils import is_agent_observability_enabled , is_installed
1515from amazon .opentelemetry .distro .always_record_sampler import AlwaysRecordSampler
1616from amazon .opentelemetry .distro .attribute_propagating_span_processor_builder import (
1717 AttributePropagatingSpanProcessorBuilder ,
9898
9999AWS_OTLP_LOGS_GROUP_HEADER = "x-aws-log-group"
100100AWS_OTLP_LOGS_STREAM_HEADER = "x-aws-log-stream"
101+ AWS_EMF_METRICS_NAMESPACE = "x-aws-metric-namespace"
101102
102103# UDP package size is not larger than 64KB
103104LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10
113114_logger : Logger = getLogger (__name__ )
114115
115116
117+ class OtlpLogHeaderSetting (NamedTuple ):
118+ log_group : Optional [str ]
119+ log_stream : Optional [str ]
120+ namespace : Optional [str ]
121+ is_valid : bool
122+
123+
116124class AwsOpenTelemetryConfigurator (_OTelSDKConfigurator ):
117125 """
118126 This AwsOpenTelemetryConfigurator extend _OTelSDKConfigurator configuration with the following change:
@@ -141,6 +149,11 @@ def _configure(self, **kwargs):
141149# Long term, we wish to contribute this to upstream to improve initialization customizability and reduce dependency on
142150# internal logic.
143151def _initialize_components ():
152+ # Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors
153+ # from _import_exporters in OTel dependencies which would try to load exporters
154+ # We will contribute emf exporter to upstream for supporting OTel metrics in SDK
155+ is_emf_enabled = _check_emf_exporter_enabled ()
156+
144157 trace_exporters , metric_exporters , log_exporters = _import_exporters (
145158 _get_exporter_names ("traces" ),
146159 _get_exporter_names ("metrics" ),
@@ -176,7 +189,8 @@ def _initialize_components():
176189 sampler = sampler ,
177190 resource = resource ,
178191 )
179- _init_metrics (metric_exporters , resource )
192+
193+ _init_metrics (metric_exporters , resource , is_emf_enabled )
180194 logging_enabled = os .getenv (_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED , "false" )
181195 if logging_enabled .strip ().lower () == "true" :
182196 _init_logging (log_exporters , resource )
@@ -235,6 +249,7 @@ def _init_tracing(
235249def _init_metrics (
236250 exporters_or_readers : Dict [str , Union [Type [MetricExporter ], Type [MetricReader ]]],
237251 resource : Resource = None ,
252+ is_emf_enabled : bool = False ,
238253):
239254 metric_readers = []
240255 views = []
@@ -247,7 +262,7 @@ def _init_metrics(
247262 else :
248263 metric_readers .append (PeriodicExportingMetricReader (exporter_or_reader_class (** exporter_args )))
249264
250- _customize_metric_exporters (metric_readers , views )
265+ _customize_metric_exporters (metric_readers , views , is_emf_enabled )
251266
252267 provider = MeterProvider (resource = resource , metric_readers = metric_readers , views = views )
253268 set_meter_provider (provider )
@@ -397,7 +412,7 @@ def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> L
397412 if _is_aws_otlp_endpoint (logs_endpoint , "logs" ):
398413 _logger .info ("Detected using AWS OTLP Logs Endpoint." )
399414
400- if isinstance (log_exporter , OTLPLogExporter ) and _validate_logs_headers () :
415+ if isinstance (log_exporter , OTLPLogExporter ) and _validate_and_fetch_logs_header (). is_valid :
401416 # Setting default compression mode to Gzip as this is the behavior in upstream's
402417 # collector otlp http exporter:
403418 # https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
@@ -450,7 +465,9 @@ def session_id_predicate(baggage_key: str) -> bool:
450465 return
451466
452467
453- def _customize_metric_exporters (metric_readers : List [MetricReader ], views : List [View ]) -> None :
468+ def _customize_metric_exporters (
469+ metric_readers : List [MetricReader ], views : List [View ], is_emf_enabled : bool = False
470+ ) -> None :
454471 if _is_application_signals_runtime_enabled ():
455472 _get_runtime_metric_views (views , 0 == len (metric_readers ))
456473
@@ -462,6 +479,11 @@ def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[
462479 )
463480 metric_readers .append (scope_based_periodic_exporting_metric_reader )
464481
482+ if is_emf_enabled :
483+ emf_exporter = create_emf_exporter ()
484+ if emf_exporter :
485+ metric_readers .append (PeriodicExportingMetricReader (emf_exporter ))
486+
465487
466488def _get_runtime_metric_views (views : List [View ], retain_runtime_only : bool ) -> None :
467489 runtime_metrics_scope_name = SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME
@@ -551,7 +573,7 @@ def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> b
551573 return bool (re .match (pattern , otlp_endpoint .lower ()))
552574
553575
554- def _validate_logs_headers () -> bool :
576+ def _validate_and_fetch_logs_header () -> OtlpLogHeaderSetting :
555577 """Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to
556578 AWS OTLP Logs endpoint."""
557579
@@ -562,26 +584,36 @@ def _validate_logs_headers() -> bool:
562584 "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
563585 "to include x-aws-log-group and x-aws-log-stream"
564586 )
565- return False
587+ return OtlpLogHeaderSetting ( None , None , None , False )
566588
589+ log_group = None
590+ log_stream = None
591+ namespace = None
567592 filtered_log_headers_count = 0
568593
569594 for pair in logs_headers .split ("," ):
570595 if "=" in pair :
571596 split = pair .split ("=" , 1 )
572597 key = split [0 ]
573598 value = split [1 ]
574- if key in (AWS_OTLP_LOGS_GROUP_HEADER , AWS_OTLP_LOGS_STREAM_HEADER ) and value :
599+ if key == AWS_OTLP_LOGS_GROUP_HEADER and value :
600+ log_group = value
601+ filtered_log_headers_count += 1
602+ elif key == AWS_OTLP_LOGS_STREAM_HEADER and value :
603+ log_stream = value
575604 filtered_log_headers_count += 1
605+ elif key == AWS_EMF_METRICS_NAMESPACE and value :
606+ namespace = value
576607
577- if filtered_log_headers_count != 2 :
608+ is_valid = filtered_log_headers_count == 2 and log_group is not None and log_stream is not None
609+
610+ if not is_valid :
578611 _logger .warning (
579612 "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
580613 "to have values for x-aws-log-group and x-aws-log-stream"
581614 )
582- return False
583615
584- return True
616+ return OtlpLogHeaderSetting ( log_group , log_stream , namespace , is_valid )
585617
586618
587619def _get_metric_export_interval ():
@@ -652,3 +684,73 @@ def create_exporter(self):
652684 )
653685
654686 raise RuntimeError (f"Unsupported AWS Application Signals export protocol: { protocol } " )
687+
688+
689+ def _check_emf_exporter_enabled () -> bool :
690+ """
691+ Checks if OTEL_METRICS_EXPORTER contains "awsemf", removes it if present,
692+ and updates the environment variable.
693+
694+ Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors
695+ from _import_exporters in OTel dependencies which would try to load exporters
696+ We will contribute emf exporter to upstream for supporting OTel metrics in SDK
697+
698+ Returns:
699+ bool: True if "awsemf" was found and removed, False otherwise.
700+ """
701+ # Get the current exporter value
702+ exporter_value = os .environ .get ("OTEL_METRICS_EXPORTER" , "" )
703+
704+ # Check if it's empty
705+ if not exporter_value :
706+ return False
707+
708+ # Split by comma and convert to list
709+ exporters = [exp .strip () for exp in exporter_value .split ("," )]
710+
711+ # Check if awsemf is in the list
712+ if "awsemf" not in exporters :
713+ return False
714+
715+ # Remove awsemf from the list
716+ exporters .remove ("awsemf" )
717+
718+ # Join the remaining exporters and update the environment variable
719+ new_value = "," .join (exporters ) if exporters else ""
720+
721+ # Set the new value (or unset if empty)
722+ if new_value :
723+ os .environ ["OTEL_METRICS_EXPORTER" ] = new_value
724+ elif "OTEL_METRICS_EXPORTER" in os .environ :
725+ del os .environ ["OTEL_METRICS_EXPORTER" ]
726+
727+ return True
728+
729+
730+ def create_emf_exporter ():
731+ """Create and configure the CloudWatch EMF exporter."""
732+ try :
733+ # Check if botocore is available before importing the EMF exporter
734+ if not is_installed ("botocore" ):
735+ _logger .warning ("botocore is not installed. EMF exporter requires botocore" )
736+ return None
737+
738+ # pylint: disable=import-outside-toplevel
739+ from amazon .opentelemetry .distro .exporter .aws .metrics .aws_cloudwatch_emf_exporter import (
740+ AwsCloudWatchEmfExporter ,
741+ )
742+
743+ log_header_setting = _validate_and_fetch_logs_header ()
744+
745+ if not log_header_setting .is_valid :
746+ return None
747+
748+ return AwsCloudWatchEmfExporter (
749+ namespace = log_header_setting .namespace ,
750+ log_group_name = log_header_setting .log_group ,
751+ log_stream_name = log_header_setting .log_stream ,
752+ )
753+ # pylint: disable=broad-exception-caught
754+ except Exception as errors :
755+ _logger .error ("Failed to create EMF exporter: %s" , errors )
756+ return None
0 commit comments