Skip to content

Commit 1bbfb3f

Browse files
committed
Add CloudWatch EMF exporter integration to AWS OpenTelemetry configurator
1 parent aed584f commit 1bbfb3f

File tree

2 files changed

+260
-11
lines changed

2 files changed

+260
-11
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 101 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import os
55
import re
66
from logging import NOTSET, Logger, getLogger
7-
from typing import ClassVar, Dict, List, Type, Union
7+
from typing import ClassVar, Dict, List, NamedTuple, Optional, Type, Union
88

99
from importlib_metadata import version
1010
from typing_extensions import override
@@ -22,6 +22,7 @@
2222
AwsMetricAttributesSpanExporterBuilder,
2323
)
2424
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
25+
from amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter import AwsCloudWatchEmfExporter
2526
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
2627
from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
2728
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
@@ -98,6 +99,7 @@
9899

99100
AWS_OTLP_LOGS_GROUP_HEADER = "x-aws-log-group"
100101
AWS_OTLP_LOGS_STREAM_HEADER = "x-aws-log-stream"
102+
AWS_EMF_METRICS_NAMESPACE = "x-aws-metric-namespace"
101103

102104
# UDP package size is not larger than 64KB
103105
LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10
@@ -113,6 +115,13 @@
113115
_logger: Logger = getLogger(__name__)
114116

115117

118+
class OtlpLogHeaderSetting(NamedTuple):
119+
log_group: Optional[str]
120+
log_stream: Optional[str]
121+
namespace: Optional[str]
122+
is_valid: bool
123+
124+
116125
class AwsOpenTelemetryConfigurator(_OTelSDKConfigurator):
117126
"""
118127
This AwsOpenTelemetryConfigurator extend _OTelSDKConfigurator configuration with the following change:
@@ -141,6 +150,11 @@ def _configure(self, **kwargs):
141150
# Long term, we wish to contribute this to upstream to improve initialization customizability and reduce dependency on
142151
# internal logic.
143152
def _initialize_components():
153+
# Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors
154+
# from _import_exporters in OTel dependencies which would try to load exporters
155+
# We will need to contribute emf exporter to upstream contrib for
156+
is_emf_enabled = _check_emf_exporter_enabled()
157+
144158
trace_exporters, metric_exporters, log_exporters = _import_exporters(
145159
_get_exporter_names("traces"),
146160
_get_exporter_names("metrics"),
@@ -176,7 +190,8 @@ def _initialize_components():
176190
sampler=sampler,
177191
resource=resource,
178192
)
179-
_init_metrics(metric_exporters, resource)
193+
194+
_init_metrics(metric_exporters, resource, is_emf_enabled)
180195
logging_enabled = os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false")
181196
if logging_enabled.strip().lower() == "true":
182197
_init_logging(log_exporters, resource)
@@ -235,6 +250,7 @@ def _init_tracing(
235250
def _init_metrics(
236251
exporters_or_readers: Dict[str, Union[Type[MetricExporter], Type[MetricReader]]],
237252
resource: Resource = None,
253+
is_emf_enabled: bool = False,
238254
):
239255
metric_readers = []
240256
views = []
@@ -247,7 +263,7 @@ def _init_metrics(
247263
else:
248264
metric_readers.append(PeriodicExportingMetricReader(exporter_or_reader_class(**exporter_args)))
249265

250-
_customize_metric_exporters(metric_readers, views)
266+
_customize_metric_exporters(metric_readers, views, is_emf_enabled)
251267

252268
provider = MeterProvider(resource=resource, metric_readers=metric_readers, views=views)
253269
set_meter_provider(provider)
@@ -397,7 +413,7 @@ def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> L
397413
if _is_aws_otlp_endpoint(logs_endpoint, "logs"):
398414
_logger.info("Detected using AWS OTLP Logs Endpoint.")
399415

400-
if isinstance(log_exporter, OTLPLogExporter) and _validate_logs_headers():
416+
if isinstance(log_exporter, OTLPLogExporter) and _validate_and_fetch_logs_header().is_valid:
401417
# Setting default compression mode to Gzip as this is the behavior in upstream's
402418
# collector otlp http exporter:
403419
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
@@ -450,7 +466,9 @@ def session_id_predicate(baggage_key: str) -> bool:
450466
return
451467

452468

453-
def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[View]) -> None:
469+
def _customize_metric_exporters(
470+
metric_readers: List[MetricReader], views: List[View], is_emf_enabled: bool = False
471+
) -> None:
454472
if _is_application_signals_runtime_enabled():
455473
_get_runtime_metric_views(views, 0 == len(metric_readers))
456474

@@ -462,6 +480,11 @@ def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[
462480
)
463481
metric_readers.append(scope_based_periodic_exporting_metric_reader)
464482

483+
if is_emf_enabled:
484+
emf_exporter = create_emf_exporter()
485+
if emf_exporter:
486+
metric_readers.append(PeriodicExportingMetricReader(emf_exporter))
487+
465488

466489
def _get_runtime_metric_views(views: List[View], retain_runtime_only: bool) -> None:
467490
runtime_metrics_scope_name = SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME
@@ -551,7 +574,7 @@ def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> b
551574
return bool(re.match(pattern, otlp_endpoint.lower()))
552575

553576

554-
def _validate_logs_headers() -> bool:
577+
def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting:
555578
"""Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to
556579
AWS OTLP Logs endpoint."""
557580

@@ -562,26 +585,36 @@ def _validate_logs_headers() -> bool:
562585
"Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
563586
"to include x-aws-log-group and x-aws-log-stream"
564587
)
565-
return False
588+
return OtlpLogHeaderSetting(None, None, None, False)
566589

590+
log_group = None
591+
log_stream = None
592+
namespace = None
567593
filtered_log_headers_count = 0
568594

569595
for pair in logs_headers.split(","):
570596
if "=" in pair:
571597
split = pair.split("=", 1)
572598
key = split[0]
573599
value = split[1]
574-
if key in (AWS_OTLP_LOGS_GROUP_HEADER, AWS_OTLP_LOGS_STREAM_HEADER) and value:
600+
if key == AWS_OTLP_LOGS_GROUP_HEADER and value:
601+
log_group = value
575602
filtered_log_headers_count += 1
603+
elif key == AWS_OTLP_LOGS_STREAM_HEADER and value:
604+
log_stream = value
605+
filtered_log_headers_count += 1
606+
elif key == AWS_EMF_METRICS_NAMESPACE and value:
607+
namespace = value
608+
609+
is_valid = filtered_log_headers_count == 2
576610

577-
if filtered_log_headers_count != 2:
611+
if not is_valid:
578612
_logger.warning(
579613
"Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
580614
"to have values for x-aws-log-group and x-aws-log-stream"
581615
)
582-
return False
583616

584-
return True
617+
return OtlpLogHeaderSetting(log_group, log_stream, namespace, is_valid)
585618

586619

587620
def _get_metric_export_interval():
@@ -652,3 +685,60 @@ def create_exporter(self):
652685
)
653686

654687
raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ")
688+
689+
690+
def _check_emf_exporter_enabled() -> bool:
691+
"""
692+
Checks if OTEL_METRICS_EXPORTER contains "awsemf", removes it if present,
693+
and updates the environment variable.
694+
695+
Returns:
696+
bool: True if "awsemf" was found and removed, False otherwise.
697+
"""
698+
# Get the current exporter value
699+
exporter_value = os.environ.get("OTEL_METRICS_EXPORTER", "")
700+
701+
# Check if it's empty
702+
if not exporter_value:
703+
return False
704+
705+
# Split by comma and convert to list
706+
exporters = [exp.strip() for exp in exporter_value.split(",")]
707+
708+
# Check if awsemf is in the list
709+
if "awsemf" not in exporters:
710+
return False
711+
712+
# Remove awsemf from the list
713+
exporters.remove("awsemf")
714+
715+
# Join the remaining exporters and update the environment variable
716+
new_value = ",".join(exporters) if exporters else ""
717+
718+
# Set the new value (or unset if empty)
719+
if new_value:
720+
os.environ["OTEL_METRICS_EXPORTER"] = new_value
721+
elif "OTEL_METRICS_EXPORTER" in os.environ:
722+
del os.environ["OTEL_METRICS_EXPORTER"]
723+
724+
return True
725+
726+
727+
def create_emf_exporter() -> Optional[AwsCloudWatchEmfExporter]:
728+
"""Create and configure the CloudWatch EMF exporter."""
729+
try:
730+
log_header_setting = _validate_and_fetch_logs_header()
731+
732+
if not log_header_setting.is_valid:
733+
_logger.warning("Log group is not set. Set Log Group with OTEL_EXPORTER_OTLP_LOGS_HEADERS=<EMF Log Group>")
734+
return None
735+
736+
return AwsCloudWatchEmfExporter(
737+
namespace=log_header_setting.namespace,
738+
log_group_name=log_header_setting.log_group,
739+
log_stream_name=log_header_setting.log_stream,
740+
)
741+
# pylint: disable=broad-exception-caught
742+
except Exception as errors:
743+
_logger.error("Failed to create EMF exporter: %s", errors)
744+
return None

0 commit comments

Comments
 (0)