1212
1313from amazon .opentelemetry .distro ._aws_attribute_keys import AWS_LOCAL_SERVICE , AWS_SERVICE_TYPE
1414from amazon .opentelemetry .distro ._aws_resource_attribute_configurator import get_service_attribute
15- from amazon .opentelemetry .distro ._utils import IS_BOTOCORE_INSTALLED , get_aws_session , is_agent_observability_enabled
15+ from amazon .opentelemetry .distro ._utils import get_aws_session , is_agent_observability_enabled
1616from amazon .opentelemetry .distro .always_record_sampler import AlwaysRecordSampler
1717from amazon .opentelemetry .distro .attribute_propagating_span_processor_builder import (
1818 AttributePropagatingSpanProcessorBuilder ,
2323 AwsMetricAttributesSpanExporterBuilder ,
2424)
2525from amazon .opentelemetry .distro .aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
26- from amazon .opentelemetry .distro .exporter .otlp .aws .logs .otlp_aws_logs_exporter import OTLPAwsLogExporter
27- from amazon .opentelemetry .distro .exporter .otlp .aws .traces .otlp_aws_span_exporter import OTLPAwsSpanExporter
2826from amazon .opentelemetry .distro .otlp_udp_exporter import OTLPUdpSpanExporter
2927from amazon .opentelemetry .distro .sampler .aws_xray_remote_sampler import AwsXRayRemoteSampler
3028from amazon .opentelemetry .distro .scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
@@ -210,9 +208,9 @@ def _init_logging(
210208
211209 for _ , exporter_class in exporters .items ():
212210 exporter_args = {}
213- log_exporter : LogExporter = _customize_logs_exporter ( exporter_class ( ** exporter_args ))
214- log_processor = _customize_log_record_processor ( log_exporter )
215- provider . add_log_record_processor ( log_processor )
211+ _customize_log_record_processor (
212+ logger_provider = provider , log_exporter = _customize_logs_exporter ( exporter_class ( ** exporter_args ) )
213+ )
216214
217215 event_logger_provider = EventLoggerProvider (logger_provider = provider )
218216 set_event_logger_provider (event_logger_provider )
@@ -299,10 +297,12 @@ def _export_unsampled_span_for_agent_observability(trace_provider: TracerProvide
299297 return
300298
301299 traces_endpoint = os .environ .get (OTEL_EXPORTER_OTLP_TRACES_ENDPOINT )
300+ if traces_endpoint and _is_aws_otlp_endpoint (traces_endpoint ):
301+ endpoint = traces_endpoint .lower ()
302+ region = endpoint .split ("." )[1 ]
302303
303- span_exporter = _create_aws_exporter (endpoint = traces_endpoint )
304-
305- trace_provider .add_span_processor (BatchUnsampledSpanProcessor (span_exporter = span_exporter ))
304+ span_exporter = _create_aws_otlp_exporter (endpoint = endpoint , service = "xray" , region = region )
305+ trace_provider .add_span_processor (BatchUnsampledSpanProcessor (span_exporter = span_exporter ))
306306
307307
308308def _is_defer_to_workers_enabled ():
@@ -397,11 +397,13 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
397397 traces_endpoint = os .environ .get (AWS_XRAY_DAEMON_ADDRESS_CONFIG , "127.0.0.1:2000" )
398398 span_exporter = OTLPUdpSpanExporter (endpoint = traces_endpoint )
399399
400- if _is_aws_otlp_endpoint (traces_endpoint , "xray" ):
400+ if traces_endpoint and _is_aws_otlp_endpoint (traces_endpoint , "xray" ):
401401 _logger .info ("Detected using AWS OTLP Traces Endpoint." )
402402
403403 if isinstance (span_exporter , OTLPSpanExporter ):
404- return _create_aws_exporter (endpoint = traces_endpoint )
404+ endpoint = traces_endpoint .lower ()
405+ region = endpoint .split ("." )[1 ]
406+ return _create_aws_otlp_exporter (endpoint = traces_endpoint , service = "xray" , region = region )
405407
406408 else :
407409 _logger .warning (
@@ -415,24 +417,34 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
415417 return AwsMetricAttributesSpanExporterBuilder (span_exporter , resource ).build ()
416418
417419
418- def _customize_log_record_processor (log_exporter : LogExporter ) :
419- if isinstance ( log_exporter , OTLPAwsLogExporter ) and is_agent_observability_enabled () :
420- return AwsCloudWatchOtlpBatchLogRecordProcessor ( exporter = log_exporter )
420+ def _customize_log_record_processor (logger_provider : LoggerProvider , log_exporter : Optional [ LogExporter ]) -> None :
421+ if not log_exporter :
422+ return
421423
422- return BatchLogRecordProcessor (exporter = log_exporter )
424+ if is_agent_observability_enabled ():
425+ from amazon .opentelemetry .distro .exporter .otlp .aws .logs ._aws_cw_otlp_batch_log_record_processor import (
426+ AwsCloudWatchOtlpBatchLogRecordProcessor ,
427+ )
428+
429+ logger_provider .add_log_record_processor (AwsCloudWatchOtlpBatchLogRecordProcessor (exporter = log_exporter ))
430+ else :
431+ logger_provider .add_log_record_processor (BatchLogRecordProcessor (exporter = log_exporter ))
423432
424433
425434def _customize_logs_exporter (log_exporter : LogExporter ) -> LogExporter :
426435 logs_endpoint = os .environ .get (OTEL_EXPORTER_OTLP_LOGS_ENDPOINT )
427436
428- if _is_aws_otlp_endpoint (logs_endpoint , "logs" ):
437+ if logs_endpoint and _is_aws_otlp_endpoint (logs_endpoint , "logs" ):
438+
429439 _logger .info ("Detected using AWS OTLP Logs Endpoint." )
430440
431441 if isinstance (log_exporter , OTLPLogExporter ) and _validate_and_fetch_logs_header ().is_valid :
442+ endpoint = logs_endpoint .lower ()
443+ region = endpoint .split ("." )[1 ]
432444 # Setting default compression mode to Gzip as this is the behavior in upstream's
433445 # collector otlp http exporter:
434446 # https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
435- return _create_aws_exporter (endpoint = logs_endpoint )
447+ return _create_aws_otlp_exporter (endpoint = logs_endpoint , service = "logs" , region = region )
436448
437449 _logger .warning (
438450 "Improper configuration see: please export/set "
@@ -594,11 +606,11 @@ def _is_lambda_environment():
594606def _is_aws_otlp_endpoint (otlp_endpoint : Optional [str ] = None , service : str = "xray" ) -> bool :
595607 """Is the given endpoint an AWS OTLP endpoint?"""
596608
597- pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == "xray" else AWS_LOGS_OTLP_ENDPOINT_PATTERN
598-
599609 if not otlp_endpoint :
600610 return False
601611
612+ pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == "xray" else AWS_LOGS_OTLP_ENDPOINT_PATTERN
613+
602614 return bool (re .match (pattern , otlp_endpoint .lower ()))
603615
604616
@@ -787,7 +799,7 @@ def create_emf_exporter():
787799 return None
788800
789801
790- def _create_aws_exporter (endpoint : str ):
802+ def _create_aws_otlp_exporter (endpoint : str , service : str , region : str ):
791803 """Create and configure the AWS OTLP exporters."""
792804 try :
793805 session = get_aws_session ()
@@ -800,12 +812,7 @@ def _create_aws_exporter(endpoint: str):
800812 from amazon .opentelemetry .distro .exporter .otlp .aws .logs .otlp_aws_logs_exporter import OTLPAwsLogExporter
801813 from amazon .opentelemetry .distro .exporter .otlp .aws .traces .otlp_aws_span_exporter import OTLPAwsSpanExporter
802814
803- endpoint = endpoint .lower ()
804- split = endpoint .split ("." )
805- service = split [0 ]
806- region = split [1 ]
807-
808- if "xray" in service :
815+ if service == "xray" :
809816 if is_agent_observability_enabled ():
810817 # Span exporter needs an instance of logger provider in ai agent
811818 # observability case because we need to split input/output prompts
@@ -818,7 +825,7 @@ def _create_aws_exporter(endpoint: str):
818825
819826 return OTLPAwsSpanExporter (session = session , endpoint = endpoint , aws_region = region )
820827
821- if "logs" in service :
828+ if service == "logs" :
822829 return OTLPAwsLogExporter (session = session , aws_region = region )
823830
824831 # pylint: disable=broad-exception-caught
0 commit comments