4
4
import logging
5
5
import os
6
6
import re
7
- from typing import ClassVar , Dict , List , Optional , Type , Union
7
+ from logging import NOTSET , Logger , getLogger
8
+ from typing import ClassVar , Dict , List , NamedTuple , Optional , Type , Union
8
9
9
10
from importlib_metadata import version
10
11
from typing_extensions import override
11
12
12
13
from amazon .opentelemetry .distro ._aws_attribute_keys import AWS_LOCAL_SERVICE
13
14
from amazon .opentelemetry .distro ._aws_resource_attribute_configurator import get_service_attribute
14
- from amazon .opentelemetry .distro ._utils import is_agent_observability_enabled
15
+ from amazon .opentelemetry .distro ._utils import is_agent_observability_enabled , is_installed
15
16
from amazon .opentelemetry .distro .always_record_sampler import AlwaysRecordSampler
16
17
from amazon .opentelemetry .distro .attribute_propagating_span_processor_builder import (
17
18
AttributePropagatingSpanProcessorBuilder ,
104
105
105
106
AWS_OTLP_LOGS_GROUP_HEADER = "x-aws-log-group"
106
107
AWS_OTLP_LOGS_STREAM_HEADER = "x-aws-log-stream"
108
+ AWS_EMF_METRICS_NAMESPACE = "x-aws-metric-namespace"
107
109
108
110
# UDP package size is not larger than 64KB
109
111
LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10
110
112
111
- _logger : logging .Logger = logging .getLogger (__name__ )
113
+ OTEL_TRACES_EXPORTER = "OTEL_TRACES_EXPORTER"
114
+ OTEL_LOGS_EXPORTER = "OTEL_LOGS_EXPORTER"
115
+ OTEL_METRICS_EXPORTER = "OTEL_METRICS_EXPORTER"
116
+ OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT = "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"
117
+ OTEL_TRACES_SAMPLER = "OTEL_TRACES_SAMPLER"
118
+ OTEL_PYTHON_DISABLED_INSTRUMENTATIONS = "OTEL_PYTHON_DISABLED_INSTRUMENTATIONS"
119
+ OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED = "OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED"
120
+
121
+ _logger : Logger = getLogger (__name__ )
122
+
123
+
124
+ class OtlpLogHeaderSetting (NamedTuple ):
125
+ log_group : Optional [str ]
126
+ log_stream : Optional [str ]
127
+ namespace : Optional [str ]
128
+ is_valid : bool
112
129
113
130
114
131
class AwsOpenTelemetryConfigurator (_OTelSDKConfigurator ):
@@ -138,7 +155,12 @@ def _configure(self, **kwargs):
138
155
# The OpenTelemetry Authors code
139
156
# Long term, we wish to contribute this to upstream to improve initialization customizability and reduce dependency on
140
157
# internal logic.
141
- def _initialize_components (setup_logging_handler : Optional [bool ] = None ):
158
+ def _initialize_components ():
159
+ # Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors
160
+ # from _import_exporters in OTel dependencies which would try to load exporters
161
+ # We will contribute emf exporter to upstream for supporting OTel metrics in SDK
162
+ is_emf_enabled = _check_emf_exporter_enabled ()
163
+
142
164
trace_exporters , metric_exporters , log_exporters = _import_exporters (
143
165
_get_exporter_names ("traces" ),
144
166
_get_exporter_names ("metrics" ),
@@ -174,13 +196,11 @@ def _initialize_components(setup_logging_handler: Optional[bool] = None):
174
196
sampler = sampler ,
175
197
resource = resource ,
176
198
)
177
- _init_metrics (metric_exporters , resource )
178
199
179
- if setup_logging_handler is None :
180
- setup_logging_handler = (
181
- os .getenv (_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED , "false" ).strip ().lower () == "true"
182
- )
183
- _init_logging (log_exporters , resource , setup_logging_handler )
200
+ _init_metrics (metric_exporters , resource , is_emf_enabled )
201
+ logging_enabled = os .getenv (_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED , "false" )
202
+ if logging_enabled .strip ().lower () == "true" :
203
+ _init_logging (log_exporters , resource )
184
204
185
205
186
206
def _init_logging (
@@ -238,6 +258,7 @@ def _init_tracing(
238
258
def _init_metrics (
239
259
exporters_or_readers : Dict [str , Union [Type [MetricExporter ], Type [MetricReader ]]],
240
260
resource : Resource = None ,
261
+ is_emf_enabled : bool = False ,
241
262
):
242
263
metric_readers = []
243
264
views = []
@@ -250,7 +271,7 @@ def _init_metrics(
250
271
else :
251
272
metric_readers .append (PeriodicExportingMetricReader (exporter_or_reader_class (** exporter_args )))
252
273
253
- _customize_metric_exporters (metric_readers , views )
274
+ _customize_metric_exporters (metric_readers , views , is_emf_enabled )
254
275
255
276
provider = MeterProvider (resource = resource , metric_readers = metric_readers , views = views )
256
277
set_meter_provider (provider )
@@ -276,6 +297,17 @@ def _export_unsampled_span_for_lambda(trace_provider: TracerProvider, resource:
276
297
)
277
298
278
299
300
+ def _export_unsampled_span_for_agent_observability (trace_provider : TracerProvider , resource : Resource = None ):
301
+ if not is_agent_observability_enabled ():
302
+ return
303
+
304
+ traces_endpoint = os .environ .get (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT )
305
+
306
+ span_exporter = OTLPAwsSpanExporter (endpoint = traces_endpoint , logger_provider = get_logger_provider ())
307
+
308
+ trace_provider .add_span_processor (BatchUnsampledSpanProcessor (span_exporter = span_exporter ))
309
+
310
+
279
311
def _is_defer_to_workers_enabled ():
280
312
return os .environ .get (OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG , "false" ).strip ().lower () == "true"
281
313
@@ -407,7 +439,7 @@ def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter:
407
439
if _is_aws_otlp_endpoint (logs_endpoint , "logs" ):
408
440
_logger .info ("Detected using AWS OTLP Logs Endpoint." )
409
441
410
- if isinstance (log_exporter , OTLPLogExporter ) and _validate_logs_headers () :
442
+ if isinstance (log_exporter , OTLPLogExporter ) and _validate_and_fetch_logs_header (). is_valid :
411
443
# Setting default compression mode to Gzip as this is the behavior in upstream's
412
444
# collector otlp http exporter:
413
445
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
@@ -426,9 +458,14 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) ->
426
458
if _is_lambda_environment ():
427
459
provider .add_span_processor (AwsLambdaSpanProcessor ())
428
460
461
+ # We always send 100% spans to Genesis platform for agent observability because
462
+ # AI applications typically have low throughput traffic patterns and require
463
+ # comprehensive monitoring to catch subtle failure modes like hallucinations
464
+ # and quality degradation that sampling could miss.
429
465
# Add session.id baggage attribute to span attributes to support AI Agent use cases
430
466
# enabling session ID tracking in spans.
431
467
if is_agent_observability_enabled ():
468
+ _export_unsampled_span_for_agent_observability (provider , resource )
432
469
433
470
def session_id_predicate (baggage_key : str ) -> bool :
434
471
return baggage_key == "session.id"
@@ -460,7 +497,9 @@ def session_id_predicate(baggage_key: str) -> bool:
460
497
return
461
498
462
499
463
- def _customize_metric_exporters (metric_readers : List [MetricReader ], views : List [View ]) -> None :
500
+ def _customize_metric_exporters (
501
+ metric_readers : List [MetricReader ], views : List [View ], is_emf_enabled : bool = False
502
+ ) -> None :
464
503
if _is_application_signals_runtime_enabled ():
465
504
_get_runtime_metric_views (views , 0 == len (metric_readers ))
466
505
@@ -472,6 +511,11 @@ def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[
472
511
)
473
512
metric_readers .append (scope_based_periodic_exporting_metric_reader )
474
513
514
+ if is_emf_enabled :
515
+ emf_exporter = create_emf_exporter ()
516
+ if emf_exporter :
517
+ metric_readers .append (PeriodicExportingMetricReader (emf_exporter ))
518
+
475
519
476
520
def _get_runtime_metric_views (views : List [View ], retain_runtime_only : bool ) -> None :
477
521
runtime_metrics_scope_name = SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME
@@ -561,7 +605,7 @@ def _is_aws_otlp_endpoint(otlp_endpoint: Optional[str] = None, service: str = "x
561
605
return bool (re .match (pattern , otlp_endpoint .lower ()))
562
606
563
607
564
- def _validate_logs_headers () -> bool :
608
+ def _validate_and_fetch_logs_header () -> OtlpLogHeaderSetting :
565
609
"""Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to
566
610
AWS OTLP Logs endpoint."""
567
611
@@ -572,26 +616,36 @@ def _validate_logs_headers() -> bool:
572
616
"Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
573
617
"to include x-aws-log-group and x-aws-log-stream"
574
618
)
575
- return False
619
+ return OtlpLogHeaderSetting ( None , None , None , False )
576
620
621
+ log_group = None
622
+ log_stream = None
623
+ namespace = None
577
624
filtered_log_headers_count = 0
578
625
579
626
for pair in logs_headers .split ("," ):
580
627
if "=" in pair :
581
628
split = pair .split ("=" , 1 )
582
629
key = split [0 ]
583
630
value = split [1 ]
584
- if key in (AWS_OTLP_LOGS_GROUP_HEADER , AWS_OTLP_LOGS_STREAM_HEADER ) and value :
631
+ if key == AWS_OTLP_LOGS_GROUP_HEADER and value :
632
+ log_group = value
585
633
filtered_log_headers_count += 1
634
+ elif key == AWS_OTLP_LOGS_STREAM_HEADER and value :
635
+ log_stream = value
636
+ filtered_log_headers_count += 1
637
+ elif key == AWS_EMF_METRICS_NAMESPACE and value :
638
+ namespace = value
639
+
640
+ is_valid = filtered_log_headers_count == 2 and log_group is not None and log_stream is not None
586
641
587
- if filtered_log_headers_count != 2 :
642
+ if not is_valid :
588
643
_logger .warning (
589
644
"Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
590
645
"to have values for x-aws-log-group and x-aws-log-stream"
591
646
)
592
- return False
593
647
594
- return True
648
+ return OtlpLogHeaderSetting ( log_group , log_stream , namespace , is_valid )
595
649
596
650
597
651
def _get_metric_export_interval ():
@@ -662,3 +716,73 @@ def create_exporter(self):
662
716
)
663
717
664
718
raise RuntimeError (f"Unsupported AWS Application Signals export protocol: { protocol } " )
719
+
720
+
721
+ def _check_emf_exporter_enabled () -> bool :
722
+ """
723
+ Checks if OTEL_METRICS_EXPORTER contains "awsemf", removes it if present,
724
+ and updates the environment variable.
725
+
726
+ Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors
727
+ from _import_exporters in OTel dependencies which would try to load exporters
728
+ We will contribute emf exporter to upstream for supporting OTel metrics in SDK
729
+
730
+ Returns:
731
+ bool: True if "awsemf" was found and removed, False otherwise.
732
+ """
733
+ # Get the current exporter value
734
+ exporter_value = os .environ .get ("OTEL_METRICS_EXPORTER" , "" )
735
+
736
+ # Check if it's empty
737
+ if not exporter_value :
738
+ return False
739
+
740
+ # Split by comma and convert to list
741
+ exporters = [exp .strip () for exp in exporter_value .split ("," )]
742
+
743
+ # Check if awsemf is in the list
744
+ if "awsemf" not in exporters :
745
+ return False
746
+
747
+ # Remove awsemf from the list
748
+ exporters .remove ("awsemf" )
749
+
750
+ # Join the remaining exporters and update the environment variable
751
+ new_value = "," .join (exporters ) if exporters else ""
752
+
753
+ # Set the new value (or unset if empty)
754
+ if new_value :
755
+ os .environ ["OTEL_METRICS_EXPORTER" ] = new_value
756
+ elif "OTEL_METRICS_EXPORTER" in os .environ :
757
+ del os .environ ["OTEL_METRICS_EXPORTER" ]
758
+
759
+ return True
760
+
761
+
762
+ def create_emf_exporter ():
763
+ """Create and configure the CloudWatch EMF exporter."""
764
+ try :
765
+ # Check if botocore is available before importing the EMF exporter
766
+ if not is_installed ("botocore" ):
767
+ _logger .warning ("botocore is not installed. EMF exporter requires botocore" )
768
+ return None
769
+
770
+ # pylint: disable=import-outside-toplevel
771
+ from amazon .opentelemetry .distro .exporter .aws .metrics .aws_cloudwatch_emf_exporter import (
772
+ AwsCloudWatchEmfExporter ,
773
+ )
774
+
775
+ log_header_setting = _validate_and_fetch_logs_header ()
776
+
777
+ if not log_header_setting .is_valid :
778
+ return None
779
+
780
+ return AwsCloudWatchEmfExporter (
781
+ namespace = log_header_setting .namespace ,
782
+ log_group_name = log_header_setting .log_group ,
783
+ log_stream_name = log_header_setting .log_stream ,
784
+ )
785
+ # pylint: disable=broad-exception-caught
786
+ except Exception as errors :
787
+ _logger .error ("Failed to create EMF exporter: %s" , errors )
788
+ return None
0 commit comments