44import os
55import re
66from logging import NOTSET , Logger , getLogger
7- from typing import ClassVar , Dict , List , Type , Union
7+ from typing import ClassVar , Dict , List , NamedTuple , Optional , Type , Union
88
99from importlib_metadata import version
1010from typing_extensions import override
1111
12- from amazon .opentelemetry .distro ._aws_attribute_keys import AWS_LOCAL_SERVICE
12+ from amazon .opentelemetry .distro ._aws_attribute_keys import AWS_LOCAL_SERVICE , AWS_SERVICE_TYPE
1313from amazon .opentelemetry .distro ._aws_resource_attribute_configurator import get_service_attribute
14- from amazon .opentelemetry .distro ._utils import is_agent_observability_enabled
14+ from amazon .opentelemetry .distro ._utils import is_agent_observability_enabled , is_installed
1515from amazon .opentelemetry .distro .always_record_sampler import AlwaysRecordSampler
1616from amazon .opentelemetry .distro .attribute_propagating_span_processor_builder import (
1717 AttributePropagatingSpanProcessorBuilder ,
9898
9999AWS_OTLP_LOGS_GROUP_HEADER = "x-aws-log-group"
100100AWS_OTLP_LOGS_STREAM_HEADER = "x-aws-log-stream"
101+ AWS_EMF_METRICS_NAMESPACE = "x-aws-metric-namespace"
101102
102103# UDP package size is not larger than 64KB
103104LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10
104105
106+ OTEL_TRACES_EXPORTER = "OTEL_TRACES_EXPORTER"
107+ OTEL_LOGS_EXPORTER = "OTEL_LOGS_EXPORTER"
108+ OTEL_METRICS_EXPORTER = "OTEL_METRICS_EXPORTER"
109+ OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT = "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"
110+ OTEL_TRACES_SAMPLER = "OTEL_TRACES_SAMPLER"
111+ OTEL_PYTHON_DISABLED_INSTRUMENTATIONS = "OTEL_PYTHON_DISABLED_INSTRUMENTATIONS"
112+ OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED = "OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED"
113+
105114_logger : Logger = getLogger (__name__ )
106115
107116
117+ class OtlpLogHeaderSetting (NamedTuple ):
118+ log_group : Optional [str ]
119+ log_stream : Optional [str ]
120+ namespace : Optional [str ]
121+ is_valid : bool
122+
123+
108124class AwsOpenTelemetryConfigurator (_OTelSDKConfigurator ):
109125 """
110126 This AwsOpenTelemetryConfigurator extend _OTelSDKConfigurator configuration with the following change:
@@ -133,6 +149,11 @@ def _configure(self, **kwargs):
133149# Long term, we wish to contribute this to upstream to improve initialization customizability and reduce dependency on
134150# internal logic.
135151def _initialize_components ():
152+ # Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors
153+ # from _import_exporters in OTel dependencies which would try to load exporters
154+ # We will contribute emf exporter to upstream for supporting OTel metrics in SDK
155+ is_emf_enabled = _check_emf_exporter_enabled ()
156+
136157 trace_exporters , metric_exporters , log_exporters = _import_exporters (
137158 _get_exporter_names ("traces" ),
138159 _get_exporter_names ("metrics" ),
@@ -153,7 +174,7 @@ def _initialize_components():
153174 AwsEksResourceDetector (),
154175 AwsEcsResourceDetector (),
155176 ]
156- if not _is_lambda_environment ()
177+ if not ( _is_lambda_environment () or is_agent_observability_enabled () )
157178 else []
158179 )
159180
@@ -168,7 +189,8 @@ def _initialize_components():
168189 sampler = sampler ,
169190 resource = resource ,
170191 )
171- _init_metrics (metric_exporters , resource )
192+
193+ _init_metrics (metric_exporters , resource , is_emf_enabled )
172194 logging_enabled = os .getenv (_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED , "false" )
173195 if logging_enabled .strip ().lower () == "true" :
174196 _init_logging (log_exporters , resource )
@@ -227,6 +249,7 @@ def _init_tracing(
227249def _init_metrics (
228250 exporters_or_readers : Dict [str , Union [Type [MetricExporter ], Type [MetricReader ]]],
229251 resource : Resource = None ,
252+ is_emf_enabled : bool = False ,
230253):
231254 metric_readers = []
232255 views = []
@@ -239,7 +262,7 @@ def _init_metrics(
239262 else :
240263 metric_readers .append (PeriodicExportingMetricReader (exporter_or_reader_class (** exporter_args )))
241264
242- _customize_metric_exporters (metric_readers , views )
265+ _customize_metric_exporters (metric_readers , views , is_emf_enabled )
243266
244267 provider = MeterProvider (resource = resource , metric_readers = metric_readers , views = views )
245268 set_meter_provider (provider )
@@ -265,6 +288,17 @@ def _export_unsampled_span_for_lambda(trace_provider: TracerProvider, resource:
265288 )
266289
267290
291+ def _export_unsampled_span_for_agent_observability (trace_provider : TracerProvider , resource : Resource = None ):
292+ if not is_agent_observability_enabled ():
293+ return
294+
295+ traces_endpoint = os .environ .get (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT )
296+
297+ span_exporter = OTLPAwsSpanExporter (endpoint = traces_endpoint , logger_provider = get_logger_provider ())
298+
299+ trace_provider .add_span_processor (BatchUnsampledSpanProcessor (span_exporter = span_exporter ))
300+
301+
268302def _is_defer_to_workers_enabled ():
269303 return os .environ .get (OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG , "false" ).strip ().lower () == "true"
270304
@@ -389,7 +423,7 @@ def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> L
389423 if _is_aws_otlp_endpoint (logs_endpoint , "logs" ):
390424 _logger .info ("Detected using AWS OTLP Logs Endpoint." )
391425
392- if isinstance (log_exporter , OTLPLogExporter ) and _validate_logs_headers () :
426+ if isinstance (log_exporter , OTLPLogExporter ) and _validate_and_fetch_logs_header (). is_valid :
393427 # Setting default compression mode to Gzip as this is the behavior in upstream's
394428 # collector otlp http exporter:
395429 # https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
@@ -408,9 +442,14 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) ->
408442 if _is_lambda_environment ():
409443 provider .add_span_processor (AwsLambdaSpanProcessor ())
410444
445+ # We always send 100% spans to Genesis platform for agent observability because
446+ # AI applications typically have low throughput traffic patterns and require
447+ # comprehensive monitoring to catch subtle failure modes like hallucinations
448+ # and quality degradation that sampling could miss.
411449 # Add session.id baggage attribute to span attributes to support AI Agent use cases
412450 # enabling session ID tracking in spans.
413451 if is_agent_observability_enabled ():
452+ _export_unsampled_span_for_agent_observability (provider , resource )
414453
415454 def session_id_predicate (baggage_key : str ) -> bool :
416455 return baggage_key == "session.id"
@@ -442,7 +481,9 @@ def session_id_predicate(baggage_key: str) -> bool:
442481 return
443482
444483
445- def _customize_metric_exporters (metric_readers : List [MetricReader ], views : List [View ]) -> None :
484+ def _customize_metric_exporters (
485+ metric_readers : List [MetricReader ], views : List [View ], is_emf_enabled : bool = False
486+ ) -> None :
446487 if _is_application_signals_runtime_enabled ():
447488 _get_runtime_metric_views (views , 0 == len (metric_readers ))
448489
@@ -454,6 +495,11 @@ def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[
454495 )
455496 metric_readers .append (scope_based_periodic_exporting_metric_reader )
456497
498+ if is_emf_enabled :
499+ emf_exporter = create_emf_exporter ()
500+ if emf_exporter :
501+ metric_readers .append (PeriodicExportingMetricReader (emf_exporter ))
502+
457503
458504def _get_runtime_metric_views (views : List [View ], retain_runtime_only : bool ) -> None :
459505 runtime_metrics_scope_name = SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME
@@ -509,7 +555,15 @@ def _customize_resource(resource: Resource) -> Resource:
509555 if is_unknown :
510556 _logger .debug ("No valid service name found" )
511557
512- return resource .merge (Resource .create ({AWS_LOCAL_SERVICE : service_name }))
558+ custom_attributes = {AWS_LOCAL_SERVICE : service_name }
559+
560+ if is_agent_observability_enabled ():
561+ # Add aws.service.type if it doesn't exist in the resource
562+ if resource and resource .attributes .get (AWS_SERVICE_TYPE ) is None :
563+ # Set a default agent type for AI agent observability
564+ custom_attributes [AWS_SERVICE_TYPE ] = "gen_ai_agent"
565+
566+ return resource .merge (Resource .create (custom_attributes ))
513567
514568
515569def _is_application_signals_enabled ():
@@ -543,7 +597,7 @@ def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> b
543597 return bool (re .match (pattern , otlp_endpoint .lower ()))
544598
545599
546- def _validate_logs_headers () -> bool :
600+ def _validate_and_fetch_logs_header () -> OtlpLogHeaderSetting :
547601 """Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to
548602 AWS OTLP Logs endpoint."""
549603
@@ -554,26 +608,36 @@ def _validate_logs_headers() -> bool:
554608 "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
555609 "to include x-aws-log-group and x-aws-log-stream"
556610 )
557- return False
611+ return OtlpLogHeaderSetting ( None , None , None , False )
558612
613+ log_group = None
614+ log_stream = None
615+ namespace = None
559616 filtered_log_headers_count = 0
560617
561618 for pair in logs_headers .split ("," ):
562619 if "=" in pair :
563620 split = pair .split ("=" , 1 )
564621 key = split [0 ]
565622 value = split [1 ]
566- if key in (AWS_OTLP_LOGS_GROUP_HEADER , AWS_OTLP_LOGS_STREAM_HEADER ) and value :
623+ if key == AWS_OTLP_LOGS_GROUP_HEADER and value :
624+ log_group = value
625+ filtered_log_headers_count += 1
626+ elif key == AWS_OTLP_LOGS_STREAM_HEADER and value :
627+ log_stream = value
567628 filtered_log_headers_count += 1
629+ elif key == AWS_EMF_METRICS_NAMESPACE and value :
630+ namespace = value
568631
569- if filtered_log_headers_count != 2 :
632+ is_valid = filtered_log_headers_count == 2 and log_group is not None and log_stream is not None
633+
634+ if not is_valid :
570635 _logger .warning (
571636 "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
572637 "to have values for x-aws-log-group and x-aws-log-stream"
573638 )
574- return False
575639
576- return True
640+ return OtlpLogHeaderSetting ( log_group , log_stream , namespace , is_valid )
577641
578642
579643def _get_metric_export_interval ():
@@ -644,3 +708,73 @@ def create_exporter(self):
644708 )
645709
646710 raise RuntimeError (f"Unsupported AWS Application Signals export protocol: { protocol } " )
711+
712+
713+ def _check_emf_exporter_enabled () -> bool :
714+ """
715+ Checks if OTEL_METRICS_EXPORTER contains "awsemf", removes it if present,
716+ and updates the environment variable.
717+
718+ Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors
719+ from _import_exporters in OTel dependencies which would try to load exporters
720+ We will contribute emf exporter to upstream for supporting OTel metrics in SDK
721+
722+ Returns:
723+ bool: True if "awsemf" was found and removed, False otherwise.
724+ """
725+ # Get the current exporter value
726+ exporter_value = os .environ .get ("OTEL_METRICS_EXPORTER" , "" )
727+
728+ # Check if it's empty
729+ if not exporter_value :
730+ return False
731+
732+ # Split by comma and convert to list
733+ exporters = [exp .strip () for exp in exporter_value .split ("," )]
734+
735+ # Check if awsemf is in the list
736+ if "awsemf" not in exporters :
737+ return False
738+
739+ # Remove awsemf from the list
740+ exporters .remove ("awsemf" )
741+
742+ # Join the remaining exporters and update the environment variable
743+ new_value = "," .join (exporters ) if exporters else ""
744+
745+ # Set the new value (or unset if empty)
746+ if new_value :
747+ os .environ ["OTEL_METRICS_EXPORTER" ] = new_value
748+ elif "OTEL_METRICS_EXPORTER" in os .environ :
749+ del os .environ ["OTEL_METRICS_EXPORTER" ]
750+
751+ return True
752+
753+
754+ def create_emf_exporter ():
755+ """Create and configure the CloudWatch EMF exporter."""
756+ try :
757+ # Check if botocore is available before importing the EMF exporter
758+ if not is_installed ("botocore" ):
759+ _logger .warning ("botocore is not installed. EMF exporter requires botocore" )
760+ return None
761+
762+ # pylint: disable=import-outside-toplevel
763+ from amazon .opentelemetry .distro .exporter .aws .metrics .aws_cloudwatch_emf_exporter import (
764+ AwsCloudWatchEmfExporter ,
765+ )
766+
767+ log_header_setting = _validate_and_fetch_logs_header ()
768+
769+ if not log_header_setting .is_valid :
770+ return None
771+
772+ return AwsCloudWatchEmfExporter (
773+ namespace = log_header_setting .namespace ,
774+ log_group_name = log_header_setting .log_group ,
775+ log_stream_name = log_header_setting .log_stream ,
776+ )
777+ # pylint: disable=broad-exception-caught
778+ except Exception as errors :
779+ _logger .error ("Failed to create EMF exporter: %s" , errors )
780+ return None
0 commit comments