From 6d42928081b724e0dfcd2e112911fab82e11805b Mon Sep 17 00:00:00 2001 From: aws-application-signals-bot <167233089+aws-application-signals-bot@users.noreply.github.com> Date: Mon, 28 Jul 2025 10:49:10 -0700 Subject: [PATCH 1/7] Post release 0.10.1: Update version to 0.10.1.dev0 (#440) This PR prepares the main branch for the next development cycle by updating the version to 0.10.1.dev0 and updating the image version to be scanned to the latest released. This PR should only be merge when release for version v0.10.1 is success. By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --------- Co-authored-by: github-actions Co-authored-by: Steve Liu Co-authored-by: Min Xia --- .github/workflows/daily_scan.yml | 4 ++-- .../src/amazon/opentelemetry/distro/version.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/daily_scan.yml b/.github/workflows/daily_scan.yml index 10ee2ce4a..30b9476fe 100644 --- a/.github/workflows/daily_scan.yml +++ b/.github/workflows/daily_scan.yml @@ -82,7 +82,7 @@ jobs: id: high_scan uses: ./.github/actions/image_scan with: - image-ref: "public.ecr.aws/aws-observability/adot-autoinstrumentation-python:v0.10.0" + image-ref: "public.ecr.aws/aws-observability/adot-autoinstrumentation-python:v0.10.1" severity: 'CRITICAL,HIGH' - name: Perform low image scan @@ -90,7 +90,7 @@ jobs: id: low_scan uses: ./.github/actions/image_scan with: - image-ref: "public.ecr.aws/aws-observability/adot-autoinstrumentation-python:v0.10.0" + image-ref: "public.ecr.aws/aws-observability/adot-autoinstrumentation-python:v0.10.1" severity: 'MEDIUM,LOW,UNKNOWN' - name: Configure AWS Credentials for emitting metrics diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/version.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/version.py index 898210494..0c880afc5 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/version.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/version.py @@ -1,4 +1,4 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 -__version__ = "0.10.1" +__version__ = "0.10.1.dev0" From cd4a0e132ff20df0bc9c2a70f27ff90937c75548 Mon Sep 17 00:00:00 2001 From: Lei Wang <66336933+wangzlei@users.noreply.github.com> Date: Wed, 30 Jul 2025 11:33:26 -0700 Subject: [PATCH 2/7] Support export OTel metrics to be EMF in starndard output (#437) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit *Issue #, if available:* Currently, OpenTelemetry (OTel) metrics users in AWS Lambda must export metrics via synchronous calls. This often results in increased function duration or lost metric data, due to how Lambda handles execution lifecycle — particularly during the [freeze phase](https://serverlessfolks.com/lambda-code-execution-freezethaw#heading-freeze). *Description of changes:* This PR introduces a new ConsoleEmfExporter, which exports OTel metrics to standard output using the CloudWatch Embedded Metric Format (EMF). In the Lambda environment, logs written to standard output are automatically forwarded to CloudWatch by Lambda's built-in logging agent. This makes ConsoleEmfExporter a simple and efficient way to export OTel metrics in Lambda, avoiding the overhead and reliability issues associated with synchronous metric export. By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --------- Co-authored-by: Jonathan Lee <107072447+jj22ee@users.noreply.github.com> --- .../distro/aws_opentelemetry_configurator.py | 64 +- .../metrics/aws_cloudwatch_emf_exporter.py | 550 +--------------- .../exporter/aws/metrics/base_emf_exporter.py | 596 ++++++++++++++++++ .../aws/metrics/console_emf_exporter.py | 104 +++ .../test_aws_cloudwatch_emf_exporter.py | 8 +- .../aws/metrics/test_base_emf_exporter.py | 291 +++++++++ .../aws/metrics/test_console_emf_exporter.py | 288 +++++++++ .../test_aws_opentelementry_configurator.py | 346 ++++++---- lambda-layer/src/otel-instrument | 24 +- 9 files changed, 1584 insertions(+), 687 deletions(-) create mode 100644 aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/base_emf_exporter.py create mode 100644 aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/console_emf_exporter.py create mode 100644 aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_base_emf_exporter.py create mode 100644 aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_console_emf_exporter.py diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index b89b0bcb1..8fdc5306e 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -122,7 +122,14 @@ class OtlpLogHeaderSetting(NamedTuple): log_group: Optional[str] log_stream: Optional[str] namespace: Optional[str] - is_valid: bool + + def is_valid(self) -> bool: + """Check if the log header setting is valid by ensuring both log_group and log_stream are present.""" + return self.log_group is not None and self.log_stream is not None + + +# Singleton cache for OtlpLogHeaderSetting +_otlp_log_header_setting_cache: Optional[OtlpLogHeaderSetting] = None class AwsOpenTelemetryConfigurator(_OTelSDKConfigurator): @@ -440,7 +447,7 @@ def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter: if isinstance(log_exporter, OTLPLogExporter): - if _validate_and_fetch_logs_header().is_valid: + if _fetch_logs_header().is_valid(): endpoint, region = _extract_endpoint_and_region_from_otlp_endpoint(logs_endpoint) # Setting default compression mode to Gzip as this is the behavior in upstream's # collector otlp http exporter: @@ -627,18 +634,23 @@ def _extract_endpoint_and_region_from_otlp_endpoint(endpoint: str): return endpoint, region -def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting: - """Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to - AWS OTLP Logs endpoint.""" +def _fetch_logs_header() -> OtlpLogHeaderSetting: + """Returns the OTLP log header setting as a singleton instance.""" + global _otlp_log_header_setting_cache # pylint: disable=global-statement + + if _otlp_log_header_setting_cache is not None: + return _otlp_log_header_setting_cache logs_headers = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_HEADERS) if not logs_headers: - _logger.warning( - "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS " - "to include x-aws-log-group and x-aws-log-stream" - ) - return OtlpLogHeaderSetting(None, None, None, False) + if not _is_lambda_environment(): + _logger.warning( + "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS " + "to include x-aws-log-group and x-aws-log-stream" + ) + _otlp_log_header_setting_cache = OtlpLogHeaderSetting(None, None, None) + return _otlp_log_header_setting_cache log_group = None log_stream = None @@ -656,9 +668,14 @@ def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting: elif key == AWS_EMF_METRICS_NAMESPACE and value: namespace = value - is_valid = log_group is not None and log_stream is not None + _otlp_log_header_setting_cache = OtlpLogHeaderSetting(log_group, log_stream, namespace) + return _otlp_log_header_setting_cache + - return OtlpLogHeaderSetting(log_group, log_stream, namespace, is_valid) +def _clear_logs_header_cache(): + """Clear the singleton cache for OtlpLogHeaderSetting. Used primarily for testing.""" + global _otlp_log_header_setting_cache # pylint: disable=global-statement + _otlp_log_header_setting_cache = None def _get_metric_export_interval(): @@ -773,8 +790,25 @@ def _check_emf_exporter_enabled() -> bool: def _create_emf_exporter(): - """Create and configure the CloudWatch EMF exporter.""" + """ + Create the appropriate EMF exporter based on the environment and configuration. + + Returns: + ConsoleEmfExporter for Lambda without log headers log group and stream + AwsCloudWatchEmfExporter for other cases (when conditions are met) + None if CloudWatch exporter cannot be created + """ try: + log_header_setting = _fetch_logs_header() + + # Lambda without valid logs http headers - use Console EMF exporter + if _is_lambda_environment() and not log_header_setting.is_valid(): + # pylint: disable=import-outside-toplevel + from amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter import ConsoleEmfExporter + + return ConsoleEmfExporter(namespace=log_header_setting.namespace) + + # For non-Lambda environment or Lambda with valid headers - use CloudWatch EMF exporter session = get_aws_session() # Check if botocore is available before importing the EMF exporter if not session: @@ -786,9 +820,7 @@ def _create_emf_exporter(): AwsCloudWatchEmfExporter, ) - log_header_setting = _validate_and_fetch_logs_header() - - if not log_header_setting.is_valid: + if not log_header_setting.is_valid(): return None return AwsCloudWatchEmfExporter( diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py index 168ac7df1..6603391a1 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py @@ -3,66 +3,18 @@ # pylint: disable=no-self-use -import json import logging -import math -import time -from collections import defaultdict -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, Optional -from opentelemetry.sdk.metrics import Counter -from opentelemetry.sdk.metrics import Histogram as HistogramInstr -from opentelemetry.sdk.metrics import ObservableCounter, ObservableGauge, ObservableUpDownCounter, UpDownCounter -from opentelemetry.sdk.metrics._internal.point import Metric -from opentelemetry.sdk.metrics.export import ( - AggregationTemporality, - ExponentialHistogram, - Gauge, - Histogram, - MetricExporter, - MetricExportResult, - MetricsData, - NumberDataPoint, - Sum, -) -from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation -from opentelemetry.sdk.resources import Resource -from opentelemetry.util.types import Attributes +from opentelemetry.sdk.metrics.export import AggregationTemporality from ._cloudwatch_log_client import CloudWatchLogClient +from .base_emf_exporter import BaseEmfExporter logger = logging.getLogger(__name__) -class MetricRecord: - """The metric data unified representation of all OTel metrics for OTel to CW EMF conversion.""" - - def __init__(self, metric_name: str, metric_unit: str, metric_description: str): - """ - Initialize metric record. - - Args: - metric_name: Name of the metric - metric_unit: Unit of the metric - metric_description: Description of the metric - """ - # Instrument metadata - self.name = metric_name - self.unit = metric_unit - self.description = metric_description - - # Will be set by conversion methods - self.timestamp: Optional[int] = None - self.attributes: Attributes = {} - - # Different metric type data - only one will be set per record - self.value: Optional[float] = None - self.sum_data: Optional[Any] = None - self.histogram_data: Optional[Any] = None - self.exp_histogram_data: Optional[Any] = None - - -class AwsCloudWatchEmfExporter(MetricExporter): +class AwsCloudWatchEmfExporter(BaseEmfExporter): """ OpenTelemetry metrics exporter for CloudWatch EMF format. @@ -74,50 +26,6 @@ class AwsCloudWatchEmfExporter(MetricExporter): """ - # CloudWatch EMF supported units - # Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html - EMF_SUPPORTED_UNITS = { - "Seconds", - "Microseconds", - "Milliseconds", - "Bytes", - "Kilobytes", - "Megabytes", - "Gigabytes", - "Terabytes", - "Bits", - "Kilobits", - "Megabits", - "Gigabits", - "Terabits", - "Percent", - "Count", - "Bytes/Second", - "Kilobytes/Second", - "Megabytes/Second", - "Gigabytes/Second", - "Terabytes/Second", - "Bits/Second", - "Kilobits/Second", - "Megabits/Second", - "Gigabits/Second", - "Terabits/Second", - "Count/Second", - "None", - } - - # OTel to CloudWatch unit mapping - # Ref: opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/grouped_metric.go#L188 - UNIT_MAPPING = { - "1": "", - "ns": "", - "ms": "Milliseconds", - "s": "Seconds", - "us": "Microseconds", - "By": "Bytes", - "bit": "Bits", - } - def __init__( self, namespace: str = "default", @@ -140,26 +48,8 @@ def __init__( preferred_aggregation: Optional dictionary mapping instrument types to preferred aggregation **kwargs: Additional arguments passed to botocore client """ - # Set up temporality preference default to DELTA if customers not set - if preferred_temporality is None: - preferred_temporality = { - Counter: AggregationTemporality.DELTA, - HistogramInstr: AggregationTemporality.DELTA, - ObservableCounter: AggregationTemporality.DELTA, - ObservableGauge: AggregationTemporality.DELTA, - ObservableUpDownCounter: AggregationTemporality.DELTA, - UpDownCounter: AggregationTemporality.DELTA, - } - - # Set up aggregation preference default to exponential histogram for histogram metrics - if preferred_aggregation is None: - preferred_aggregation = { - HistogramInstr: ExponentialBucketHistogramAggregation(), - } - - super().__init__(preferred_temporality, preferred_aggregation) + super().__init__(namespace, preferred_temporality, preferred_aggregation) - self.namespace = namespace self.log_group_name = log_group_name # Initialize CloudWatch Logs client @@ -167,359 +57,7 @@ def __init__( log_group_name=log_group_name, log_stream_name=log_stream_name, aws_region=aws_region, **kwargs ) - def _get_metric_name(self, record: MetricRecord) -> Optional[str]: - """Get the metric name from the metric record or data point.""" - - try: - if record.name: - return record.name - except AttributeError: - pass - # Return None if no valid metric name found - return None - - def _get_unit(self, record: MetricRecord) -> Optional[str]: - """Get CloudWatch unit from MetricRecord unit.""" - unit = record.unit - - if not unit: - return None - - # First check if unit is already a supported EMF unit - if unit in self.EMF_SUPPORTED_UNITS: - return unit - - # Map from OTel unit to CloudWatch unit - mapped_unit = self.UNIT_MAPPING.get(unit) - - return mapped_unit - - def _get_dimension_names(self, attributes: Attributes) -> List[str]: - """Extract dimension names from attributes.""" - # Implement dimension selection logic - # For now, use all attributes as dimensions - return list(attributes.keys()) - - def _get_attributes_key(self, attributes: Attributes) -> str: - """ - Create a hashable key from attributes for grouping metrics. - - Args: - attributes: The attributes dictionary - - Returns: - A string representation of sorted attributes key-value pairs - """ - # Sort the attributes to ensure consistent keys - sorted_attrs = sorted(attributes.items()) - # Create a string representation of the attributes - return str(sorted_attrs) - - def _normalize_timestamp(self, timestamp_ns: int) -> int: - """ - Normalize a nanosecond timestamp to milliseconds for CloudWatch. - - Args: - timestamp_ns: Timestamp in nanoseconds - - Returns: - Timestamp in milliseconds - """ - # Convert from nanoseconds to milliseconds - return timestamp_ns // 1_000_000 - - def _create_metric_record(self, metric_name: str, metric_unit: str, metric_description: str) -> MetricRecord: - """ - Creates the intermediate metric data structure that standardizes different otel metric representation - and will be used to generate EMF events. The base record - establishes the instrument schema (name/unit/description) that will be populated - with dimensions, timestamps, and values during metric processing. - - Args: - metric_name: Name of the metric - metric_unit: Unit of the metric - metric_description: Description of the metric - - Returns: - A MetricRecord object - """ - return MetricRecord(metric_name, metric_unit, metric_description) - - def _convert_gauge_and_sum(self, metric: Metric, data_point: NumberDataPoint) -> MetricRecord: - """Convert a Gauge or Sum metric datapoint to a metric record. - - Args: - metric: The metric object - data_point: The datapoint to convert - - Returns: - MetricRecord with populated timestamp, attributes, and value - """ - # Create base record - record = self._create_metric_record(metric.name, metric.unit, metric.description) - - # Set timestamp - timestamp_ms = ( - self._normalize_timestamp(data_point.time_unix_nano) - if data_point.time_unix_nano is not None - else int(time.time() * 1000) - ) - record.timestamp = timestamp_ms - - # Set attributes - record.attributes = data_point.attributes - - # Set the value directly for both Gauge and Sum - record.value = data_point.value - - return record - - def _convert_histogram(self, metric: Metric, data_point: Any) -> MetricRecord: - """Convert a Histogram metric datapoint to a metric record. - - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/datapoint.go#L87 - - Args: - metric: The metric object - data_point: The datapoint to convert - - Returns: - MetricRecord with populated timestamp, attributes, and histogram_data - """ - # Create base record - record = self._create_metric_record(metric.name, metric.unit, metric.description) - - # Set timestamp - timestamp_ms = ( - self._normalize_timestamp(data_point.time_unix_nano) - if data_point.time_unix_nano is not None - else int(time.time() * 1000) - ) - record.timestamp = timestamp_ms - - # Set attributes - record.attributes = data_point.attributes - - # For Histogram, set the histogram_data - record.histogram_data = { - "Count": data_point.count, - "Sum": data_point.sum, - "Min": data_point.min, - "Max": data_point.max, - } - return record - - # pylint: disable=too-many-locals - def _convert_exp_histogram(self, metric: Metric, data_point: Any) -> MetricRecord: - """ - Convert an ExponentialHistogram metric datapoint to a metric record. - - This function follows the logic of CalculateDeltaDatapoints in the Go implementation, - converting exponential buckets to their midpoint values. - - Ref: - https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/22626 - - Args: - metric: The metric object - data_point: The datapoint to convert - - Returns: - MetricRecord with populated timestamp, attributes, and exp_histogram_data - """ - - # Create base record - record = self._create_metric_record(metric.name, metric.unit, metric.description) - - # Set timestamp - timestamp_ms = ( - self._normalize_timestamp(data_point.time_unix_nano) - if data_point.time_unix_nano is not None - else int(time.time() * 1000) - ) - record.timestamp = timestamp_ms - - # Set attributes - record.attributes = data_point.attributes - - # Initialize arrays for values and counts - array_values = [] - array_counts = [] - - # Get scale - scale = data_point.scale - # Calculate base using the formula: 2^(2^(-scale)) - base = math.pow(2, math.pow(2, float(-scale))) - - # Process positive buckets - if data_point.positive and data_point.positive.bucket_counts: - positive_offset = getattr(data_point.positive, "offset", 0) - positive_bucket_counts = data_point.positive.bucket_counts - - bucket_begin = 0 - bucket_end = 0 - - for bucket_index, count in enumerate(positive_bucket_counts): - index = bucket_index + positive_offset - - if bucket_begin == 0: - bucket_begin = math.pow(base, float(index)) - else: - bucket_begin = bucket_end - - bucket_end = math.pow(base, float(index + 1)) - - # Calculate midpoint value of the bucket - metric_val = (bucket_begin + bucket_end) / 2 - - # Only include buckets with positive counts - if count > 0: - array_values.append(metric_val) - array_counts.append(float(count)) - - # Process zero bucket - zero_count = getattr(data_point, "zero_count", 0) - if zero_count > 0: - array_values.append(0) - array_counts.append(float(zero_count)) - - # Process negative buckets - if data_point.negative and data_point.negative.bucket_counts: - negative_offset = getattr(data_point.negative, "offset", 0) - negative_bucket_counts = data_point.negative.bucket_counts - - bucket_begin = 0 - bucket_end = 0 - - for bucket_index, count in enumerate(negative_bucket_counts): - index = bucket_index + negative_offset - - if bucket_end == 0: - bucket_end = -math.pow(base, float(index)) - else: - bucket_end = bucket_begin - - bucket_begin = -math.pow(base, float(index + 1)) - - # Calculate midpoint value of the bucket - metric_val = (bucket_begin + bucket_end) / 2 - - # Only include buckets with positive counts - if count > 0: - array_values.append(metric_val) - array_counts.append(float(count)) - - # Set the histogram data in the format expected by CloudWatch EMF - record.exp_histogram_data = { - "Values": array_values, - "Counts": array_counts, - "Count": data_point.count, - "Sum": data_point.sum, - "Max": data_point.max, - "Min": data_point.min, - } - - return record - - def _group_by_attributes_and_timestamp(self, record: MetricRecord) -> Tuple[str, int]: - """Group metric record by attributes and timestamp. - - Args: - record: The metric record - - Returns: - A tuple key for grouping - """ - # Create a key for grouping based on attributes - attrs_key = self._get_attributes_key(record.attributes) - return (attrs_key, record.timestamp) - - def _create_emf_log( - self, metric_records: List[MetricRecord], resource: Resource, timestamp: Optional[int] = None - ) -> Dict: - """ - Create EMF log dictionary from metric records. - - Since metric_records is already grouped by attributes, this function - creates a single EMF log for all records. - """ - # Start with base structure - emf_log = {"_aws": {"Timestamp": timestamp or int(time.time() * 1000), "CloudWatchMetrics": []}} - - # Set with latest EMF version schema - # opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/metric_translator.go#L414 - emf_log["Version"] = "1" - - # Add resource attributes to EMF log but not as dimensions - # OTel collector EMF Exporter has a resource_to_telemetry_conversion flag that will convert resource attributes - # as regular metric attributes(potential dimensions). However, for this SDK EMF implementation, - # we align with the OpenTelemetry concept that all metric attributes are treated as dimensions. - # And have resource attributes as just additional metadata in EMF, added otel.resource as prefix to distinguish. - if resource and resource.attributes: - for key, value in resource.attributes.items(): - emf_log[f"otel.resource.{key}"] = str(value) - - # Initialize collections for dimensions and metrics - metric_definitions = [] - # Collect attributes from all records (they should be the same for all records in the group) - # Only collect once from the first record and apply to all records - all_attributes = ( - metric_records[0].attributes - if metric_records and len(metric_records) > 0 and metric_records[0].attributes - else {} - ) - - # Process each metric record - for record in metric_records: - - metric_name = self._get_metric_name(record) - - # Skip processing if metric name is None or empty - if not metric_name: - continue - - # Create metric data dict - metric_data = {"Name": metric_name} - - unit = self._get_unit(record) - if unit: - metric_data["Unit"] = unit - - # Process different types of aggregations - if record.exp_histogram_data: - # Base2 Exponential Histogram - emf_log[metric_name] = record.exp_histogram_data - elif record.histogram_data: - # Regular Histogram metrics - emf_log[metric_name] = record.histogram_data - elif record.value is not None: - # Gauge, Sum, and other aggregations - emf_log[metric_name] = record.value - else: - logger.debug("Skipping metric %s as it does not have valid metric value", metric_name) - continue - - # Add to metric definitions list - metric_definitions.append(metric_data) - - # Get dimension names from collected attributes - dimension_names = self._get_dimension_names(all_attributes) - - # Add attribute values to the root of the EMF log - for name, value in all_attributes.items(): - emf_log[name] = str(value) - - # Add CloudWatch Metrics if we have metrics, include dimensions only if they exist - if metric_definitions: - cloudwatch_metric = {"Namespace": self.namespace, "Metrics": metric_definitions} - if dimension_names: - cloudwatch_metric["Dimensions"] = [dimension_names] - - emf_log["_aws"]["CloudWatchMetrics"].append(cloudwatch_metric) - - return emf_log - - def _send_log_event(self, log_event: Dict[str, Any]): + def _export(self, log_event: Dict[str, Any]): """ Send a log event to CloudWatch Logs using the log client. @@ -528,82 +66,6 @@ def _send_log_event(self, log_event: Dict[str, Any]): """ self.log_client.send_log_event(log_event) - # pylint: disable=too-many-nested-blocks,unused-argument,too-many-branches - def export( - self, metrics_data: MetricsData, timeout_millis: Optional[int] = None, **_kwargs: Any - ) -> MetricExportResult: - """ - Export metrics as EMF logs to CloudWatch. - - Groups metrics by attributes and timestamp before creating EMF logs. - - Args: - metrics_data: MetricsData containing resource metrics and scope metrics - timeout_millis: Optional timeout in milliseconds - **kwargs: Additional keyword arguments - - Returns: - MetricExportResult indicating success or failure - """ - try: - if not metrics_data.resource_metrics: - return MetricExportResult.SUCCESS - - # Process all metrics from all resource metrics and scope metrics - for resource_metrics in metrics_data.resource_metrics: - for scope_metrics in resource_metrics.scope_metrics: - # Dictionary to group metrics by attributes and timestamp - grouped_metrics = defaultdict(list) - - # Process all metrics in this scope - for metric in scope_metrics.metrics: - # Skip if metric.data is None or no data_points exists - try: - if not (metric.data and metric.data.data_points): - continue - except AttributeError: - # Metric doesn't have data or data_points attribute - continue - - # Process metrics based on type - metric_type = type(metric.data) - if metric_type in (Gauge, Sum): - for dp in metric.data.data_points: - record = self._convert_gauge_and_sum(metric, dp) - grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) - elif metric_type == Histogram: - for dp in metric.data.data_points: - record = self._convert_histogram(metric, dp) - grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) - elif metric_type == ExponentialHistogram: - for dp in metric.data.data_points: - record = self._convert_exp_histogram(metric, dp) - grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) - else: - logger.debug("Unsupported Metric Type: %s", metric_type) - - # Now process each group separately to create one EMF log per group - for (_, timestamp_ms), metric_records in grouped_metrics.items(): - if not metric_records: - continue - - # Create and send EMF log for this batch of metrics - self._send_log_event( - { - "message": json.dumps( - self._create_emf_log(metric_records, resource_metrics.resource, timestamp_ms) - ), - "timestamp": timestamp_ms, - } - ) - - return MetricExportResult.SUCCESS - # pylint: disable=broad-exception-caught - # capture all types of exceptions to not interrupt the instrumented services - except Exception as error: - logger.error("Failed to export metrics: %s", error) - return MetricExportResult.FAILURE - def force_flush(self, timeout_millis: int = 10000) -> bool: # pylint: disable=unused-argument """ Force flush any pending metrics. diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/base_emf_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/base_emf_exporter.py new file mode 100644 index 000000000..48cb5481a --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/base_emf_exporter.py @@ -0,0 +1,596 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +# pylint: disable=no-self-use + +import json +import logging +import math +import time +from abc import ABC, abstractmethod +from collections import defaultdict +from typing import Any, Dict, List, Optional, Tuple + +from opentelemetry.sdk.metrics import Counter +from opentelemetry.sdk.metrics import Histogram as HistogramInstr +from opentelemetry.sdk.metrics import ObservableCounter, ObservableGauge, ObservableUpDownCounter, UpDownCounter +from opentelemetry.sdk.metrics._internal.point import Metric +from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + ExponentialHistogram, + Gauge, + Histogram, + MetricExporter, + MetricExportResult, + MetricsData, + NumberDataPoint, + Sum, +) +from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation +from opentelemetry.sdk.resources import Resource +from opentelemetry.util.types import Attributes + +logger = logging.getLogger(__name__) + + +class MetricRecord: + """The metric data unified representation of all OTel metrics for OTel to CW EMF conversion.""" + + def __init__(self, metric_name: str, metric_unit: str, metric_description: str): + """ + Initialize metric record. + + Args: + metric_name: Name of the metric + metric_unit: Unit of the metric + metric_description: Description of the metric + """ + # Instrument metadata + self.name = metric_name + self.unit = metric_unit + self.description = metric_description + + # Will be set by conversion methods + self.timestamp: Optional[int] = None + self.attributes: Attributes = {} + + # Different metric type data - only one will be set per record + self.value: Optional[float] = None + self.sum_data: Optional[Any] = None + self.histogram_data: Optional[Any] = None + self.exp_histogram_data: Optional[Any] = None + + +class BaseEmfExporter(MetricExporter, ABC): + """ + Base class for OpenTelemetry metrics exporters that convert to CloudWatch EMF format. + + This class contains all the common logic for converting OTel metrics into CloudWatch EMF logs. + Subclasses need to implement the _export method to define where the EMF logs are sent. + + https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html + + """ + + # CloudWatch EMF supported units + # Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html + EMF_SUPPORTED_UNITS = { + "Seconds", + "Microseconds", + "Milliseconds", + "Bytes", + "Kilobytes", + "Megabytes", + "Gigabytes", + "Terabytes", + "Bits", + "Kilobits", + "Megabits", + "Gigabits", + "Terabits", + "Percent", + "Count", + "Bytes/Second", + "Kilobytes/Second", + "Megabytes/Second", + "Gigabytes/Second", + "Terabytes/Second", + "Bits/Second", + "Kilobits/Second", + "Megabits/Second", + "Gigabits/Second", + "Terabits/Second", + "Count/Second", + "None", + } + + # OTel to CloudWatch unit mapping + # Ref: opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/grouped_metric.go#L188 + UNIT_MAPPING = { + "1": "", + "ns": "", + "ms": "Milliseconds", + "s": "Seconds", + "us": "Microseconds", + "By": "Bytes", + "bit": "Bits", + } + + def __init__( + self, + namespace: str = "default", + preferred_temporality: Optional[Dict[type, AggregationTemporality]] = None, + preferred_aggregation: Optional[Dict[type, Any]] = None, + ): + """ + Initialize the base EMF exporter. + + Args: + namespace: CloudWatch namespace for metrics + preferred_temporality: Optional dictionary mapping instrument types to aggregation temporality + preferred_aggregation: Optional dictionary mapping instrument types to preferred aggregation + """ + # Set up temporality preference default to DELTA if customers not set + if preferred_temporality is None: + preferred_temporality = { + Counter: AggregationTemporality.DELTA, + HistogramInstr: AggregationTemporality.DELTA, + ObservableCounter: AggregationTemporality.DELTA, + ObservableGauge: AggregationTemporality.DELTA, + ObservableUpDownCounter: AggregationTemporality.DELTA, + UpDownCounter: AggregationTemporality.DELTA, + } + + # Set up aggregation preference default to exponential histogram for histogram metrics + if preferred_aggregation is None: + preferred_aggregation = { + HistogramInstr: ExponentialBucketHistogramAggregation(), + } + + super().__init__(preferred_temporality, preferred_aggregation) + + self.namespace = namespace + + def _get_metric_name(self, record: MetricRecord) -> Optional[str]: + """Get the metric name from the metric record or data point.""" + + try: + if record.name: + return record.name + except AttributeError: + pass + # Return None if no valid metric name found + return None + + def _get_unit(self, record: MetricRecord) -> Optional[str]: + """Get CloudWatch unit from MetricRecord unit.""" + unit = record.unit + + if not unit: + return None + + # First check if unit is already a supported EMF unit + if unit in self.EMF_SUPPORTED_UNITS: + return unit + + # Map from OTel unit to CloudWatch unit + mapped_unit = self.UNIT_MAPPING.get(unit) + + return mapped_unit + + def _get_dimension_names(self, attributes: Attributes) -> List[str]: + """Extract dimension names from attributes.""" + # Implement dimension selection logic + # For now, use all attributes as dimensions + return list(attributes.keys()) + + def _get_attributes_key(self, attributes: Attributes) -> str: + """ + Create a hashable key from attributes for grouping metrics. + + Args: + attributes: The attributes dictionary + + Returns: + A string representation of sorted attributes key-value pairs + """ + # Sort the attributes to ensure consistent keys + sorted_attrs = sorted(attributes.items()) + # Create a string representation of the attributes + return str(sorted_attrs) + + def _normalize_timestamp(self, timestamp_ns: int) -> int: + """ + Normalize a nanosecond timestamp to milliseconds for CloudWatch. + + Args: + timestamp_ns: Timestamp in nanoseconds + + Returns: + Timestamp in milliseconds + """ + # Convert from nanoseconds to milliseconds + return timestamp_ns // 1_000_000 + + def _create_metric_record(self, metric_name: str, metric_unit: str, metric_description: str) -> MetricRecord: + """ + Creates the intermediate metric data structure that standardizes different otel metric representation + and will be used to generate EMF events. The base record + establishes the instrument schema (name/unit/description) that will be populated + with dimensions, timestamps, and values during metric processing. + + Args: + metric_name: Name of the metric + metric_unit: Unit of the metric + metric_description: Description of the metric + + Returns: + A MetricRecord object + """ + return MetricRecord(metric_name, metric_unit, metric_description) + + def _convert_gauge_and_sum(self, metric: Metric, data_point: NumberDataPoint) -> MetricRecord: + """Convert a Gauge or Sum metric datapoint to a metric record. + + Args: + metric: The metric object + data_point: The datapoint to convert + + Returns: + MetricRecord with populated timestamp, attributes, and value + """ + # Create base record + record = self._create_metric_record(metric.name, metric.unit, metric.description) + + # Set timestamp + timestamp_ms = ( + self._normalize_timestamp(data_point.time_unix_nano) + if data_point.time_unix_nano is not None + else int(time.time() * 1000) + ) + record.timestamp = timestamp_ms + + # Set attributes + record.attributes = data_point.attributes + + # Set the value directly for both Gauge and Sum + record.value = data_point.value + + return record + + def _convert_histogram(self, metric: Metric, data_point: Any) -> MetricRecord: + """Convert a Histogram metric datapoint to a metric record. + + https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/datapoint.go#L87 + + Args: + metric: The metric object + data_point: The datapoint to convert + + Returns: + MetricRecord with populated timestamp, attributes, and histogram_data + """ + # Create base record + record = self._create_metric_record(metric.name, metric.unit, metric.description) + + # Set timestamp + timestamp_ms = ( + self._normalize_timestamp(data_point.time_unix_nano) + if data_point.time_unix_nano is not None + else int(time.time() * 1000) + ) + record.timestamp = timestamp_ms + + # Set attributes + record.attributes = data_point.attributes + + # For Histogram, set the histogram_data + record.histogram_data = { + "Count": data_point.count, + "Sum": data_point.sum, + "Min": data_point.min, + "Max": data_point.max, + } + return record + + # pylint: disable=too-many-locals + def _convert_exp_histogram(self, metric: Metric, data_point: Any) -> MetricRecord: + """ + Convert an ExponentialHistogram metric datapoint to a metric record. + + This function follows the logic of CalculateDeltaDatapoints in the Go implementation, + converting exponential buckets to their midpoint values. + + Ref: + https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/22626 + + Args: + metric: The metric object + data_point: The datapoint to convert + + Returns: + MetricRecord with populated timestamp, attributes, and exp_histogram_data + """ + + # Create base record + record = self._create_metric_record(metric.name, metric.unit, metric.description) + + # Set timestamp + timestamp_ms = ( + self._normalize_timestamp(data_point.time_unix_nano) + if data_point.time_unix_nano is not None + else int(time.time() * 1000) + ) + record.timestamp = timestamp_ms + + # Set attributes + record.attributes = data_point.attributes + + # Initialize arrays for values and counts + array_values = [] + array_counts = [] + + # Get scale + scale = data_point.scale + # Calculate base using the formula: 2^(2^(-scale)) + base = math.pow(2, math.pow(2, float(-scale))) + + # Process positive buckets + if data_point.positive and data_point.positive.bucket_counts: + positive_offset = getattr(data_point.positive, "offset", 0) + positive_bucket_counts = data_point.positive.bucket_counts + + bucket_begin = 0 + bucket_end = 0 + + for bucket_index, count in enumerate(positive_bucket_counts): + index = bucket_index + positive_offset + + if bucket_begin == 0: + bucket_begin = math.pow(base, float(index)) + else: + bucket_begin = bucket_end + + bucket_end = math.pow(base, float(index + 1)) + + # Calculate midpoint value of the bucket + metric_val = (bucket_begin + bucket_end) / 2 + + # Only include buckets with positive counts + if count > 0: + array_values.append(metric_val) + array_counts.append(float(count)) + + # Process zero bucket + zero_count = getattr(data_point, "zero_count", 0) + if zero_count > 0: + array_values.append(0) + array_counts.append(float(zero_count)) + + # Process negative buckets + if data_point.negative and data_point.negative.bucket_counts: + negative_offset = getattr(data_point.negative, "offset", 0) + negative_bucket_counts = data_point.negative.bucket_counts + + bucket_begin = 0 + bucket_end = 0 + + for bucket_index, count in enumerate(negative_bucket_counts): + index = bucket_index + negative_offset + + if bucket_end == 0: + bucket_end = -math.pow(base, float(index)) + else: + bucket_end = bucket_begin + + bucket_begin = -math.pow(base, float(index + 1)) + + # Calculate midpoint value of the bucket + metric_val = (bucket_begin + bucket_end) / 2 + + # Only include buckets with positive counts + if count > 0: + array_values.append(metric_val) + array_counts.append(float(count)) + + # Set the histogram data in the format expected by CloudWatch EMF + record.exp_histogram_data = { + "Values": array_values, + "Counts": array_counts, + "Count": data_point.count, + "Sum": data_point.sum, + "Max": data_point.max, + "Min": data_point.min, + } + + return record + + def _group_by_attributes_and_timestamp(self, record: MetricRecord) -> Tuple[str, int]: + """Group metric record by attributes and timestamp. + + Args: + record: The metric record + + Returns: + A tuple key for grouping + """ + # Create a key for grouping based on attributes + attrs_key = self._get_attributes_key(record.attributes) + return (attrs_key, record.timestamp) + + def _create_emf_log( + self, metric_records: List[MetricRecord], resource: Resource, timestamp: Optional[int] = None + ) -> Dict: + """ + Create EMF log dictionary from metric records. + + Since metric_records is already grouped by attributes, this function + creates a single EMF log for all records. + """ + # Start with base structure + emf_log = {"_aws": {"Timestamp": timestamp or int(time.time() * 1000), "CloudWatchMetrics": []}} + + # Set with latest EMF version schema + # opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/metric_translator.go#L414 + emf_log["Version"] = "1" + + # Add resource attributes to EMF log but not as dimensions + # OTel collector EMF Exporter has a resource_to_telemetry_conversion flag that will convert resource attributes + # as regular metric attributes(potential dimensions). However, for this SDK EMF implementation, + # we align with the OpenTelemetry concept that all metric attributes are treated as dimensions. + # And have resource attributes as just additional metadata in EMF, added otel.resource as prefix to distinguish. + if resource and resource.attributes: + for key, value in resource.attributes.items(): + emf_log[f"otel.resource.{key}"] = str(value) + + # Initialize collections for dimensions and metrics + metric_definitions = [] + # Collect attributes from all records (they should be the same for all records in the group) + # Only collect once from the first record and apply to all records + all_attributes = ( + metric_records[0].attributes + if metric_records and len(metric_records) > 0 and metric_records[0].attributes + else {} + ) + + # Process each metric record + for record in metric_records: + + metric_name = self._get_metric_name(record) + + # Skip processing if metric name is None or empty + if not metric_name: + continue + + # Create metric data dict + metric_data = {"Name": metric_name} + + unit = self._get_unit(record) + if unit: + metric_data["Unit"] = unit + + # Process different types of aggregations + if record.exp_histogram_data: + # Base2 Exponential Histogram + emf_log[metric_name] = record.exp_histogram_data + elif record.histogram_data: + # Regular Histogram metrics + emf_log[metric_name] = record.histogram_data + elif record.value is not None: + # Gauge, Sum, and other aggregations + emf_log[metric_name] = record.value + else: + logger.debug("Skipping metric %s as it does not have valid metric value", metric_name) + continue + + # Add to metric definitions list + metric_definitions.append(metric_data) + + # Get dimension names from collected attributes + dimension_names = self._get_dimension_names(all_attributes) + + # Add attribute values to the root of the EMF log + for name, value in all_attributes.items(): + emf_log[name] = str(value) + + # Add CloudWatch Metrics if we have metrics, include dimensions only if they exist + if metric_definitions: + cloudwatch_metric = {"Namespace": self.namespace, "Metrics": metric_definitions} + if dimension_names: + cloudwatch_metric["Dimensions"] = [dimension_names] + + emf_log["_aws"]["CloudWatchMetrics"].append(cloudwatch_metric) + + return emf_log + + @abstractmethod + def _export(self, log_event: Dict[str, Any]): + """ + Send a log event to the destination (CloudWatch Logs, console, etc.). + + This method must be implemented by subclasses to define where the EMF logs are sent. + + Args: + log_event: The log event to send + """ + + # pylint: disable=too-many-nested-blocks,unused-argument,too-many-branches + def export( + self, metrics_data: MetricsData, timeout_millis: Optional[int] = None, **_kwargs: Any + ) -> MetricExportResult: + """ + Export metrics as EMF logs. + + Groups metrics by attributes and timestamp before creating EMF logs. + + Args: + metrics_data: MetricsData containing resource metrics and scope metrics + timeout_millis: Optional timeout in milliseconds + **kwargs: Additional keyword arguments + + Returns: + MetricExportResult indicating success or failure + """ + try: + if not metrics_data.resource_metrics: + return MetricExportResult.SUCCESS + + # Process all metrics from all resource metrics and scope metrics + for resource_metrics in metrics_data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + # Dictionary to group metrics by attributes and timestamp + grouped_metrics = defaultdict(list) + + # Process all metrics in this scope + for metric in scope_metrics.metrics: + # Skip if metric.data is None or no data_points exists + try: + if not (metric.data and metric.data.data_points): + continue + except AttributeError: + # Metric doesn't have data or data_points attribute + continue + + # Process metrics based on type + metric_type = type(metric.data) + if metric_type in (Gauge, Sum): + for dp in metric.data.data_points: + record = self._convert_gauge_and_sum(metric, dp) + grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) + elif metric_type == Histogram: + for dp in metric.data.data_points: + record = self._convert_histogram(metric, dp) + grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) + elif metric_type == ExponentialHistogram: + for dp in metric.data.data_points: + record = self._convert_exp_histogram(metric, dp) + grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) + else: + logger.debug("Unsupported Metric Type: %s", metric_type) + + # Now process each group separately to create one EMF log per group + for (_, timestamp_ms), metric_records in grouped_metrics.items(): + if not metric_records: + continue + + # Create and send EMF log for this batch of metrics + logger.debug( + "Creating EMF log for %d records at timestamp %s", + len(metric_records), + timestamp_ms, + ) + self._export( + { + "message": json.dumps( + self._create_emf_log(metric_records, resource_metrics.resource, timestamp_ms) + ), + "timestamp": timestamp_ms, + } + ) + + return MetricExportResult.SUCCESS + # pylint: disable=broad-exception-caught + # capture all types of exceptions to not interrupt the instrumented services + except Exception as error: + logger.error("Failed to export metrics: %s", error) + return MetricExportResult.FAILURE diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/console_emf_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/console_emf_exporter.py new file mode 100644 index 000000000..8563c16b2 --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/console_emf_exporter.py @@ -0,0 +1,104 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +# pylint: disable=no-self-use + +import logging +from typing import Any, Dict, Optional + +from opentelemetry.sdk.metrics.export import AggregationTemporality + +from .base_emf_exporter import BaseEmfExporter + +logger = logging.getLogger(__name__) + + +class ConsoleEmfExporter(BaseEmfExporter): + """ + OpenTelemetry metrics exporter for CloudWatch EMF format to console output. + + This exporter converts OTel metrics into CloudWatch EMF logs and writes them + to standard output instead of sending to CloudWatch Logs. This is useful for + debugging, testing, or when you want to process EMF logs with other tools. + + https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html + + """ + + def __init__( + self, + namespace: str = "default", + preferred_temporality: Optional[Dict[type, AggregationTemporality]] = None, + preferred_aggregation: Optional[Dict[type, Any]] = None, + **kwargs: Any, + ) -> None: + """ + Initialize the Console EMF exporter. + + Args: + namespace: CloudWatch namespace for metrics (defaults to "default") + preferred_temporality: Optional dictionary mapping instrument types to aggregation temporality + preferred_aggregation: Optional dictionary mapping instrument types to preferred aggregation + **kwargs: Additional arguments (unused, kept for compatibility) + """ + # No need to check for None since namespace has a default value + if namespace is None: + namespace = "default" + super().__init__(namespace, preferred_temporality, preferred_aggregation) + + def _export(self, log_event: Dict[str, Any]) -> None: + """ + Send a log event message to stdout for console output. + + This method writes the EMF log message to stdout, making it easy to + capture and redirect the output for processing or debugging purposes. + + Args: + log_event: The log event dictionary containing 'message' and 'timestamp' + keys, where 'message' is the JSON-serialized EMF log + + Raises: + No exceptions are raised - errors are logged and handled gracefully + """ + try: + # Write the EMF log message to stdout for easy redirection/capture + message = log_event.get("message", "") + if message: + print(message, flush=True) + else: + logger.warning("Empty message in log event: %s", log_event) + except Exception as error: # pylint: disable=broad-exception-caught + logger.error("Failed to write EMF log to console. Log event: %s. Error: %s", log_event, error) + + def force_flush(self, timeout_millis: int = 10000) -> bool: + """ + Force flush any pending metrics. + + For console output, there's no buffering since we use print() with + flush=True, so this is effectively a no-op that always succeeds. + + Args: + timeout_millis: Timeout in milliseconds (unused for console output) + + Returns: + Always returns True as console output is immediately flushed + """ + logger.debug("ConsoleEmfExporter force_flush called - no buffering to flush for console output") + return True + + def shutdown(self, timeout_millis: Optional[int] = None, **kwargs: Any) -> bool: + """ + Shutdown the exporter. + + For console output, there are no resources to clean up or connections + to close, so this is effectively a no-op that always succeeds. + + Args: + timeout_millis: Timeout in milliseconds (unused for console output) + **kwargs: Additional keyword arguments (unused for console output) + + Returns: + Always returns True as there's no cleanup required for console output + """ + logger.debug("ConsoleEmfExporter shutdown called with timeout_millis=%s", timeout_millis) + return True diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py index d6a5903af..c8595cdb7 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py @@ -635,8 +635,8 @@ def test_shutdown(self, mock_force_flush): mock_force_flush.assert_called_once_with(5000) # pylint: disable=broad-exception-caught - def test_send_log_event_method_exists(self): - """Test that _send_log_event method exists and can be called.""" + def test_export_method_exists(self): + """Test that _export method exists and can be called.""" # Just test that the method exists and doesn't crash with basic input log_event = {"message": "test message", "timestamp": 1234567890} @@ -644,10 +644,10 @@ def test_send_log_event_method_exists(self): with patch.object(self.exporter.log_client, "send_log_event") as mock_send: # Should not raise an exception try: - self.exporter._send_log_event(log_event) + self.exporter._export(log_event) mock_send.assert_called_once_with(log_event) except Exception as error: - self.fail(f"_send_log_event raised an exception: {error}") + self.fail(f"_export raised an exception: {error}") if __name__ == "__main__": diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_base_emf_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_base_emf_exporter.py new file mode 100644 index 000000000..fa0ace40a --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_base_emf_exporter.py @@ -0,0 +1,291 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import unittest +from unittest.mock import Mock + +from amazon.opentelemetry.distro.exporter.aws.metrics.base_emf_exporter import BaseEmfExporter, MetricRecord +from opentelemetry.sdk.metrics.export import MetricExportResult +from opentelemetry.sdk.resources import Resource + + +class ConcreteEmfExporter(BaseEmfExporter): + """Concrete implementation of BaseEmfExporter for testing.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.exported_logs = [] + + def _export(self, log_event): + """Implementation that stores exported logs for testing.""" + self.exported_logs.append(log_event) + + def force_flush(self, timeout_millis=None): # pylint: disable=no-self-use + """Force flush implementation for testing.""" + return True + + def shutdown(self, timeout_millis=None): # pylint: disable=no-self-use + """Shutdown implementation for testing.""" + return True + + +class TestMetricRecord(unittest.TestCase): + """Test MetricRecord class.""" + + def test_metric_record_initialization(self): + """Test MetricRecord initialization with basic values.""" + record = MetricRecord("test_metric", "Count", "Test description") + + self.assertEqual(record.name, "test_metric") + self.assertEqual(record.unit, "Count") + self.assertEqual(record.description, "Test description") + self.assertIsNone(record.timestamp) + self.assertEqual(record.attributes, {}) + self.assertIsNone(record.value) + self.assertIsNone(record.sum_data) + self.assertIsNone(record.histogram_data) + self.assertIsNone(record.exp_histogram_data) + + +class TestBaseEmfExporter(unittest.TestCase): + """Test BaseEmfExporter class.""" + + def setUp(self): + """Set up test fixtures.""" + self.exporter = ConcreteEmfExporter(namespace="TestNamespace") + + def test_initialization(self): + """Test exporter initialization.""" + exporter = ConcreteEmfExporter() + self.assertEqual(exporter.namespace, "default") + + exporter_custom = ConcreteEmfExporter(namespace="CustomNamespace") + self.assertEqual(exporter_custom.namespace, "CustomNamespace") + + def test_get_metric_name(self): + """Test metric name extraction.""" + # Test with valid name + record = Mock() + record.name = "test_metric" + result = self.exporter._get_metric_name(record) + self.assertEqual(result, "test_metric") + + # Test with empty name + record.name = "" + result = self.exporter._get_metric_name(record) + self.assertIsNone(result) + + # Test with None name + record.name = None + result = self.exporter._get_metric_name(record) + self.assertIsNone(result) + + def test_get_unit(self): + """Test unit mapping functionality.""" + # Test EMF supported units (should return as-is) + record = MetricRecord("test", "Count", "desc") + self.assertEqual(self.exporter._get_unit(record), "Count") + + record = MetricRecord("test", "Percent", "desc") + self.assertEqual(self.exporter._get_unit(record), "Percent") + + # Test OTel unit mapping + record = MetricRecord("test", "ms", "desc") + self.assertEqual(self.exporter._get_unit(record), "Milliseconds") + + record = MetricRecord("test", "s", "desc") + self.assertEqual(self.exporter._get_unit(record), "Seconds") + + record = MetricRecord("test", "By", "desc") + self.assertEqual(self.exporter._get_unit(record), "Bytes") + + # Test units that map to empty string + record = MetricRecord("test", "1", "desc") + self.assertEqual(self.exporter._get_unit(record), "") + + # Test unknown unit + record = MetricRecord("test", "unknown", "desc") + self.assertIsNone(self.exporter._get_unit(record)) + + # Test None unit + record = MetricRecord("test", None, "desc") + self.assertIsNone(self.exporter._get_unit(record)) + + def test_get_dimension_names(self): + """Test dimension names extraction.""" + attributes = {"service": "test", "env": "prod"} + result = self.exporter._get_dimension_names(attributes) + self.assertEqual(set(result), {"service", "env"}) + + # Test empty attributes + result = self.exporter._get_dimension_names({}) + self.assertEqual(result, []) + + def test_get_attributes_key(self): + """Test attributes key generation.""" + attrs1 = {"b": "2", "a": "1"} + attrs2 = {"a": "1", "b": "2"} + + key1 = self.exporter._get_attributes_key(attrs1) + key2 = self.exporter._get_attributes_key(attrs2) + + # Keys should be consistent regardless of order + self.assertEqual(key1, key2) + self.assertIsInstance(key1, str) + + def test_normalize_timestamp(self): + """Test timestamp normalization.""" + timestamp_ns = 1609459200000000000 # nanoseconds + expected_ms = 1609459200000 # milliseconds + + result = self.exporter._normalize_timestamp(timestamp_ns) + self.assertEqual(result, expected_ms) + + def test_create_metric_record(self): + """Test metric record creation.""" + record = self.exporter._create_metric_record("test_metric", "Count", "Description") + + self.assertIsInstance(record, MetricRecord) + self.assertEqual(record.name, "test_metric") + self.assertEqual(record.unit, "Count") + self.assertEqual(record.description, "Description") + + def test_convert_gauge_and_sum(self): + """Test gauge and sum conversion.""" + metric = Mock() + metric.name = "test_gauge" + metric.unit = "Count" + metric.description = "Test gauge" + + data_point = Mock() + data_point.value = 42.0 + data_point.attributes = {"key": "value"} + data_point.time_unix_nano = 1609459200000000000 + + record = self.exporter._convert_gauge_and_sum(metric, data_point) + + self.assertEqual(record.name, "test_gauge") + self.assertEqual(record.value, 42.0) + self.assertEqual(record.attributes, {"key": "value"}) + self.assertEqual(record.timestamp, 1609459200000) + + def test_convert_histogram(self): + """Test histogram conversion.""" + metric = Mock() + metric.name = "test_histogram" + metric.unit = "ms" + metric.description = "Test histogram" + + data_point = Mock() + data_point.count = 5 + data_point.sum = 25.0 + data_point.min = 1.0 + data_point.max = 10.0 + data_point.attributes = {"service": "test"} + data_point.time_unix_nano = 1609459200000000000 + + record = self.exporter._convert_histogram(metric, data_point) + + self.assertEqual(record.name, "test_histogram") + expected_data = {"Count": 5, "Sum": 25.0, "Min": 1.0, "Max": 10.0} + self.assertEqual(record.histogram_data, expected_data) + self.assertEqual(record.attributes, {"service": "test"}) + + def test_convert_exp_histogram(self): + """Test exponential histogram conversion.""" + metric = Mock() + metric.name = "test_exp_histogram" + metric.unit = "s" + metric.description = "Test exponential histogram" + + data_point = Mock() + data_point.count = 10 + data_point.sum = 50.0 + data_point.min = 1.0 + data_point.max = 20.0 + data_point.scale = 1 + data_point.zero_count = 0 + data_point.attributes = {"env": "test"} + data_point.time_unix_nano = 1609459200000000000 + + # Mock buckets + data_point.positive = Mock() + data_point.positive.offset = 0 + data_point.positive.bucket_counts = [1, 2, 1] + + data_point.negative = Mock() + data_point.negative.offset = 0 + data_point.negative.bucket_counts = [] + + record = self.exporter._convert_exp_histogram(metric, data_point) + + self.assertEqual(record.name, "test_exp_histogram") + self.assertIsNotNone(record.exp_histogram_data) + self.assertIn("Values", record.exp_histogram_data) + self.assertIn("Counts", record.exp_histogram_data) + self.assertEqual(record.exp_histogram_data["Count"], 10) + self.assertEqual(record.exp_histogram_data["Sum"], 50.0) + + def test_group_by_attributes_and_timestamp(self): + """Test grouping by attributes and timestamp.""" + record = Mock() + record.attributes = {"env": "test"} + record.timestamp = 1234567890 + + result = self.exporter._group_by_attributes_and_timestamp(record) + + self.assertIsInstance(result, tuple) + self.assertEqual(len(result), 2) + self.assertEqual(result[1], 1234567890) + + def test_create_emf_log(self): + """Test EMF log creation.""" + # Create a simple metric record + record = self.exporter._create_metric_record("test_metric", "Count", "Test") + record.value = 50.0 + record.timestamp = 1234567890 + record.attributes = {"env": "test"} + + records = [record] + resource = Resource.create({"service.name": "test-service"}) + + result = self.exporter._create_emf_log(records, resource, 1234567890) + + # Check basic EMF structure + self.assertIn("_aws", result) + self.assertIn("CloudWatchMetrics", result["_aws"]) + self.assertEqual(result["_aws"]["Timestamp"], 1234567890) + self.assertEqual(result["Version"], "1") + + # Check metric value + self.assertEqual(result["test_metric"], 50.0) + + # Check resource attributes + self.assertEqual(result["otel.resource.service.name"], "test-service") + + # Check CloudWatch metrics + cw_metrics = result["_aws"]["CloudWatchMetrics"][0] + self.assertEqual(cw_metrics["Namespace"], "TestNamespace") + self.assertEqual(cw_metrics["Metrics"][0]["Name"], "test_metric") + + def test_export_empty_metrics(self): + """Test export with empty metrics data.""" + metrics_data = Mock() + metrics_data.resource_metrics = [] + + result = self.exporter.export(metrics_data) + self.assertEqual(result, MetricExportResult.SUCCESS) + + def test_export_failure_handling(self): + """Test export failure handling.""" + metrics_data = Mock() + # Make iteration fail + metrics_data.resource_metrics = Mock() + metrics_data.resource_metrics.__iter__ = Mock(side_effect=Exception("Test exception")) + + result = self.exporter.export(metrics_data) + self.assertEqual(result, MetricExportResult.FAILURE) + + +if __name__ == "__main__": + unittest.main() diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_console_emf_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_console_emf_exporter.py new file mode 100644 index 000000000..8783dfd92 --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_console_emf_exporter.py @@ -0,0 +1,288 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import unittest +from io import StringIO +from unittest.mock import MagicMock, patch + +from amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter import ConsoleEmfExporter +from opentelemetry.sdk.metrics import Counter +from opentelemetry.sdk.metrics.export import AggregationTemporality, MetricExportResult, MetricsData + + +class TestConsoleEmfExporter(unittest.TestCase): + """Test ConsoleEmfExporter class.""" + + def setUp(self): + """Set up test fixtures.""" + self.exporter = ConsoleEmfExporter() + + def test_namespace_initialization(self): + """Test exporter initialization with different namespace scenarios.""" + # Test default namespace + exporter = ConsoleEmfExporter() + self.assertEqual(exporter.namespace, "default") + + # Test custom namespace + exporter = ConsoleEmfExporter(namespace="CustomNamespace") + self.assertEqual(exporter.namespace, "CustomNamespace") + + # Test None namespace (should default to 'default') + exporter = ConsoleEmfExporter(namespace=None) + self.assertEqual(exporter.namespace, "default") + + # Test empty string namespace (should remain empty) + exporter = ConsoleEmfExporter(namespace="") + self.assertEqual(exporter.namespace, "") + + def test_initialization_with_parameters(self): + """Test exporter initialization with optional parameters.""" + # Test with preferred_temporality + preferred_temporality = {Counter: AggregationTemporality.CUMULATIVE} + exporter = ConsoleEmfExporter(namespace="TestNamespace", preferred_temporality=preferred_temporality) + self.assertEqual(exporter.namespace, "TestNamespace") + self.assertEqual(exporter._preferred_temporality[Counter], AggregationTemporality.CUMULATIVE) + + # Test with preferred_aggregation + preferred_aggregation = {Counter: "TestAggregation"} + exporter = ConsoleEmfExporter(preferred_aggregation=preferred_aggregation) + self.assertEqual(exporter._preferred_aggregation[Counter], "TestAggregation") + + # Test with additional kwargs + exporter = ConsoleEmfExporter(namespace="TestNamespace", extra_param="ignored") # Should be ignored + self.assertEqual(exporter.namespace, "TestNamespace") + + def test_export_log_event_success(self): + """Test that log events are properly sent to console output.""" + # Create a simple log event with EMF-formatted message + test_message = ( + '{"_aws":{"Timestamp":1640995200000,"CloudWatchMetrics":[{"Namespace":"TestNamespace",' + '"Dimensions":[["Service"]],"Metrics":[{"Name":"TestMetric","Unit":"Count"}]}]},' + '"Service":"test-service","TestMetric":42}' + ) + log_event = {"message": test_message, "timestamp": 1640995200000} + + # Capture stdout to verify the output + with patch("sys.stdout", new_callable=StringIO) as mock_stdout: + self.exporter._export(log_event) + + # Verify the message was printed to stdout with flush + captured_output = mock_stdout.getvalue().strip() + self.assertEqual(captured_output, test_message) + + def test_export_log_event_empty_message(self): + """Test handling of log events with empty messages.""" + log_event = {"message": "", "timestamp": 1640995200000} + + with patch("sys.stdout", new_callable=StringIO) as mock_stdout: + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter._export(log_event) + + # Should not print anything for empty message + captured_output = mock_stdout.getvalue().strip() + self.assertEqual(captured_output, "") + + # Should log a warning + mock_logger.warning.assert_called_once() + + def test_export_log_event_missing_message(self): + """Test handling of log events without message key.""" + log_event = {"timestamp": 1640995200000} + + with patch("sys.stdout", new_callable=StringIO) as mock_stdout: + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter._export(log_event) + + # Should not print anything when message is missing + captured_output = mock_stdout.getvalue().strip() + self.assertEqual(captured_output, "") + + # Should log a warning + mock_logger.warning.assert_called_once() + + def test_export_log_event_with_none_message(self): + """Test handling of log events with None message.""" + log_event = {"message": None, "timestamp": 1640995200000} + + with patch("sys.stdout", new_callable=StringIO) as mock_stdout: + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter._export(log_event) + + # Should not print anything when message is None + captured_output = mock_stdout.getvalue().strip() + self.assertEqual(captured_output, "") + + # Should log a warning + mock_logger.warning.assert_called_once() + + def test_export_log_event_print_exception(self): + """Test error handling when print() raises an exception.""" + log_event = {"message": "test message", "timestamp": 1640995200000} + + with patch("builtins.print", side_effect=Exception("Print failed")): + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter._export(log_event) + + # Should log the error + mock_logger.error.assert_called_once() + args = mock_logger.error.call_args[0] + self.assertIn("Failed to write EMF log to console", args[0]) + self.assertEqual(args[1], log_event) + self.assertIn("Print failed", str(args[2])) + + def test_export_log_event_various_message_types(self): + """Test _export with various message types.""" + # Test with JSON string + json_message = '{"key": "value"}' + log_event = {"message": json_message, "timestamp": 1640995200000} + + with patch("sys.stdout", new_callable=StringIO) as mock_stdout: + self.exporter._export(log_event) + + captured_output = mock_stdout.getvalue().strip() + self.assertEqual(captured_output, json_message) + + # Test with plain string + plain_message = "Simple log message" + log_event = {"message": plain_message, "timestamp": 1640995200000} + + with patch("sys.stdout", new_callable=StringIO) as mock_stdout: + self.exporter._export(log_event) + + captured_output = mock_stdout.getvalue().strip() + self.assertEqual(captured_output, plain_message) + + def test_force_flush(self): + """Test force_flush method.""" + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + # Test with default timeout + result = self.exporter.force_flush() + self.assertTrue(result) + mock_logger.debug.assert_called_once() + + # Reset mock for next call + mock_logger.reset_mock() + + # Test with custom timeout + result = self.exporter.force_flush(timeout_millis=5000) + self.assertTrue(result) + mock_logger.debug.assert_called_once() + + def test_shutdown(self): + """Test shutdown method.""" + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + # Test with no timeout + result = self.exporter.shutdown() + self.assertTrue(result) + mock_logger.debug.assert_called_once_with("ConsoleEmfExporter shutdown called with timeout_millis=%s", None) + + # Reset mock for next call + mock_logger.reset_mock() + + # Test with timeout + result = self.exporter.shutdown(timeout_millis=3000) + self.assertTrue(result) + mock_logger.debug.assert_called_once_with("ConsoleEmfExporter shutdown called with timeout_millis=%s", 3000) + + # Reset mock for next call + mock_logger.reset_mock() + + # Test with additional kwargs + result = self.exporter.shutdown(timeout_millis=3000, extra_arg="ignored") + self.assertTrue(result) + mock_logger.debug.assert_called_once() + + def test_integration_with_metrics_data(self): + """Test the full export flow with actual MetricsData.""" + # Create a mock MetricsData + mock_metrics_data = MagicMock(spec=MetricsData) + mock_resource_metrics = MagicMock() + mock_scope_metrics = MagicMock() + mock_metric = MagicMock() + + # Set up the mock hierarchy + mock_metrics_data.resource_metrics = [mock_resource_metrics] + mock_resource_metrics.scope_metrics = [mock_scope_metrics] + mock_scope_metrics.metrics = [mock_metric] + + # Mock the metric to have no data_points to avoid complex setup + mock_metric.data = None + + with patch("sys.stdout", new_callable=StringIO): + result = self.exporter.export(mock_metrics_data) + + # Should succeed even with no actual metrics + self.assertEqual(result, MetricExportResult.SUCCESS) + + def test_integration_export_success(self): + """Test export method returns success.""" + # Create empty MetricsData + mock_metrics_data = MagicMock(spec=MetricsData) + mock_metrics_data.resource_metrics = [] + + result = self.exporter.export(mock_metrics_data) + self.assertEqual(result, MetricExportResult.SUCCESS) + + def test_integration_export_with_timeout(self): + """Test export method with timeout parameter.""" + mock_metrics_data = MagicMock(spec=MetricsData) + mock_metrics_data.resource_metrics = [] + + result = self.exporter.export(mock_metrics_data, timeout_millis=5000) + self.assertEqual(result, MetricExportResult.SUCCESS) + + def test_export_failure_handling(self): + """Test export method handles exceptions and returns failure.""" + # Create a mock that raises an exception + mock_metrics_data = MagicMock(spec=MetricsData) + mock_metrics_data.resource_metrics = [MagicMock()] + + # Make the resource_metrics access raise an exception + type(mock_metrics_data).resource_metrics = property( + lambda self: (_ for _ in ()).throw(Exception("Test exception")) + ) + + # Patch the logger in the base_emf_exporter since that's where the error logging happens + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.base_emf_exporter.logger") as mock_logger: + result = self.exporter.export(mock_metrics_data) + + self.assertEqual(result, MetricExportResult.FAILURE) + mock_logger.error.assert_called_once() + self.assertIn("Failed to export metrics", mock_logger.error.call_args[0][0]) + + def test_flush_output_verification(self): + """Test that print is called with flush=True.""" + log_event = {"message": "test message", "timestamp": 1640995200000} + + with patch("builtins.print") as mock_print: + self.exporter._export(log_event) + + # Verify print was called with flush=True + mock_print.assert_called_once_with("test message", flush=True) + + def test_logger_levels(self): + """Test that appropriate log levels are used.""" + # Test debug logging in force_flush + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter.force_flush() + mock_logger.debug.assert_called_once() + + # Test debug logging in shutdown + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter.shutdown() + mock_logger.debug.assert_called_once() + + # Test warning logging for empty message + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter._export({"message": "", "timestamp": 123}) + mock_logger.warning.assert_called_once() + + # Test error logging for exception + with patch("builtins.print", side_effect=Exception("Test")): + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter._export({"message": "test", "timestamp": 123}) + mock_logger.error.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index 9e4afc81e..31439eba3 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -25,6 +25,7 @@ AwsOpenTelemetryConfigurator, OtlpLogHeaderSetting, _check_emf_exporter_enabled, + _clear_logs_header_cache, _create_aws_otlp_exporter, _create_emf_exporter, _custom_import_sampler, @@ -37,16 +38,15 @@ _customize_span_processors, _export_unsampled_span_for_agent_observability, _export_unsampled_span_for_lambda, + _fetch_logs_header, _init_logging, _is_application_signals_enabled, _is_application_signals_runtime_enabled, _is_defer_to_workers_enabled, _is_wsgi_master_process, - _validate_and_fetch_logs_header, ) from amazon.opentelemetry.distro.aws_opentelemetry_distro import AwsOpenTelemetryDistro from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor -from amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter import AwsCloudWatchEmfExporter from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession # pylint: disable=line-too-long @@ -508,6 +508,7 @@ def test_customize_span_exporter_sigv4(self): bad_configs.append(config) for config in good_configs: + _clear_logs_header_cache() self.customize_exporter_test( config, _customize_span_exporter, @@ -519,6 +520,7 @@ def test_customize_span_exporter_sigv4(self): ) for config in bad_configs: + _clear_logs_header_cache() self.customize_exporter_test( config, _customize_span_exporter, @@ -616,6 +618,7 @@ def test_customize_logs_exporter_sigv4(self): bad_configs.append(config) for config in good_configs: + _clear_logs_header_cache() self.customize_exporter_test( config, _customize_logs_exporter, @@ -626,6 +629,7 @@ def test_customize_logs_exporter_sigv4(self): ) for config in bad_configs: + _clear_logs_header_cache() self.customize_exporter_test( config, _customize_logs_exporter, OTLPLogExporter(), OTLPLogExporter, Session, Compression.NoCompression ) @@ -964,64 +968,72 @@ def test_check_emf_exporter_enabled(self): # Clean up os.environ.pop("OTEL_METRICS_EXPORTER", None) - def test_validate_and_fetch_logs_header(self): + def test_fetch_logs_header(self): + _clear_logs_header_cache() + # Test when headers are not set os.environ.pop(OTEL_EXPORTER_OTLP_LOGS_HEADERS, None) - result = _validate_and_fetch_logs_header() + result = _fetch_logs_header() self.assertIsInstance(result, OtlpLogHeaderSetting) self.assertIsNone(result.log_group) self.assertIsNone(result.log_stream) self.assertIsNone(result.namespace) - self.assertFalse(result.is_valid) + self.assertFalse(result.is_valid()) + + # Test singleton behavior - should return the same cached instance + result2 = _fetch_logs_header() + self.assertIs(result, result2) # Same object reference - # Test with valid headers + _clear_logs_header_cache() os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=test-group,x-aws-log-stream=test-stream" - result = _validate_and_fetch_logs_header() + result = _fetch_logs_header() self.assertEqual(result.log_group, "test-group") self.assertEqual(result.log_stream, "test-stream") self.assertIsNone(result.namespace) - self.assertTrue(result.is_valid) + self.assertTrue(result.is_valid()) - # Test with valid headers including namespace + # Test singleton behavior again + result2 = _fetch_logs_header() + self.assertIs(result, result2) + + _clear_logs_header_cache() os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = ( "x-aws-log-group=test-group,x-aws-log-stream=test-stream,x-aws-metric-namespace=test-namespace" ) - result = _validate_and_fetch_logs_header() - self.assertEqual(result.log_group, "test-group") - self.assertEqual(result.log_stream, "test-stream") + result = _fetch_logs_header() self.assertEqual(result.namespace, "test-namespace") - self.assertTrue(result.is_valid) + self.assertTrue(result.is_valid()) - # Test with missing log group + _clear_logs_header_cache() os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-stream=test-stream" - result = _validate_and_fetch_logs_header() - self.assertIsNone(result.log_group) + result = _fetch_logs_header() self.assertEqual(result.log_stream, "test-stream") - self.assertFalse(result.is_valid) + self.assertFalse(result.is_valid()) - # Test with missing log stream + _clear_logs_header_cache() os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=test-group" - result = _validate_and_fetch_logs_header() + result = _fetch_logs_header() self.assertEqual(result.log_group, "test-group") self.assertIsNone(result.log_stream) - self.assertFalse(result.is_valid) + self.assertFalse(result.is_valid()) - # Test with empty value in log group + _clear_logs_header_cache() os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=,x-aws-log-stream=test-stream" - result = _validate_and_fetch_logs_header() + result = _fetch_logs_header() self.assertIsNone(result.log_group) self.assertEqual(result.log_stream, "test-stream") - self.assertFalse(result.is_valid) + self.assertFalse(result.is_valid()) - # Test with empty value in log stream + _clear_logs_header_cache() os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=test-group,x-aws-log-stream=" - result = _validate_and_fetch_logs_header() + result = _fetch_logs_header() self.assertEqual(result.log_group, "test-group") self.assertIsNone(result.log_stream) - self.assertFalse(result.is_valid) + self.assertFalse(result.is_valid()) # Clean up os.environ.pop(OTEL_EXPORTER_OTLP_LOGS_HEADERS, None) + _clear_logs_header_cache() @patch( "amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled", return_value=False @@ -1051,63 +1063,6 @@ def test_customize_log_record_processor_with_agent_observability(self, _): added_processor = mock_logger_provider.add_log_record_processor.call_args[0][0] self.assertIsInstance(added_processor, AwsCloudWatchOtlpBatchLogRecordProcessor) - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._validate_and_fetch_logs_header") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") - def test_create_emf_exporter(self, mock_get_session, mock_validate): - # Test when botocore is not installed - mock_get_session.return_value = None - result = _create_emf_exporter() - self.assertIsNone(result) - - # Reset mock for subsequent tests - mock_get_session.reset_mock() - mock_get_session.return_value = MagicMock() - - # Mock the EMF exporter class import by patching the module import - with patch( - "amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter.AwsCloudWatchEmfExporter" - ) as mock_emf_exporter_class: - mock_exporter_instance = MagicMock() - mock_exporter_instance.namespace = "default" - mock_exporter_instance.log_group_name = "test-group" - mock_emf_exporter_class.return_value = mock_exporter_instance - - # Test when headers are invalid - mock_validate.return_value = OtlpLogHeaderSetting(None, None, None, False) - result = _create_emf_exporter() - self.assertIsNone(result) - - # Test when namespace is missing (should still create exporter with default namespace) - mock_validate.return_value = OtlpLogHeaderSetting("test-group", "test-stream", None, True) - result = _create_emf_exporter() - self.assertIsNotNone(result) - self.assertEqual(result, mock_exporter_instance) - # Verify that the EMF exporter was called with correct parameters - mock_emf_exporter_class.assert_called_with( - session=mock_get_session.return_value, - namespace=None, - log_group_name="test-group", - log_stream_name="test-stream", - ) - - # Test with valid configuration - mock_validate.return_value = OtlpLogHeaderSetting("test-group", "test-stream", "test-namespace", True) - result = _create_emf_exporter() - self.assertIsNotNone(result) - self.assertEqual(result, mock_exporter_instance) - # Verify that the EMF exporter was called with correct parameters - mock_emf_exporter_class.assert_called_with( - session=mock_get_session.return_value, - namespace="test-namespace", - log_group_name="test-group", - log_stream_name="test-stream", - ) - - # Test exception handling - mock_validate.side_effect = Exception("Test exception") - result = _create_emf_exporter() - self.assertIsNone(result) - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_logger_provider") @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled") @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") @@ -1174,35 +1129,6 @@ def test_create_aws_otlp_exporter(self, mock_get_session, mock_is_agent_enabled, result = _create_aws_otlp_exporter("https://xray.us-east-1.amazonaws.com/v1/traces", "xray", "us-east-1") self.assertIsNone(result) - def test_customize_metric_exporters_with_emf(self): - metric_readers = [] - views = [] - - # Test with EMF disabled - _customize_metric_exporters(metric_readers, views, is_emf_enabled=False) - self.assertEqual(len(metric_readers), 0) - - # Test with EMF enabled but create_emf_exporter returns None - with patch( - "amazon.opentelemetry.distro.aws_opentelemetry_configurator._create_emf_exporter", return_value=None - ): - _customize_metric_exporters(metric_readers, views, is_emf_enabled=True) - self.assertEqual(len(metric_readers), 0) - - # Test with EMF enabled and valid exporter - mock_emf_exporter = MagicMock(spec=AwsCloudWatchEmfExporter) - # Add the required attributes that PeriodicExportingMetricReader expects - mock_emf_exporter._preferred_temporality = {} - mock_emf_exporter._preferred_aggregation = {} - - with patch( - "amazon.opentelemetry.distro.aws_opentelemetry_configurator._create_emf_exporter", - return_value=mock_emf_exporter, - ): - _customize_metric_exporters(metric_readers, views, is_emf_enabled=True) - self.assertEqual(len(metric_readers), 1) - self.assertIsInstance(metric_readers[0], PeriodicExportingMetricReader) - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled") @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_service_attribute") def test_customize_resource_without_agent_observability(self, mock_get_service_attribute, mock_is_agent_enabled): @@ -1250,6 +1176,202 @@ def test_customize_resource_with_existing_agent_type(self, mock_get_service_attr self.assertEqual(result.attributes[AWS_LOCAL_SERVICE], "test-service") self.assertEqual(result.attributes[AWS_SERVICE_TYPE], "existing-agent") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_emf_exporter_lambda_without_valid_headers( + self, mock_get_session, mock_is_lambda, mock_fetch_headers + ): + """Test _create_emf_exporter returns ConsoleEmfExporter for Lambda without valid log headers""" + # Setup mocks + mock_is_lambda.return_value = True + mock_header_setting = MagicMock() + mock_header_setting.is_valid.return_value = False + mock_header_setting.namespace = "test-namespace" + mock_fetch_headers.return_value = mock_header_setting + + with patch( + "amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.ConsoleEmfExporter" + ) as mock_console_exporter: + mock_exporter_instance = MagicMock() + mock_console_exporter.return_value = mock_exporter_instance + + result = _create_emf_exporter() + + self.assertEqual(result, mock_exporter_instance) + mock_console_exporter.assert_called_once_with(namespace="test-namespace") + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_emf_exporter_lambda_with_valid_headers(self, mock_get_session, mock_is_lambda, mock_fetch_headers): + """Test _create_emf_exporter returns AwsCloudWatchEmfExporter for Lambda with valid headers""" + # Setup mocks + mock_is_lambda.return_value = True + mock_session = MagicMock() + mock_get_session.return_value = mock_session + + mock_header_setting = MagicMock() + mock_header_setting.is_valid.return_value = True + mock_header_setting.namespace = "test-namespace" + mock_header_setting.log_group = "test-group" + mock_header_setting.log_stream = "test-stream" + mock_fetch_headers.return_value = mock_header_setting + + with patch( + "amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter.AwsCloudWatchEmfExporter" + ) as mock_cloudwatch_exporter: + mock_exporter_instance = MagicMock() + mock_cloudwatch_exporter.return_value = mock_exporter_instance + + result = _create_emf_exporter() + + self.assertEqual(result, mock_exporter_instance) + mock_cloudwatch_exporter.assert_called_once_with( + session=mock_session, + namespace="test-namespace", + log_group_name="test-group", + log_stream_name="test-stream", + ) + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_emf_exporter_non_lambda_with_valid_headers( + self, mock_get_session, mock_is_lambda, mock_fetch_headers + ): + """Test _create_emf_exporter returns AwsCloudWatchEmfExporter for non-Lambda with valid headers""" + # Setup mocks + mock_is_lambda.return_value = False + mock_session = MagicMock() + mock_get_session.return_value = mock_session + + mock_header_setting = MagicMock() + mock_header_setting.is_valid.return_value = True + mock_header_setting.namespace = "test-namespace" + mock_header_setting.log_group = "test-group" + mock_header_setting.log_stream = "test-stream" + mock_fetch_headers.return_value = mock_header_setting + + with patch( + "amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter.AwsCloudWatchEmfExporter" + ) as mock_cloudwatch_exporter: + mock_exporter_instance = MagicMock() + mock_cloudwatch_exporter.return_value = mock_exporter_instance + + result = _create_emf_exporter() + + self.assertEqual(result, mock_exporter_instance) + mock_cloudwatch_exporter.assert_called_once_with( + session=mock_session, + namespace="test-namespace", + log_group_name="test-group", + log_stream_name="test-stream", + ) + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_emf_exporter_non_lambda_without_valid_headers( + self, mock_get_session, mock_is_lambda, mock_fetch_headers + ): + """Test _create_emf_exporter returns None for non-Lambda without valid headers""" + # Setup mocks + mock_is_lambda.return_value = False + mock_session = MagicMock() + mock_get_session.return_value = mock_session + + mock_header_setting = MagicMock() + mock_header_setting.is_valid.return_value = False + mock_fetch_headers.return_value = mock_header_setting + + result = _create_emf_exporter() + + self.assertIsNone(result) + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") + def test_create_emf_exporter_no_botocore_session( + self, mock_logger, mock_get_session, mock_is_lambda, mock_fetch_headers + ): + """Test _create_emf_exporter returns None when botocore session is not available""" + # Setup mocks + mock_is_lambda.return_value = False + mock_get_session.return_value = None # Simulate missing botocore + + mock_header_setting = MagicMock() + mock_header_setting.is_valid.return_value = True + mock_fetch_headers.return_value = mock_header_setting + + result = _create_emf_exporter() + + self.assertIsNone(result) + mock_logger.warning.assert_called_once_with("botocore is not installed. EMF exporter requires botocore") + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") + def test_create_emf_exporter_exception_handling(self, mock_logger, mock_fetch_headers): + """Test _create_emf_exporter handles exceptions gracefully""" + # Setup mocks to raise exception + test_exception = Exception("Test exception") + mock_fetch_headers.side_effect = test_exception + + result = _create_emf_exporter() + + self.assertIsNone(result) + mock_logger.error.assert_called_once_with("Failed to create EMF exporter: %s", test_exception) + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_emf_exporter_lambda_without_valid_headers_none_namespace( + self, mock_get_session, mock_is_lambda, mock_fetch_headers + ): + """Test _create_emf_exporter with Lambda environment and None namespace""" + # Setup mocks + mock_is_lambda.return_value = True + mock_header_setting = MagicMock() + mock_header_setting.is_valid.return_value = False + mock_header_setting.namespace = None + mock_fetch_headers.return_value = mock_header_setting + + with patch( + "amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.ConsoleEmfExporter" + ) as mock_console_exporter: + mock_exporter_instance = MagicMock() + mock_console_exporter.return_value = mock_exporter_instance + + result = _create_emf_exporter() + + self.assertEqual(result, mock_exporter_instance) + mock_console_exporter.assert_called_once_with(namespace=None) + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_emf_exporter_cloudwatch_exporter_import_error( + self, mock_get_session, mock_is_lambda, mock_fetch_headers + ): + """Test _create_emf_exporter handles import errors for CloudWatch exporter""" + # Setup mocks + mock_is_lambda.return_value = False + mock_session = MagicMock() + mock_get_session.return_value = mock_session + + mock_header_setting = MagicMock() + mock_header_setting.is_valid.return_value = True + mock_fetch_headers.return_value = mock_header_setting + + # Mock import to raise ImportError + with patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") as mock_logger: + with patch("builtins.__import__", side_effect=ImportError("Cannot import CloudWatch exporter")): + result = _create_emf_exporter() + + self.assertIsNone(result) + mock_logger.error.assert_called_once() + def validate_distro_environ(): tc: TestCase = TestCase() diff --git a/lambda-layer/src/otel-instrument b/lambda-layer/src/otel-instrument index 6965ba12d..0fb750f34 100755 --- a/lambda-layer/src/otel-instrument +++ b/lambda-layer/src/otel-instrument @@ -98,6 +98,19 @@ if [ -z ${OTEL_PROPAGATORS} ]; then export OTEL_PROPAGATORS="baggage,xray,tracecontext" fi +# disable application signals runtime metrics by default +export OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED="false"; + +# enable emf exporter by default +if [ -z "${OTEL_METRICS_EXPORTER}" ]; then + export OTEL_METRICS_EXPORTER="awsemf"; +fi + +# disable OTel logs exporter by default +if [ -z "${OTEL_LOGS_EXPORTER}" ]; then + export OTEL_LOGS_EXPORTER="none"; +fi + if [ -z "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" ]; then export OTEL_AWS_APPLICATION_SIGNALS_ENABLED="true"; fi @@ -107,17 +120,6 @@ fi if [ "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" = "true" ]; then export OTEL_PYTHON_DISTRO="aws_distro"; export OTEL_PYTHON_CONFIGURATOR="aws_configurator"; - if [ -z "${OTEL_METRICS_EXPORTER}" ]; then - export OTEL_METRICS_EXPORTER="none"; - fi -fi - -# - If Application Signals is disabled - -if [ "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" = "false" ]; then - if [ -z "${OTEL_METRICS_EXPORTER}" ]; then - export OTEL_METRICS_EXPORTER="none"; - fi fi if [ -z "${OTEL_RESOURCE_ATTRIBUTES}" ]; then From 3365607a333987a4ea803c8292e3b14128ec2519 Mon Sep 17 00:00:00 2001 From: Jonathan Lee <107072447+jj22ee@users.noreply.github.com> Date: Wed, 30 Jul 2025 17:13:48 -0700 Subject: [PATCH 3/7] Add Compact Console Log Exporter for Lambda Environment (#442) *Issue #, if available:* *Description of changes:* *Testing:* Lambda code: ``` import json import logging from logging import Logger, getLogger _logger: Logger = getLogger("__name__") def lambda_handler(event, context): _logger.error("helloooooo1") _logger.error("helloooooo2") _logger.error("helloooooo3") _logger.error("helloooooo4") _logger.error("helloooooo5") return { 'statusCode': 200, 'body': json.dumps('Hello from Lambda!') } ``` Env vars: ``` AWS_LAMBDA_EXEC_WRAPPER=/opt/otel-instrument OTEL_LOGS_EXPORTER=otlp OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true ``` See that one log is in one CWLog line. image By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --- .../distro/aws_opentelemetry_configurator.py | 6 +- .../logs/compact_console_log_exporter.py | 16 +++++ .../logs/test_compact_console_log_exporter.py | 70 +++++++++++++++++++ .../test_aws_opentelementry_configurator.py | 55 ++++++++++++++- 4 files changed, 145 insertions(+), 2 deletions(-) create mode 100644 aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/console/logs/compact_console_log_exporter.py create mode 100644 aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/console/logs/test_compact_console_log_exporter.py diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index 8fdc5306e..3f16e5dca 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -23,6 +23,7 @@ AwsMetricAttributesSpanExporterBuilder, ) from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder +from amazon.opentelemetry.distro.exporter.console.logs.compact_console_log_exporter import CompactConsoleLogExporter from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader @@ -46,7 +47,7 @@ ) from opentelemetry.sdk._events import EventLoggerProvider from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter, LogExporter from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, OTEL_EXPORTER_OTLP_METRICS_PROTOCOL, @@ -216,6 +217,9 @@ def _init_logging( set_logger_provider(provider) for _, exporter_class in exporters.items(): + if exporter_class is ConsoleLogExporter and _is_lambda_environment(): + exporter_class = CompactConsoleLogExporter + _logger.debug("Lambda environment detected, using CompactConsoleLogExporter instead of ConsoleLogExporter") exporter_args = {} _customize_log_record_processor( logger_provider=provider, log_exporter=_customize_logs_exporter(exporter_class(**exporter_args)) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/console/logs/compact_console_log_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/console/logs/compact_console_log_exporter.py new file mode 100644 index 000000000..18b7e26de --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/console/logs/compact_console_log_exporter.py @@ -0,0 +1,16 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +import re +from typing import Sequence + +from opentelemetry.sdk._logs import LogData +from opentelemetry.sdk._logs.export import ConsoleLogExporter, LogExportResult + + +class CompactConsoleLogExporter(ConsoleLogExporter): + def export(self, batch: Sequence[LogData]): + for data in batch: + formatted_json = self.formatter(data.log_record) + print(re.sub(r"\s*([{}[\]:,])\s*", r"\1", formatted_json), flush=True) + + return LogExportResult.SUCCESS diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/console/logs/test_compact_console_log_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/console/logs/test_compact_console_log_exporter.py new file mode 100644 index 000000000..5000102e8 --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/console/logs/test_compact_console_log_exporter.py @@ -0,0 +1,70 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +import unittest +from unittest.mock import Mock, patch + +from amazon.opentelemetry.distro.exporter.console.logs.compact_console_log_exporter import CompactConsoleLogExporter +from opentelemetry.sdk._logs.export import LogExportResult + + +class TestCompactConsoleLogExporter(unittest.TestCase): + + def setUp(self): + self.exporter = CompactConsoleLogExporter() + + @patch("builtins.print") + def test_export_compresses_json(self, mock_print): + # Mock log data + mock_log_data = Mock() + mock_log_record = Mock() + mock_log_data.log_record = mock_log_record + + # Mock formatted JSON with whitespace + formatted_json = '{\n "body": "test message",\n "severity_number": 9,\n "attributes": {\n "key": "value"\n }\n}' # noqa: E501 + self.exporter.formatter = Mock(return_value=formatted_json) + + # Call export + result = self.exporter.export([mock_log_data]) + + # Verify result + self.assertEqual(result, LogExportResult.SUCCESS) + + # Verify print calls + self.assertEqual(mock_print.call_count, 1) + mock_print.assert_called_with( + '{"body":"test message","severity_number":9,"attributes":{"key":"value"}}', flush=True + ) + + @patch("builtins.print") + def test_export_multiple_records(self, mock_print): + # Mock multiple log data + mock_log_data1 = Mock() + mock_log_data2 = Mock() + mock_log_data1.log_record = Mock() + mock_log_data2.log_record = Mock() + + formatted_json = '{\n "body": "test"\n}' + self.exporter.formatter = Mock(return_value=formatted_json) + + # Call export + result = self.exporter.export([mock_log_data1, mock_log_data2]) + + # Verify result + self.assertEqual(result, LogExportResult.SUCCESS) + + # Verify print calls + self.assertEqual(mock_print.call_count, 2) # 2 records + # Each record should print compact JSON + expected_calls = [unittest.mock.call('{"body":"test"}', flush=True)] * 2 + mock_print.assert_has_calls(expected_calls) + + @patch("builtins.print") + def test_export_empty_batch(self, mock_print): + # Call export with empty batch + result = self.exporter.export([]) + + # Verify result + self.assertEqual(result, LogExportResult.SUCCESS) + + # Verify print calls + mock_print.assert_not_called() # No records, no prints diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index 31439eba3..efcc5a317 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -47,6 +47,7 @@ ) from amazon.opentelemetry.distro.aws_opentelemetry_distro import AwsOpenTelemetryDistro from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor +from amazon.opentelemetry.distro.exporter.console.logs.compact_console_log_exporter import CompactConsoleLogExporter from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession # pylint: disable=line-too-long @@ -70,7 +71,7 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.metrics import get_meter_provider from opentelemetry.processor.baggage import BaggageSpanProcessor -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter from opentelemetry.sdk.environment_variables import OTEL_TRACES_SAMPLER, OTEL_TRACES_SAMPLER_ARG from opentelemetry.sdk.metrics._internal.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource @@ -680,6 +681,58 @@ def capture_exporter(*args, **kwargs): os.environ.pop(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT) + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.LoggingHandler", return_value=MagicMock()) + @patch("logging.getLogger", return_value=MagicMock()) + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._customize_logs_exporter") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.LoggerProvider", return_value=MagicMock()) + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._customize_log_record_processor") + def test_init_logging_console_exporter_replacement( + self, + mock_customize_processor, + mock_logger_provider, + mock_customize_logs_exporter, + mock_get_logger, + mock_logging_handler, + ): + """Test that ConsoleLogExporter is replaced with CompactConsoleLogExporter when in Lambda""" + + # Mock _is_lambda_environment to return True + with patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment", return_value=True + ): + # Test with ConsoleLogExporter + exporters = {"console": ConsoleLogExporter} + _init_logging(exporters, Resource.get_empty()) + + # Verify that _customize_log_record_processor was called + mock_customize_processor.assert_called_once() + + # Get the exporter that was passed to _customize_logs_exporter + call_args = mock_customize_logs_exporter.call_args + exporter_instance = call_args[0][0] + + # Verify it's a CompactConsoleLogExporter instance + self.assertIsInstance(exporter_instance, CompactConsoleLogExporter) + + # Reset mocks + mock_customize_processor.reset_mock() + mock_customize_logs_exporter.reset_mock() + + # Test when not in Lambda environment - should not replace + with patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment", return_value=False + ): + exporters = {"console": ConsoleLogExporter} + _init_logging(exporters, Resource.get_empty()) + + # Get the exporter that was passed to _customize_logs_exporter + call_args = mock_customize_logs_exporter.call_args + exporter_instance = call_args[0][0] + + # Verify it's still a regular ConsoleLogExporter + self.assertIsInstance(exporter_instance, ConsoleLogExporter) + self.assertNotIsInstance(exporter_instance, CompactConsoleLogExporter) + def test_customize_span_processors(self): mock_tracer_provider: TracerProvider = MagicMock() os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) From 89f2b5eff0abf251451704f2b68dfe87e6ed77f6 Mon Sep 17 00:00:00 2001 From: Jonathan Lee <107072447+jj22ee@users.noreply.github.com> Date: Mon, 4 Aug 2025 13:17:38 -0700 Subject: [PATCH 4/7] Set OTEL_METRICS_EXPORTER=none as default in Lambda (#446) *Issue #, if available:* *Description of changes:* By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --- lambda-layer/src/otel-instrument | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lambda-layer/src/otel-instrument b/lambda-layer/src/otel-instrument index 0fb750f34..65a9edc2e 100755 --- a/lambda-layer/src/otel-instrument +++ b/lambda-layer/src/otel-instrument @@ -101,9 +101,9 @@ fi # disable application signals runtime metrics by default export OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED="false"; -# enable emf exporter by default +# disable otel metrics export by default if [ -z "${OTEL_METRICS_EXPORTER}" ]; then - export OTEL_METRICS_EXPORTER="awsemf"; + export OTEL_METRICS_EXPORTER="none"; fi # disable OTel logs exporter by default From ead30f4cb3138c052da5a0f1795279866e6ec452 Mon Sep 17 00:00:00 2001 From: Steve Liu Date: Mon, 11 Aug 2025 13:08:38 -0700 Subject: [PATCH 5/7] Disable warning messages from OTel auto instrumentation (#449) *Description of changes:* When setting `OTEL_PYTHON_CONFIGURATOR=aws_configurator`, users see a verbose warning message: `Configuration of configurator not loaded because aws_configurator is set by OTEL_PYTHON_CONFIGURATOR` These warnings appear when multiple OpenTelemetry configurators are installed (e.g., default upstream configurators alongside the AWS configurator). While the behavior is correct, the warnings create noise in application logs. Disabling this log message from showing up by default. By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --- .../amazon/opentelemetry/distro/aws_opentelemetry_distro.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py index 35a41780d..0f45a1322 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py @@ -3,7 +3,7 @@ import importlib import os import sys -from logging import Logger, getLogger +from logging import ERROR, Logger, getLogger from amazon.opentelemetry.distro._utils import get_aws_region, is_agent_observability_enabled from amazon.opentelemetry.distro.aws_opentelemetry_configurator import ( @@ -22,12 +22,16 @@ from opentelemetry import propagate from opentelemetry.distro import OpenTelemetryDistro from opentelemetry.environment_variables import OTEL_PROPAGATORS, OTEL_PYTHON_ID_GENERATOR +from opentelemetry.instrumentation.auto_instrumentation import _load +from opentelemetry.instrumentation.logging.environment_variables import OTEL_PYTHON_LOG_LEVEL from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION, OTEL_EXPORTER_OTLP_PROTOCOL, ) _logger: Logger = getLogger(__name__) +# Suppress configurator warnings from OpenTelemetry auto-instrumentation +_load._logger.setLevel(os.environ.get(OTEL_PYTHON_LOG_LEVEL, ERROR)) class AwsOpenTelemetryDistro(OpenTelemetryDistro): From ba7fcbe814b60ea6d92d85ab7a3d61664b41d9f6 Mon Sep 17 00:00:00 2001 From: Steve Liu Date: Tue, 12 Aug 2025 09:18:49 -0700 Subject: [PATCH 6/7] Change _load logging to lowercase (#450) *Description of changes:* `OTEL_PYTHON_LOG_LEVEL` uses lowercase for log levels. Changing it to fix the uppercase only from this PR: https://github.com/aws-observability/aws-otel-python-instrumentation/pull/449 **Testing this on Lambda (with default Lambda config setup)**: image ``` START RequestId: 0157a3c0-c584-49bc-b5ba-fd2008335b83 Version: $LATEST END RequestId: 0157a3c0-c584-49bc-b5ba-fd2008335b83 REPORT RequestId: 0157a3c0-c584-49bc-b5ba-fd2008335b83 Duration: 230.96 ms Billed Duration: 231 ms Memory Size: 512 MB Max Memory Used: 113 MB Init Duration: 1416.80 ms XRAY TraceId: 1-689b663c-01c68093613ed19523fb9c6f SegmentId: 76d15c82545c2530 Sampled: true ``` **Testing this on Lambda (with modified Lambda config setup)**: image ``` Configuration of configurator not loaded, aws_configurator already loaded START RequestId: 16d4902c-527b-4edd-86ef-abd423da365d Version: $LATEST END RequestId: 16d4902c-527b-4edd-86ef-abd423da365d REPORT RequestId: 16d4902c-527b-4edd-86ef-abd423da365d Duration: 166.12 ms Billed Duration: 167 ms Memory Size: 512 MB Max Memory Used: 113 MB Init Duration: 1467.68 ms XRAY TraceId: 1-689b6692-1ea1293937b97276187b0a71 SegmentId: d2ed420726a62b67 Sampled: true ``` See: https://opentelemetry.io/docs/zero-code/python/configuration/#logging For valid logging values. By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --- .../amazon/opentelemetry/distro/aws_opentelemetry_distro.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py index 0f45a1322..487272024 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py @@ -23,6 +23,7 @@ from opentelemetry.distro import OpenTelemetryDistro from opentelemetry.environment_variables import OTEL_PROPAGATORS, OTEL_PYTHON_ID_GENERATOR from opentelemetry.instrumentation.auto_instrumentation import _load +from opentelemetry.instrumentation.logging import LEVELS from opentelemetry.instrumentation.logging.environment_variables import OTEL_PYTHON_LOG_LEVEL from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION, @@ -30,8 +31,8 @@ ) _logger: Logger = getLogger(__name__) -# Suppress configurator warnings from OpenTelemetry auto-instrumentation -_load._logger.setLevel(os.environ.get(OTEL_PYTHON_LOG_LEVEL, ERROR)) +# Suppress configurator warnings from auto-instrumentation +_load._logger.setLevel(LEVELS.get(os.environ.get(OTEL_PYTHON_LOG_LEVEL, "error").lower(), ERROR)) class AwsOpenTelemetryDistro(OpenTelemetryDistro): From f9a51ebf686761382d1b4a1711e621efb0c1bf18 Mon Sep 17 00:00:00 2001 From: Lei Wang <66336933+wangzlei@users.noreply.github.com> Date: Tue, 12 Aug 2025 22:08:27 -0700 Subject: [PATCH 7/7] add LoggerProvider force flush in Lambda function (#441) *Issue #, if available:* The current Lambda instrumentation is missing a call to the loggerProvider.force_flush method, which can result in delayed or missing OTel logs in the Lambda environment due to Lambda freeze *Description of changes:* Adding LoggerProvider force flush in Lambda instrumentation. Logging a debug log if the loggerProvider does not support force flush because the default global LoggerProvider is ProxyLoggerProvider in OpenTelemetry Python, will be replaced to SDKLoggerProvider only if user set environment variable `OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED = true`, which is not set in ADOT Lambda layer. By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. Co-authored-by: Jonathan Lee <107072447+jj22ee@users.noreply.github.com> --- .../instrumentation/aws_lambda/__init__.py | 69 +++++++++---------- 1 file changed, 33 insertions(+), 36 deletions(-) diff --git a/lambda-layer/src/opentelemetry/instrumentation/aws_lambda/__init__.py b/lambda-layer/src/opentelemetry/instrumentation/aws_lambda/__init__.py index 246652956..23f005f6a 100644 --- a/lambda-layer/src/opentelemetry/instrumentation/aws_lambda/__init__.py +++ b/lambda-layer/src/opentelemetry/instrumentation/aws_lambda/__init__.py @@ -46,6 +46,7 @@ def lambda_handler(event, context): tracer_provider (TracerProvider) - an optional tracer provider meter_provider (MeterProvider) - an optional meter provider +logger_provider (LoggerProvider) - an optional logger provider event_context_extractor (Callable) - a function that returns an OTel Trace Context given the Lambda Event the AWS Lambda was invoked with this function signature is: def event_context_extractor(lambda_event: Any) -> Context @@ -77,6 +78,7 @@ def custom_event_context_extractor(lambda_event): from wrapt import wrap_function_wrapper from opentelemetry import context as context_api +from opentelemetry._logs import LoggerProvider, get_logger_provider from opentelemetry.context.context import Context from opentelemetry.instrumentation.aws_lambda.package import _instruments from opentelemetry.instrumentation.aws_lambda.version import __version__ @@ -94,9 +96,7 @@ def custom_event_context_extractor(lambda_event): _HANDLER = "_HANDLER" _X_AMZN_TRACE_ID = "_X_AMZN_TRACE_ID" ORIG_HANDLER = "ORIG_HANDLER" -OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT = ( - "OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT" -) +OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT = "OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT" def _default_event_context_extractor(lambda_event: Any) -> Context: @@ -157,17 +157,13 @@ def _determine_parent_context( return event_context_extractor(lambda_event) -def _set_api_gateway_v1_proxy_attributes( - lambda_event: Any, span: Span -) -> Span: +def _set_api_gateway_v1_proxy_attributes(lambda_event: Any, span: Span) -> Span: """Sets HTTP attributes for REST APIs and v1 HTTP APIs More info: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-input-format """ - span.set_attribute( - SpanAttributes.HTTP_METHOD, lambda_event.get("httpMethod") - ) + span.set_attribute(SpanAttributes.HTTP_METHOD, lambda_event.get("httpMethod")) if lambda_event.get("headers"): if "User-Agent" in lambda_event["headers"]: @@ -194,16 +190,12 @@ def _set_api_gateway_v1_proxy_attributes( f"{lambda_event['resource']}?{urlencode(lambda_event['queryStringParameters'])}", ) else: - span.set_attribute( - SpanAttributes.HTTP_TARGET, lambda_event["resource"] - ) + span.set_attribute(SpanAttributes.HTTP_TARGET, lambda_event["resource"]) return span -def _set_api_gateway_v2_proxy_attributes( - lambda_event: Any, span: Span -) -> Span: +def _set_api_gateway_v2_proxy_attributes(lambda_event: Any, span: Span) -> Span: """Sets HTTP attributes for v2 HTTP APIs More info: @@ -253,6 +245,7 @@ def _instrument( event_context_extractor: Callable[[Any], Context], tracer_provider: TracerProvider = None, meter_provider: MeterProvider = None, + logger_provider: LoggerProvider = None, ): # pylint: disable=too-many-locals @@ -261,9 +254,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches call_wrapped, instance, args, kwargs ): - orig_handler_name = ".".join( - [wrapped_module_name, wrapped_function_name] - ) + orig_handler_name = ".".join([wrapped_module_name, wrapped_function_name]) lambda_event = args[0] @@ -306,9 +297,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches # # See more: # https://github.com/open-telemetry/semantic-conventions/blob/main/docs/faas/aws-lambda.md#all-triggers - account_id = lambda_context.invoked_function_arn.split( - ":" - )[4] + account_id = lambda_context.invoked_function_arn.split(":")[4] span.set_attribute( ResourceAttributes.CLOUD_ACCOUNT_ID, account_id, @@ -326,19 +315,13 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches # If the request came from an API Gateway, extract http attributes from the event # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/instrumentation/aws-lambda.md#api-gateway # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-server-semantic-conventions - if isinstance(lambda_event, dict) and lambda_event.get( - "requestContext" - ): + if isinstance(lambda_event, dict) and lambda_event.get("requestContext"): span.set_attribute(SpanAttributes.FAAS_TRIGGER, "http") if lambda_event.get("version") == "2.0": - _set_api_gateway_v2_proxy_attributes( - lambda_event, span - ) + _set_api_gateway_v2_proxy_attributes(lambda_event, span) else: - _set_api_gateway_v1_proxy_attributes( - lambda_event, span - ) + _set_api_gateway_v1_proxy_attributes(lambda_event, span) if isinstance(result, dict) and result.get("statusCode"): span.set_attribute( @@ -377,6 +360,22 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches " case of a Lambda freeze and would exist in the OTel SDK implementation." ) + _logger_provider = logger_provider or get_logger_provider() + if hasattr(_logger_provider, "force_flush"): + rem = flush_timeout - (time.time() - now) * 1000 + if rem > 0: + try: + # NOTE: `force_flush` before function quit in case of Lambda freeze. + _logger_provider.force_flush(rem) + except Exception: # pylint: disable=broad-except + logger.exception("LoggerProvider failed to flush logs") + else: + logger.debug( + "LoggerProvider (%s) was missing `force_flush` method. This is necessary in" + " case of a Lambda freeze and would exist in the OTel SDK implementation.", + _logger_provider.__class__.__name__, + ) + if exception is not None: raise exception.with_traceback(exception.__traceback__) @@ -403,6 +402,7 @@ def _instrument(self, **kwargs): **kwargs: Optional arguments ``tracer_provider``: a TracerProvider, defaults to global ``meter_provider``: a MeterProvider, defaults to global + ``logger_provider``: a LoggerProvider, defaults to global ``event_context_extractor``: a method which takes the Lambda Event as input and extracts an OTel Context from it. By default, the context is extracted from the HTTP headers of an API Gateway @@ -423,9 +423,7 @@ def _instrument(self, **kwargs): self._wrapped_function_name, ) = lambda_handler.rsplit(".", 1) - flush_timeout_env = os.environ.get( - OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT, None - ) + flush_timeout_env = os.environ.get(OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT, None) flush_timeout = 30000 try: if flush_timeout_env is not None: @@ -440,11 +438,10 @@ def _instrument(self, **kwargs): self._wrapped_module_name, self._wrapped_function_name, flush_timeout, - event_context_extractor=kwargs.get( - "event_context_extractor", _default_event_context_extractor - ), + event_context_extractor=kwargs.get("event_context_extractor", _default_event_context_extractor), tracer_provider=kwargs.get("tracer_provider"), meter_provider=kwargs.get("meter_provider"), + logger_provider=kwargs.get("logger_provider"), ) def _uninstrument(self, **kwargs):