Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import os
import re
from logging import NOTSET, Logger, getLogger
from typing import ClassVar, Dict, List, Type, Union
from typing import ClassVar, Dict, List, NamedTuple, Optional, Type, Union

from importlib_metadata import version
from typing_extensions import override

from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE
from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled, is_installed
from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
AttributePropagatingSpanProcessorBuilder,
Expand Down Expand Up @@ -98,6 +98,7 @@

AWS_OTLP_LOGS_GROUP_HEADER = "x-aws-log-group"
AWS_OTLP_LOGS_STREAM_HEADER = "x-aws-log-stream"
AWS_EMF_METRICS_NAMESPACE = "x-aws-metric-namespace"

# UDP package size is not larger than 64KB
LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10
Expand All @@ -113,6 +114,13 @@
_logger: Logger = getLogger(__name__)


class OtlpLogHeaderSetting(NamedTuple):
log_group: Optional[str]
log_stream: Optional[str]
namespace: Optional[str]
is_valid: bool


class AwsOpenTelemetryConfigurator(_OTelSDKConfigurator):
"""
This AwsOpenTelemetryConfigurator extend _OTelSDKConfigurator configuration with the following change:
Expand Down Expand Up @@ -141,6 +149,11 @@ def _configure(self, **kwargs):
# Long term, we wish to contribute this to upstream to improve initialization customizability and reduce dependency on
# internal logic.
def _initialize_components():
# Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors
# from _import_exporters in OTel dependencies which would try to load exporters
# We will contribute emf exporter to upstream for supporting OTel metrics in SDK
is_emf_enabled = _check_emf_exporter_enabled()

trace_exporters, metric_exporters, log_exporters = _import_exporters(
_get_exporter_names("traces"),
_get_exporter_names("metrics"),
Expand Down Expand Up @@ -176,7 +189,8 @@ def _initialize_components():
sampler=sampler,
resource=resource,
)
_init_metrics(metric_exporters, resource)

_init_metrics(metric_exporters, resource, is_emf_enabled)
logging_enabled = os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false")
if logging_enabled.strip().lower() == "true":
_init_logging(log_exporters, resource)
Expand Down Expand Up @@ -235,6 +249,7 @@ def _init_tracing(
def _init_metrics(
exporters_or_readers: Dict[str, Union[Type[MetricExporter], Type[MetricReader]]],
resource: Resource = None,
is_emf_enabled: bool = False,
):
metric_readers = []
views = []
Expand All @@ -247,7 +262,7 @@ def _init_metrics(
else:
metric_readers.append(PeriodicExportingMetricReader(exporter_or_reader_class(**exporter_args)))

_customize_metric_exporters(metric_readers, views)
_customize_metric_exporters(metric_readers, views, is_emf_enabled)

provider = MeterProvider(resource=resource, metric_readers=metric_readers, views=views)
set_meter_provider(provider)
Expand Down Expand Up @@ -408,7 +423,7 @@ def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> L
if _is_aws_otlp_endpoint(logs_endpoint, "logs"):
_logger.info("Detected using AWS OTLP Logs Endpoint.")

if isinstance(log_exporter, OTLPLogExporter) and _validate_logs_headers():
if isinstance(log_exporter, OTLPLogExporter) and _validate_and_fetch_logs_header().is_valid:
# Setting default compression mode to Gzip as this is the behavior in upstream's
# collector otlp http exporter:
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
Expand Down Expand Up @@ -466,7 +481,9 @@ def session_id_predicate(baggage_key: str) -> bool:
return


def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[View]) -> None:
def _customize_metric_exporters(
metric_readers: List[MetricReader], views: List[View], is_emf_enabled: bool = False
) -> None:
if _is_application_signals_runtime_enabled():
_get_runtime_metric_views(views, 0 == len(metric_readers))

Expand All @@ -478,6 +495,11 @@ def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[
)
metric_readers.append(scope_based_periodic_exporting_metric_reader)

if is_emf_enabled:
emf_exporter = create_emf_exporter()
if emf_exporter:
metric_readers.append(PeriodicExportingMetricReader(emf_exporter))


def _get_runtime_metric_views(views: List[View], retain_runtime_only: bool) -> None:
runtime_metrics_scope_name = SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME
Expand Down Expand Up @@ -567,7 +589,7 @@ def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> b
return bool(re.match(pattern, otlp_endpoint.lower()))


def _validate_logs_headers() -> bool:
def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting:
"""Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to
AWS OTLP Logs endpoint."""

Expand All @@ -578,26 +600,36 @@ def _validate_logs_headers() -> bool:
"Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
"to include x-aws-log-group and x-aws-log-stream"
)
return False
return OtlpLogHeaderSetting(None, None, None, False)

log_group = None
log_stream = None
namespace = None
filtered_log_headers_count = 0

for pair in logs_headers.split(","):
if "=" in pair:
split = pair.split("=", 1)
key = split[0]
value = split[1]
if key in (AWS_OTLP_LOGS_GROUP_HEADER, AWS_OTLP_LOGS_STREAM_HEADER) and value:
if key == AWS_OTLP_LOGS_GROUP_HEADER and value:
log_group = value
filtered_log_headers_count += 1
elif key == AWS_OTLP_LOGS_STREAM_HEADER and value:
log_stream = value
filtered_log_headers_count += 1
elif key == AWS_EMF_METRICS_NAMESPACE and value:
namespace = value

if filtered_log_headers_count != 2:
is_valid = filtered_log_headers_count == 2 and log_group is not None and log_stream is not None

if not is_valid:
_logger.warning(
"Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
"to have values for x-aws-log-group and x-aws-log-stream"
)
return False

return True
return OtlpLogHeaderSetting(log_group, log_stream, namespace, is_valid)


def _get_metric_export_interval():
Expand Down Expand Up @@ -668,3 +700,73 @@ def create_exporter(self):
)

raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ")


def _check_emf_exporter_enabled() -> bool:
"""
Checks if OTEL_METRICS_EXPORTER contains "awsemf", removes it if present,
and updates the environment variable.

Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors
from _import_exporters in OTel dependencies which would try to load exporters
We will contribute emf exporter to upstream for supporting OTel metrics in SDK

Returns:
bool: True if "awsemf" was found and removed, False otherwise.
"""
# Get the current exporter value
exporter_value = os.environ.get("OTEL_METRICS_EXPORTER", "")

# Check if it's empty
if not exporter_value:
return False

# Split by comma and convert to list
exporters = [exp.strip() for exp in exporter_value.split(",")]

# Check if awsemf is in the list
if "awsemf" not in exporters:
return False

# Remove awsemf from the list
exporters.remove("awsemf")

# Join the remaining exporters and update the environment variable
new_value = ",".join(exporters) if exporters else ""

# Set the new value (or unset if empty)
if new_value:
os.environ["OTEL_METRICS_EXPORTER"] = new_value
elif "OTEL_METRICS_EXPORTER" in os.environ:
del os.environ["OTEL_METRICS_EXPORTER"]

return True


def create_emf_exporter():
"""Create and configure the CloudWatch EMF exporter."""
try:
# Check if botocore is available before importing the EMF exporter
if not is_installed("botocore"):
_logger.warning("botocore is not installed. EMF exporter requires botocore")
return None

# pylint: disable=import-outside-toplevel
from amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter import (
AwsCloudWatchEmfExporter,
)

log_header_setting = _validate_and_fetch_logs_header()

if not log_header_setting.is_valid:
return None

return AwsCloudWatchEmfExporter(
namespace=log_header_setting.namespace,
log_group_name=log_header_setting.log_group,
log_stream_name=log_header_setting.log_stream,
)
# pylint: disable=broad-exception-caught
except Exception as errors:
_logger.error("Failed to create EMF exporter: %s", errors)
return None
Loading
Loading