diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index b89b0bcb1..8fdc5306e 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -122,7 +122,14 @@ class OtlpLogHeaderSetting(NamedTuple): log_group: Optional[str] log_stream: Optional[str] namespace: Optional[str] - is_valid: bool + + def is_valid(self) -> bool: + """Check if the log header setting is valid by ensuring both log_group and log_stream are present.""" + return self.log_group is not None and self.log_stream is not None + + +# Singleton cache for OtlpLogHeaderSetting +_otlp_log_header_setting_cache: Optional[OtlpLogHeaderSetting] = None class AwsOpenTelemetryConfigurator(_OTelSDKConfigurator): @@ -440,7 +447,7 @@ def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter: if isinstance(log_exporter, OTLPLogExporter): - if _validate_and_fetch_logs_header().is_valid: + if _fetch_logs_header().is_valid(): endpoint, region = _extract_endpoint_and_region_from_otlp_endpoint(logs_endpoint) # Setting default compression mode to Gzip as this is the behavior in upstream's # collector otlp http exporter: @@ -627,18 +634,23 @@ def _extract_endpoint_and_region_from_otlp_endpoint(endpoint: str): return endpoint, region -def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting: - """Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to - AWS OTLP Logs endpoint.""" +def _fetch_logs_header() -> OtlpLogHeaderSetting: + """Returns the OTLP log header setting as a singleton instance.""" + global _otlp_log_header_setting_cache # pylint: disable=global-statement + + if _otlp_log_header_setting_cache is not None: + return _otlp_log_header_setting_cache logs_headers = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_HEADERS) if not logs_headers: - _logger.warning( - "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS " - "to include x-aws-log-group and x-aws-log-stream" - ) - return OtlpLogHeaderSetting(None, None, None, False) + if not _is_lambda_environment(): + _logger.warning( + "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS " + "to include x-aws-log-group and x-aws-log-stream" + ) + _otlp_log_header_setting_cache = OtlpLogHeaderSetting(None, None, None) + return _otlp_log_header_setting_cache log_group = None log_stream = None @@ -656,9 +668,14 @@ def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting: elif key == AWS_EMF_METRICS_NAMESPACE and value: namespace = value - is_valid = log_group is not None and log_stream is not None + _otlp_log_header_setting_cache = OtlpLogHeaderSetting(log_group, log_stream, namespace) + return _otlp_log_header_setting_cache + - return OtlpLogHeaderSetting(log_group, log_stream, namespace, is_valid) +def _clear_logs_header_cache(): + """Clear the singleton cache for OtlpLogHeaderSetting. 
Used primarily for testing."""
+    global _otlp_log_header_setting_cache  # pylint: disable=global-statement
+    _otlp_log_header_setting_cache = None
 
 
 def _get_metric_export_interval():
@@ -773,8 +790,25 @@ def _check_emf_exporter_enabled() -> bool:
 
 
 def _create_emf_exporter():
-    """Create and configure the CloudWatch EMF exporter."""
+    """
+    Create the appropriate EMF exporter based on the environment and configuration.
+
+    Returns:
+        ConsoleEmfExporter for Lambda environments where the log group and log stream headers are missing
+        AwsCloudWatchEmfExporter otherwise, when a botocore session and valid headers are available
+        None if the CloudWatch exporter cannot be created
+    """
     try:
+        log_header_setting = _fetch_logs_header()
+
+        # Lambda without valid OTLP logs headers - use the console EMF exporter
+        if _is_lambda_environment() and not log_header_setting.is_valid():
+            # pylint: disable=import-outside-toplevel
+            from amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter import ConsoleEmfExporter
+
+            return ConsoleEmfExporter(namespace=log_header_setting.namespace)
+
+        # Non-Lambda environments, or Lambda with valid headers - use the CloudWatch EMF exporter
         session = get_aws_session()
         # Check if botocore is available before importing the EMF exporter
         if not session:
@@ -786,9 +820,7 @@ def _create_emf_exporter():
             AwsCloudWatchEmfExporter,
         )
 
-        log_header_setting = _validate_and_fetch_logs_header()
-
-        if not log_header_setting.is_valid:
+        if not log_header_setting.is_valid():
             return None
 
         return AwsCloudWatchEmfExporter(
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py
index 168ac7df1..6603391a1 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/aws_cloudwatch_emf_exporter.py
@@ -3,66 +3,18 @@
 
 # pylint: disable=no-self-use
 
-import json
 import logging
-import math
-import time
-from collections import defaultdict
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, Optional
 
-from opentelemetry.sdk.metrics import Counter
-from opentelemetry.sdk.metrics import Histogram as HistogramInstr
-from opentelemetry.sdk.metrics import ObservableCounter, ObservableGauge, ObservableUpDownCounter, UpDownCounter
-from opentelemetry.sdk.metrics._internal.point import Metric
-from opentelemetry.sdk.metrics.export import (
-    AggregationTemporality,
-    ExponentialHistogram,
-    Gauge,
-    Histogram,
-    MetricExporter,
-    MetricExportResult,
-    MetricsData,
-    NumberDataPoint,
-    Sum,
-)
-from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation
-from opentelemetry.sdk.resources import Resource
-from opentelemetry.util.types import Attributes
+from opentelemetry.sdk.metrics.export import AggregationTemporality
 
 from ._cloudwatch_log_client import CloudWatchLogClient
+from .base_emf_exporter import BaseEmfExporter
 
 logger = logging.getLogger(__name__)
 
 
-class MetricRecord:
-    """The metric data unified representation of all OTel metrics for OTel to CW EMF conversion."""
-
-    def __init__(self, metric_name: str, metric_unit: str, metric_description: str):
-        """
-        Initialize metric record.
- - Args: - metric_name: Name of the metric - metric_unit: Unit of the metric - metric_description: Description of the metric - """ - # Instrument metadata - self.name = metric_name - self.unit = metric_unit - self.description = metric_description - - # Will be set by conversion methods - self.timestamp: Optional[int] = None - self.attributes: Attributes = {} - - # Different metric type data - only one will be set per record - self.value: Optional[float] = None - self.sum_data: Optional[Any] = None - self.histogram_data: Optional[Any] = None - self.exp_histogram_data: Optional[Any] = None - - -class AwsCloudWatchEmfExporter(MetricExporter): +class AwsCloudWatchEmfExporter(BaseEmfExporter): """ OpenTelemetry metrics exporter for CloudWatch EMF format. @@ -74,50 +26,6 @@ class AwsCloudWatchEmfExporter(MetricExporter): """ - # CloudWatch EMF supported units - # Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html - EMF_SUPPORTED_UNITS = { - "Seconds", - "Microseconds", - "Milliseconds", - "Bytes", - "Kilobytes", - "Megabytes", - "Gigabytes", - "Terabytes", - "Bits", - "Kilobits", - "Megabits", - "Gigabits", - "Terabits", - "Percent", - "Count", - "Bytes/Second", - "Kilobytes/Second", - "Megabytes/Second", - "Gigabytes/Second", - "Terabytes/Second", - "Bits/Second", - "Kilobits/Second", - "Megabits/Second", - "Gigabits/Second", - "Terabits/Second", - "Count/Second", - "None", - } - - # OTel to CloudWatch unit mapping - # Ref: opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/grouped_metric.go#L188 - UNIT_MAPPING = { - "1": "", - "ns": "", - "ms": "Milliseconds", - "s": "Seconds", - "us": "Microseconds", - "By": "Bytes", - "bit": "Bits", - } - def __init__( self, namespace: str = "default", @@ -140,26 +48,8 @@ def __init__( preferred_aggregation: Optional dictionary mapping instrument types to preferred aggregation **kwargs: Additional arguments passed to botocore client """ - # Set up temporality preference default to DELTA if customers not set - if preferred_temporality is None: - preferred_temporality = { - Counter: AggregationTemporality.DELTA, - HistogramInstr: AggregationTemporality.DELTA, - ObservableCounter: AggregationTemporality.DELTA, - ObservableGauge: AggregationTemporality.DELTA, - ObservableUpDownCounter: AggregationTemporality.DELTA, - UpDownCounter: AggregationTemporality.DELTA, - } - - # Set up aggregation preference default to exponential histogram for histogram metrics - if preferred_aggregation is None: - preferred_aggregation = { - HistogramInstr: ExponentialBucketHistogramAggregation(), - } - - super().__init__(preferred_temporality, preferred_aggregation) + super().__init__(namespace, preferred_temporality, preferred_aggregation) - self.namespace = namespace self.log_group_name = log_group_name # Initialize CloudWatch Logs client @@ -167,359 +57,7 @@ def __init__( log_group_name=log_group_name, log_stream_name=log_stream_name, aws_region=aws_region, **kwargs ) - def _get_metric_name(self, record: MetricRecord) -> Optional[str]: - """Get the metric name from the metric record or data point.""" - - try: - if record.name: - return record.name - except AttributeError: - pass - # Return None if no valid metric name found - return None - - def _get_unit(self, record: MetricRecord) -> Optional[str]: - """Get CloudWatch unit from MetricRecord unit.""" - unit = record.unit - - if not unit: - return None - - # First check if unit is already a supported EMF unit - if unit in self.EMF_SUPPORTED_UNITS: - return unit - - # Map from 
OTel unit to CloudWatch unit - mapped_unit = self.UNIT_MAPPING.get(unit) - - return mapped_unit - - def _get_dimension_names(self, attributes: Attributes) -> List[str]: - """Extract dimension names from attributes.""" - # Implement dimension selection logic - # For now, use all attributes as dimensions - return list(attributes.keys()) - - def _get_attributes_key(self, attributes: Attributes) -> str: - """ - Create a hashable key from attributes for grouping metrics. - - Args: - attributes: The attributes dictionary - - Returns: - A string representation of sorted attributes key-value pairs - """ - # Sort the attributes to ensure consistent keys - sorted_attrs = sorted(attributes.items()) - # Create a string representation of the attributes - return str(sorted_attrs) - - def _normalize_timestamp(self, timestamp_ns: int) -> int: - """ - Normalize a nanosecond timestamp to milliseconds for CloudWatch. - - Args: - timestamp_ns: Timestamp in nanoseconds - - Returns: - Timestamp in milliseconds - """ - # Convert from nanoseconds to milliseconds - return timestamp_ns // 1_000_000 - - def _create_metric_record(self, metric_name: str, metric_unit: str, metric_description: str) -> MetricRecord: - """ - Creates the intermediate metric data structure that standardizes different otel metric representation - and will be used to generate EMF events. The base record - establishes the instrument schema (name/unit/description) that will be populated - with dimensions, timestamps, and values during metric processing. - - Args: - metric_name: Name of the metric - metric_unit: Unit of the metric - metric_description: Description of the metric - - Returns: - A MetricRecord object - """ - return MetricRecord(metric_name, metric_unit, metric_description) - - def _convert_gauge_and_sum(self, metric: Metric, data_point: NumberDataPoint) -> MetricRecord: - """Convert a Gauge or Sum metric datapoint to a metric record. - - Args: - metric: The metric object - data_point: The datapoint to convert - - Returns: - MetricRecord with populated timestamp, attributes, and value - """ - # Create base record - record = self._create_metric_record(metric.name, metric.unit, metric.description) - - # Set timestamp - timestamp_ms = ( - self._normalize_timestamp(data_point.time_unix_nano) - if data_point.time_unix_nano is not None - else int(time.time() * 1000) - ) - record.timestamp = timestamp_ms - - # Set attributes - record.attributes = data_point.attributes - - # Set the value directly for both Gauge and Sum - record.value = data_point.value - - return record - - def _convert_histogram(self, metric: Metric, data_point: Any) -> MetricRecord: - """Convert a Histogram metric datapoint to a metric record. 
- - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/datapoint.go#L87 - - Args: - metric: The metric object - data_point: The datapoint to convert - - Returns: - MetricRecord with populated timestamp, attributes, and histogram_data - """ - # Create base record - record = self._create_metric_record(metric.name, metric.unit, metric.description) - - # Set timestamp - timestamp_ms = ( - self._normalize_timestamp(data_point.time_unix_nano) - if data_point.time_unix_nano is not None - else int(time.time() * 1000) - ) - record.timestamp = timestamp_ms - - # Set attributes - record.attributes = data_point.attributes - - # For Histogram, set the histogram_data - record.histogram_data = { - "Count": data_point.count, - "Sum": data_point.sum, - "Min": data_point.min, - "Max": data_point.max, - } - return record - - # pylint: disable=too-many-locals - def _convert_exp_histogram(self, metric: Metric, data_point: Any) -> MetricRecord: - """ - Convert an ExponentialHistogram metric datapoint to a metric record. - - This function follows the logic of CalculateDeltaDatapoints in the Go implementation, - converting exponential buckets to their midpoint values. - - Ref: - https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/22626 - - Args: - metric: The metric object - data_point: The datapoint to convert - - Returns: - MetricRecord with populated timestamp, attributes, and exp_histogram_data - """ - - # Create base record - record = self._create_metric_record(metric.name, metric.unit, metric.description) - - # Set timestamp - timestamp_ms = ( - self._normalize_timestamp(data_point.time_unix_nano) - if data_point.time_unix_nano is not None - else int(time.time() * 1000) - ) - record.timestamp = timestamp_ms - - # Set attributes - record.attributes = data_point.attributes - - # Initialize arrays for values and counts - array_values = [] - array_counts = [] - - # Get scale - scale = data_point.scale - # Calculate base using the formula: 2^(2^(-scale)) - base = math.pow(2, math.pow(2, float(-scale))) - - # Process positive buckets - if data_point.positive and data_point.positive.bucket_counts: - positive_offset = getattr(data_point.positive, "offset", 0) - positive_bucket_counts = data_point.positive.bucket_counts - - bucket_begin = 0 - bucket_end = 0 - - for bucket_index, count in enumerate(positive_bucket_counts): - index = bucket_index + positive_offset - - if bucket_begin == 0: - bucket_begin = math.pow(base, float(index)) - else: - bucket_begin = bucket_end - - bucket_end = math.pow(base, float(index + 1)) - - # Calculate midpoint value of the bucket - metric_val = (bucket_begin + bucket_end) / 2 - - # Only include buckets with positive counts - if count > 0: - array_values.append(metric_val) - array_counts.append(float(count)) - - # Process zero bucket - zero_count = getattr(data_point, "zero_count", 0) - if zero_count > 0: - array_values.append(0) - array_counts.append(float(zero_count)) - - # Process negative buckets - if data_point.negative and data_point.negative.bucket_counts: - negative_offset = getattr(data_point.negative, "offset", 0) - negative_bucket_counts = data_point.negative.bucket_counts - - bucket_begin = 0 - bucket_end = 0 - - for bucket_index, count in enumerate(negative_bucket_counts): - index = bucket_index + negative_offset - - if bucket_end == 0: - bucket_end = -math.pow(base, float(index)) - else: - bucket_end = bucket_begin - - bucket_begin = -math.pow(base, float(index + 1)) - - # Calculate midpoint value of 
the bucket - metric_val = (bucket_begin + bucket_end) / 2 - - # Only include buckets with positive counts - if count > 0: - array_values.append(metric_val) - array_counts.append(float(count)) - - # Set the histogram data in the format expected by CloudWatch EMF - record.exp_histogram_data = { - "Values": array_values, - "Counts": array_counts, - "Count": data_point.count, - "Sum": data_point.sum, - "Max": data_point.max, - "Min": data_point.min, - } - - return record - - def _group_by_attributes_and_timestamp(self, record: MetricRecord) -> Tuple[str, int]: - """Group metric record by attributes and timestamp. - - Args: - record: The metric record - - Returns: - A tuple key for grouping - """ - # Create a key for grouping based on attributes - attrs_key = self._get_attributes_key(record.attributes) - return (attrs_key, record.timestamp) - - def _create_emf_log( - self, metric_records: List[MetricRecord], resource: Resource, timestamp: Optional[int] = None - ) -> Dict: - """ - Create EMF log dictionary from metric records. - - Since metric_records is already grouped by attributes, this function - creates a single EMF log for all records. - """ - # Start with base structure - emf_log = {"_aws": {"Timestamp": timestamp or int(time.time() * 1000), "CloudWatchMetrics": []}} - - # Set with latest EMF version schema - # opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/metric_translator.go#L414 - emf_log["Version"] = "1" - - # Add resource attributes to EMF log but not as dimensions - # OTel collector EMF Exporter has a resource_to_telemetry_conversion flag that will convert resource attributes - # as regular metric attributes(potential dimensions). However, for this SDK EMF implementation, - # we align with the OpenTelemetry concept that all metric attributes are treated as dimensions. - # And have resource attributes as just additional metadata in EMF, added otel.resource as prefix to distinguish. 
- if resource and resource.attributes: - for key, value in resource.attributes.items(): - emf_log[f"otel.resource.{key}"] = str(value) - - # Initialize collections for dimensions and metrics - metric_definitions = [] - # Collect attributes from all records (they should be the same for all records in the group) - # Only collect once from the first record and apply to all records - all_attributes = ( - metric_records[0].attributes - if metric_records and len(metric_records) > 0 and metric_records[0].attributes - else {} - ) - - # Process each metric record - for record in metric_records: - - metric_name = self._get_metric_name(record) - - # Skip processing if metric name is None or empty - if not metric_name: - continue - - # Create metric data dict - metric_data = {"Name": metric_name} - - unit = self._get_unit(record) - if unit: - metric_data["Unit"] = unit - - # Process different types of aggregations - if record.exp_histogram_data: - # Base2 Exponential Histogram - emf_log[metric_name] = record.exp_histogram_data - elif record.histogram_data: - # Regular Histogram metrics - emf_log[metric_name] = record.histogram_data - elif record.value is not None: - # Gauge, Sum, and other aggregations - emf_log[metric_name] = record.value - else: - logger.debug("Skipping metric %s as it does not have valid metric value", metric_name) - continue - - # Add to metric definitions list - metric_definitions.append(metric_data) - - # Get dimension names from collected attributes - dimension_names = self._get_dimension_names(all_attributes) - - # Add attribute values to the root of the EMF log - for name, value in all_attributes.items(): - emf_log[name] = str(value) - - # Add CloudWatch Metrics if we have metrics, include dimensions only if they exist - if metric_definitions: - cloudwatch_metric = {"Namespace": self.namespace, "Metrics": metric_definitions} - if dimension_names: - cloudwatch_metric["Dimensions"] = [dimension_names] - - emf_log["_aws"]["CloudWatchMetrics"].append(cloudwatch_metric) - - return emf_log - - def _send_log_event(self, log_event: Dict[str, Any]): + def _export(self, log_event: Dict[str, Any]): """ Send a log event to CloudWatch Logs using the log client. @@ -528,82 +66,6 @@ def _send_log_event(self, log_event: Dict[str, Any]): """ self.log_client.send_log_event(log_event) - # pylint: disable=too-many-nested-blocks,unused-argument,too-many-branches - def export( - self, metrics_data: MetricsData, timeout_millis: Optional[int] = None, **_kwargs: Any - ) -> MetricExportResult: - """ - Export metrics as EMF logs to CloudWatch. - - Groups metrics by attributes and timestamp before creating EMF logs. 
- - Args: - metrics_data: MetricsData containing resource metrics and scope metrics - timeout_millis: Optional timeout in milliseconds - **kwargs: Additional keyword arguments - - Returns: - MetricExportResult indicating success or failure - """ - try: - if not metrics_data.resource_metrics: - return MetricExportResult.SUCCESS - - # Process all metrics from all resource metrics and scope metrics - for resource_metrics in metrics_data.resource_metrics: - for scope_metrics in resource_metrics.scope_metrics: - # Dictionary to group metrics by attributes and timestamp - grouped_metrics = defaultdict(list) - - # Process all metrics in this scope - for metric in scope_metrics.metrics: - # Skip if metric.data is None or no data_points exists - try: - if not (metric.data and metric.data.data_points): - continue - except AttributeError: - # Metric doesn't have data or data_points attribute - continue - - # Process metrics based on type - metric_type = type(metric.data) - if metric_type in (Gauge, Sum): - for dp in metric.data.data_points: - record = self._convert_gauge_and_sum(metric, dp) - grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) - elif metric_type == Histogram: - for dp in metric.data.data_points: - record = self._convert_histogram(metric, dp) - grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) - elif metric_type == ExponentialHistogram: - for dp in metric.data.data_points: - record = self._convert_exp_histogram(metric, dp) - grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) - else: - logger.debug("Unsupported Metric Type: %s", metric_type) - - # Now process each group separately to create one EMF log per group - for (_, timestamp_ms), metric_records in grouped_metrics.items(): - if not metric_records: - continue - - # Create and send EMF log for this batch of metrics - self._send_log_event( - { - "message": json.dumps( - self._create_emf_log(metric_records, resource_metrics.resource, timestamp_ms) - ), - "timestamp": timestamp_ms, - } - ) - - return MetricExportResult.SUCCESS - # pylint: disable=broad-exception-caught - # capture all types of exceptions to not interrupt the instrumented services - except Exception as error: - logger.error("Failed to export metrics: %s", error) - return MetricExportResult.FAILURE - def force_flush(self, timeout_millis: int = 10000) -> bool: # pylint: disable=unused-argument """ Force flush any pending metrics. diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/base_emf_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/base_emf_exporter.py new file mode 100644 index 000000000..48cb5481a --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/base_emf_exporter.py @@ -0,0 +1,596 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +# pylint: disable=no-self-use + +import json +import logging +import math +import time +from abc import ABC, abstractmethod +from collections import defaultdict +from typing import Any, Dict, List, Optional, Tuple + +from opentelemetry.sdk.metrics import Counter +from opentelemetry.sdk.metrics import Histogram as HistogramInstr +from opentelemetry.sdk.metrics import ObservableCounter, ObservableGauge, ObservableUpDownCounter, UpDownCounter +from opentelemetry.sdk.metrics._internal.point import Metric +from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + ExponentialHistogram, + Gauge, + Histogram, + MetricExporter, + MetricExportResult, + MetricsData, + NumberDataPoint, + Sum, +) +from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation +from opentelemetry.sdk.resources import Resource +from opentelemetry.util.types import Attributes + +logger = logging.getLogger(__name__) + + +class MetricRecord: + """The metric data unified representation of all OTel metrics for OTel to CW EMF conversion.""" + + def __init__(self, metric_name: str, metric_unit: str, metric_description: str): + """ + Initialize metric record. + + Args: + metric_name: Name of the metric + metric_unit: Unit of the metric + metric_description: Description of the metric + """ + # Instrument metadata + self.name = metric_name + self.unit = metric_unit + self.description = metric_description + + # Will be set by conversion methods + self.timestamp: Optional[int] = None + self.attributes: Attributes = {} + + # Different metric type data - only one will be set per record + self.value: Optional[float] = None + self.sum_data: Optional[Any] = None + self.histogram_data: Optional[Any] = None + self.exp_histogram_data: Optional[Any] = None + + +class BaseEmfExporter(MetricExporter, ABC): + """ + Base class for OpenTelemetry metrics exporters that convert to CloudWatch EMF format. + + This class contains all the common logic for converting OTel metrics into CloudWatch EMF logs. + Subclasses need to implement the _export method to define where the EMF logs are sent. + + https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html + + """ + + # CloudWatch EMF supported units + # Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html + EMF_SUPPORTED_UNITS = { + "Seconds", + "Microseconds", + "Milliseconds", + "Bytes", + "Kilobytes", + "Megabytes", + "Gigabytes", + "Terabytes", + "Bits", + "Kilobits", + "Megabits", + "Gigabits", + "Terabits", + "Percent", + "Count", + "Bytes/Second", + "Kilobytes/Second", + "Megabytes/Second", + "Gigabytes/Second", + "Terabytes/Second", + "Bits/Second", + "Kilobits/Second", + "Megabits/Second", + "Gigabits/Second", + "Terabits/Second", + "Count/Second", + "None", + } + + # OTel to CloudWatch unit mapping + # Ref: opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/grouped_metric.go#L188 + UNIT_MAPPING = { + "1": "", + "ns": "", + "ms": "Milliseconds", + "s": "Seconds", + "us": "Microseconds", + "By": "Bytes", + "bit": "Bits", + } + + def __init__( + self, + namespace: str = "default", + preferred_temporality: Optional[Dict[type, AggregationTemporality]] = None, + preferred_aggregation: Optional[Dict[type, Any]] = None, + ): + """ + Initialize the base EMF exporter. 
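+        When no preferences are supplied, temporality defaults to DELTA for every
+        instrument type and Histogram instruments default to exponential bucket
+        aggregation, mirroring the defaults previously set in AwsCloudWatchEmfExporter.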
+ + Args: + namespace: CloudWatch namespace for metrics + preferred_temporality: Optional dictionary mapping instrument types to aggregation temporality + preferred_aggregation: Optional dictionary mapping instrument types to preferred aggregation + """ + # Set up temporality preference default to DELTA if customers not set + if preferred_temporality is None: + preferred_temporality = { + Counter: AggregationTemporality.DELTA, + HistogramInstr: AggregationTemporality.DELTA, + ObservableCounter: AggregationTemporality.DELTA, + ObservableGauge: AggregationTemporality.DELTA, + ObservableUpDownCounter: AggregationTemporality.DELTA, + UpDownCounter: AggregationTemporality.DELTA, + } + + # Set up aggregation preference default to exponential histogram for histogram metrics + if preferred_aggregation is None: + preferred_aggregation = { + HistogramInstr: ExponentialBucketHistogramAggregation(), + } + + super().__init__(preferred_temporality, preferred_aggregation) + + self.namespace = namespace + + def _get_metric_name(self, record: MetricRecord) -> Optional[str]: + """Get the metric name from the metric record or data point.""" + + try: + if record.name: + return record.name + except AttributeError: + pass + # Return None if no valid metric name found + return None + + def _get_unit(self, record: MetricRecord) -> Optional[str]: + """Get CloudWatch unit from MetricRecord unit.""" + unit = record.unit + + if not unit: + return None + + # First check if unit is already a supported EMF unit + if unit in self.EMF_SUPPORTED_UNITS: + return unit + + # Map from OTel unit to CloudWatch unit + mapped_unit = self.UNIT_MAPPING.get(unit) + + return mapped_unit + + def _get_dimension_names(self, attributes: Attributes) -> List[str]: + """Extract dimension names from attributes.""" + # Implement dimension selection logic + # For now, use all attributes as dimensions + return list(attributes.keys()) + + def _get_attributes_key(self, attributes: Attributes) -> str: + """ + Create a hashable key from attributes for grouping metrics. + + Args: + attributes: The attributes dictionary + + Returns: + A string representation of sorted attributes key-value pairs + """ + # Sort the attributes to ensure consistent keys + sorted_attrs = sorted(attributes.items()) + # Create a string representation of the attributes + return str(sorted_attrs) + + def _normalize_timestamp(self, timestamp_ns: int) -> int: + """ + Normalize a nanosecond timestamp to milliseconds for CloudWatch. + + Args: + timestamp_ns: Timestamp in nanoseconds + + Returns: + Timestamp in milliseconds + """ + # Convert from nanoseconds to milliseconds + return timestamp_ns // 1_000_000 + + def _create_metric_record(self, metric_name: str, metric_unit: str, metric_description: str) -> MetricRecord: + """ + Creates the intermediate metric data structure that standardizes different otel metric representation + and will be used to generate EMF events. The base record + establishes the instrument schema (name/unit/description) that will be populated + with dimensions, timestamps, and values during metric processing. + + Args: + metric_name: Name of the metric + metric_unit: Unit of the metric + metric_description: Description of the metric + + Returns: + A MetricRecord object + """ + return MetricRecord(metric_name, metric_unit, metric_description) + + def _convert_gauge_and_sum(self, metric: Metric, data_point: NumberDataPoint) -> MetricRecord: + """Convert a Gauge or Sum metric datapoint to a metric record. 
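+        Gauge and Sum data points both carry a single numeric value per data point,
+        so one conversion path covers both metric types.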
+ + Args: + metric: The metric object + data_point: The datapoint to convert + + Returns: + MetricRecord with populated timestamp, attributes, and value + """ + # Create base record + record = self._create_metric_record(metric.name, metric.unit, metric.description) + + # Set timestamp + timestamp_ms = ( + self._normalize_timestamp(data_point.time_unix_nano) + if data_point.time_unix_nano is not None + else int(time.time() * 1000) + ) + record.timestamp = timestamp_ms + + # Set attributes + record.attributes = data_point.attributes + + # Set the value directly for both Gauge and Sum + record.value = data_point.value + + return record + + def _convert_histogram(self, metric: Metric, data_point: Any) -> MetricRecord: + """Convert a Histogram metric datapoint to a metric record. + + https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/datapoint.go#L87 + + Args: + metric: The metric object + data_point: The datapoint to convert + + Returns: + MetricRecord with populated timestamp, attributes, and histogram_data + """ + # Create base record + record = self._create_metric_record(metric.name, metric.unit, metric.description) + + # Set timestamp + timestamp_ms = ( + self._normalize_timestamp(data_point.time_unix_nano) + if data_point.time_unix_nano is not None + else int(time.time() * 1000) + ) + record.timestamp = timestamp_ms + + # Set attributes + record.attributes = data_point.attributes + + # For Histogram, set the histogram_data + record.histogram_data = { + "Count": data_point.count, + "Sum": data_point.sum, + "Min": data_point.min, + "Max": data_point.max, + } + return record + + # pylint: disable=too-many-locals + def _convert_exp_histogram(self, metric: Metric, data_point: Any) -> MetricRecord: + """ + Convert an ExponentialHistogram metric datapoint to a metric record. + + This function follows the logic of CalculateDeltaDatapoints in the Go implementation, + converting exponential buckets to their midpoint values. 
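+        For example, at scale=0 the base is 2^(2^0) = 2, so the bucket at index 1
+        covers [2, 4) and is represented by its midpoint value 3.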
+ + Ref: + https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/22626 + + Args: + metric: The metric object + data_point: The datapoint to convert + + Returns: + MetricRecord with populated timestamp, attributes, and exp_histogram_data + """ + + # Create base record + record = self._create_metric_record(metric.name, metric.unit, metric.description) + + # Set timestamp + timestamp_ms = ( + self._normalize_timestamp(data_point.time_unix_nano) + if data_point.time_unix_nano is not None + else int(time.time() * 1000) + ) + record.timestamp = timestamp_ms + + # Set attributes + record.attributes = data_point.attributes + + # Initialize arrays for values and counts + array_values = [] + array_counts = [] + + # Get scale + scale = data_point.scale + # Calculate base using the formula: 2^(2^(-scale)) + base = math.pow(2, math.pow(2, float(-scale))) + + # Process positive buckets + if data_point.positive and data_point.positive.bucket_counts: + positive_offset = getattr(data_point.positive, "offset", 0) + positive_bucket_counts = data_point.positive.bucket_counts + + bucket_begin = 0 + bucket_end = 0 + + for bucket_index, count in enumerate(positive_bucket_counts): + index = bucket_index + positive_offset + + if bucket_begin == 0: + bucket_begin = math.pow(base, float(index)) + else: + bucket_begin = bucket_end + + bucket_end = math.pow(base, float(index + 1)) + + # Calculate midpoint value of the bucket + metric_val = (bucket_begin + bucket_end) / 2 + + # Only include buckets with positive counts + if count > 0: + array_values.append(metric_val) + array_counts.append(float(count)) + + # Process zero bucket + zero_count = getattr(data_point, "zero_count", 0) + if zero_count > 0: + array_values.append(0) + array_counts.append(float(zero_count)) + + # Process negative buckets + if data_point.negative and data_point.negative.bucket_counts: + negative_offset = getattr(data_point.negative, "offset", 0) + negative_bucket_counts = data_point.negative.bucket_counts + + bucket_begin = 0 + bucket_end = 0 + + for bucket_index, count in enumerate(negative_bucket_counts): + index = bucket_index + negative_offset + + if bucket_end == 0: + bucket_end = -math.pow(base, float(index)) + else: + bucket_end = bucket_begin + + bucket_begin = -math.pow(base, float(index + 1)) + + # Calculate midpoint value of the bucket + metric_val = (bucket_begin + bucket_end) / 2 + + # Only include buckets with positive counts + if count > 0: + array_values.append(metric_val) + array_counts.append(float(count)) + + # Set the histogram data in the format expected by CloudWatch EMF + record.exp_histogram_data = { + "Values": array_values, + "Counts": array_counts, + "Count": data_point.count, + "Sum": data_point.sum, + "Max": data_point.max, + "Min": data_point.min, + } + + return record + + def _group_by_attributes_and_timestamp(self, record: MetricRecord) -> Tuple[str, int]: + """Group metric record by attributes and timestamp. + + Args: + record: The metric record + + Returns: + A tuple key for grouping + """ + # Create a key for grouping based on attributes + attrs_key = self._get_attributes_key(record.attributes) + return (attrs_key, record.timestamp) + + def _create_emf_log( + self, metric_records: List[MetricRecord], resource: Resource, timestamp: Optional[int] = None + ) -> Dict: + """ + Create EMF log dictionary from metric records. + + Since metric_records is already grouped by attributes, this function + creates a single EMF log for all records. 
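+
+        The resulting log places metric values and attribute values at the JSON
+        root, resource attributes under "otel.resource."-prefixed keys, and the
+        namespace/dimension/metric schema under the "_aws" metadata key.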
+ """ + # Start with base structure + emf_log = {"_aws": {"Timestamp": timestamp or int(time.time() * 1000), "CloudWatchMetrics": []}} + + # Set with latest EMF version schema + # opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/metric_translator.go#L414 + emf_log["Version"] = "1" + + # Add resource attributes to EMF log but not as dimensions + # OTel collector EMF Exporter has a resource_to_telemetry_conversion flag that will convert resource attributes + # as regular metric attributes(potential dimensions). However, for this SDK EMF implementation, + # we align with the OpenTelemetry concept that all metric attributes are treated as dimensions. + # And have resource attributes as just additional metadata in EMF, added otel.resource as prefix to distinguish. + if resource and resource.attributes: + for key, value in resource.attributes.items(): + emf_log[f"otel.resource.{key}"] = str(value) + + # Initialize collections for dimensions and metrics + metric_definitions = [] + # Collect attributes from all records (they should be the same for all records in the group) + # Only collect once from the first record and apply to all records + all_attributes = ( + metric_records[0].attributes + if metric_records and len(metric_records) > 0 and metric_records[0].attributes + else {} + ) + + # Process each metric record + for record in metric_records: + + metric_name = self._get_metric_name(record) + + # Skip processing if metric name is None or empty + if not metric_name: + continue + + # Create metric data dict + metric_data = {"Name": metric_name} + + unit = self._get_unit(record) + if unit: + metric_data["Unit"] = unit + + # Process different types of aggregations + if record.exp_histogram_data: + # Base2 Exponential Histogram + emf_log[metric_name] = record.exp_histogram_data + elif record.histogram_data: + # Regular Histogram metrics + emf_log[metric_name] = record.histogram_data + elif record.value is not None: + # Gauge, Sum, and other aggregations + emf_log[metric_name] = record.value + else: + logger.debug("Skipping metric %s as it does not have valid metric value", metric_name) + continue + + # Add to metric definitions list + metric_definitions.append(metric_data) + + # Get dimension names from collected attributes + dimension_names = self._get_dimension_names(all_attributes) + + # Add attribute values to the root of the EMF log + for name, value in all_attributes.items(): + emf_log[name] = str(value) + + # Add CloudWatch Metrics if we have metrics, include dimensions only if they exist + if metric_definitions: + cloudwatch_metric = {"Namespace": self.namespace, "Metrics": metric_definitions} + if dimension_names: + cloudwatch_metric["Dimensions"] = [dimension_names] + + emf_log["_aws"]["CloudWatchMetrics"].append(cloudwatch_metric) + + return emf_log + + @abstractmethod + def _export(self, log_event: Dict[str, Any]): + """ + Send a log event to the destination (CloudWatch Logs, console, etc.). + + This method must be implemented by subclasses to define where the EMF logs are sent. + + Args: + log_event: The log event to send + """ + + # pylint: disable=too-many-nested-blocks,unused-argument,too-many-branches + def export( + self, metrics_data: MetricsData, timeout_millis: Optional[int] = None, **_kwargs: Any + ) -> MetricExportResult: + """ + Export metrics as EMF logs. + + Groups metrics by attributes and timestamp before creating EMF logs. 
+ + Args: + metrics_data: MetricsData containing resource metrics and scope metrics + timeout_millis: Optional timeout in milliseconds + **kwargs: Additional keyword arguments + + Returns: + MetricExportResult indicating success or failure + """ + try: + if not metrics_data.resource_metrics: + return MetricExportResult.SUCCESS + + # Process all metrics from all resource metrics and scope metrics + for resource_metrics in metrics_data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + # Dictionary to group metrics by attributes and timestamp + grouped_metrics = defaultdict(list) + + # Process all metrics in this scope + for metric in scope_metrics.metrics: + # Skip if metric.data is None or no data_points exists + try: + if not (metric.data and metric.data.data_points): + continue + except AttributeError: + # Metric doesn't have data or data_points attribute + continue + + # Process metrics based on type + metric_type = type(metric.data) + if metric_type in (Gauge, Sum): + for dp in metric.data.data_points: + record = self._convert_gauge_and_sum(metric, dp) + grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) + elif metric_type == Histogram: + for dp in metric.data.data_points: + record = self._convert_histogram(metric, dp) + grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) + elif metric_type == ExponentialHistogram: + for dp in metric.data.data_points: + record = self._convert_exp_histogram(metric, dp) + grouped_metrics[self._group_by_attributes_and_timestamp(record)].append(record) + else: + logger.debug("Unsupported Metric Type: %s", metric_type) + + # Now process each group separately to create one EMF log per group + for (_, timestamp_ms), metric_records in grouped_metrics.items(): + if not metric_records: + continue + + # Create and send EMF log for this batch of metrics + logger.debug( + "Creating EMF log for %d records at timestamp %s", + len(metric_records), + timestamp_ms, + ) + self._export( + { + "message": json.dumps( + self._create_emf_log(metric_records, resource_metrics.resource, timestamp_ms) + ), + "timestamp": timestamp_ms, + } + ) + + return MetricExportResult.SUCCESS + # pylint: disable=broad-exception-caught + # capture all types of exceptions to not interrupt the instrumented services + except Exception as error: + logger.error("Failed to export metrics: %s", error) + return MetricExportResult.FAILURE diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/console_emf_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/console_emf_exporter.py new file mode 100644 index 000000000..8563c16b2 --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/console_emf_exporter.py @@ -0,0 +1,104 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +# pylint: disable=no-self-use + +import logging +from typing import Any, Dict, Optional + +from opentelemetry.sdk.metrics.export import AggregationTemporality + +from .base_emf_exporter import BaseEmfExporter + +logger = logging.getLogger(__name__) + + +class ConsoleEmfExporter(BaseEmfExporter): + """ + OpenTelemetry metrics exporter for CloudWatch EMF format to console output. + + This exporter converts OTel metrics into CloudWatch EMF logs and writes them + to standard output instead of sending to CloudWatch Logs. 
This is useful for
+    debugging, testing, or when you want to process EMF logs with other tools.
+
+    https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html
+
+    """
+
+    def __init__(
+        self,
+        namespace: str = "default",
+        preferred_temporality: Optional[Dict[type, AggregationTemporality]] = None,
+        preferred_aggregation: Optional[Dict[type, Any]] = None,
+        **kwargs: Any,
+    ) -> None:
+        """
+        Initialize the Console EMF exporter.
+
+        Args:
+            namespace: CloudWatch namespace for metrics (defaults to "default")
+            preferred_temporality: Optional dictionary mapping instrument types to aggregation temporality
+            preferred_aggregation: Optional dictionary mapping instrument types to preferred aggregation
+            **kwargs: Additional arguments (unused, kept for compatibility)
+        """
+        # Guard against an explicitly passed None so the namespace always has a usable value
+        if namespace is None:
+            namespace = "default"
+        super().__init__(namespace, preferred_temporality, preferred_aggregation)
+
+    def _export(self, log_event: Dict[str, Any]) -> None:
+        """
+        Send a log event message to stdout for console output.
+
+        This method writes the EMF log message to stdout, making it easy to
+        capture and redirect the output for processing or debugging purposes.
+
+        Args:
+            log_event: The log event dictionary containing 'message' and 'timestamp'
+                keys, where 'message' is the JSON-serialized EMF log
+
+        Raises:
+            No exceptions are raised - errors are logged and handled gracefully
+        """
+        try:
+            # Write the EMF log message to stdout for easy redirection/capture
+            message = log_event.get("message", "")
+            if message:
+                print(message, flush=True)
+            else:
+                logger.warning("Empty message in log event: %s", log_event)
+        except Exception as error:  # pylint: disable=broad-exception-caught
+            logger.error("Failed to write EMF log to console. Log event: %s. Error: %s", log_event, error)
+
+    def force_flush(self, timeout_millis: int = 10000) -> bool:
+        """
+        Force flush any pending metrics.
+
+        For console output, there's no buffering since we use print() with
+        flush=True, so this is effectively a no-op that always succeeds.
+
+        Args:
+            timeout_millis: Timeout in milliseconds (unused for console output)
+
+        Returns:
+            Always returns True as console output is immediately flushed
+        """
+        logger.debug("ConsoleEmfExporter force_flush called - no buffering to flush for console output")
+        return True
+
+    def shutdown(self, timeout_millis: Optional[int] = None, **kwargs: Any) -> bool:
+        """
+        Shutdown the exporter.
+
+        For console output, there are no resources to clean up or connections
+        to close, so this is effectively a no-op that always succeeds.
+ + Args: + timeout_millis: Timeout in milliseconds (unused for console output) + **kwargs: Additional keyword arguments (unused for console output) + + Returns: + Always returns True as there's no cleanup required for console output + """ + logger.debug("ConsoleEmfExporter shutdown called with timeout_millis=%s", timeout_millis) + return True diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py index d6a5903af..c8595cdb7 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py @@ -635,8 +635,8 @@ def test_shutdown(self, mock_force_flush): mock_force_flush.assert_called_once_with(5000) # pylint: disable=broad-exception-caught - def test_send_log_event_method_exists(self): - """Test that _send_log_event method exists and can be called.""" + def test_export_method_exists(self): + """Test that _export method exists and can be called.""" # Just test that the method exists and doesn't crash with basic input log_event = {"message": "test message", "timestamp": 1234567890} @@ -644,10 +644,10 @@ def test_send_log_event_method_exists(self): with patch.object(self.exporter.log_client, "send_log_event") as mock_send: # Should not raise an exception try: - self.exporter._send_log_event(log_event) + self.exporter._export(log_event) mock_send.assert_called_once_with(log_event) except Exception as error: - self.fail(f"_send_log_event raised an exception: {error}") + self.fail(f"_export raised an exception: {error}") if __name__ == "__main__": diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_base_emf_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_base_emf_exporter.py new file mode 100644 index 000000000..fa0ace40a --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_base_emf_exporter.py @@ -0,0 +1,291 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +import unittest +from unittest.mock import Mock + +from amazon.opentelemetry.distro.exporter.aws.metrics.base_emf_exporter import BaseEmfExporter, MetricRecord +from opentelemetry.sdk.metrics.export import MetricExportResult +from opentelemetry.sdk.resources import Resource + + +class ConcreteEmfExporter(BaseEmfExporter): + """Concrete implementation of BaseEmfExporter for testing.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.exported_logs = [] + + def _export(self, log_event): + """Implementation that stores exported logs for testing.""" + self.exported_logs.append(log_event) + + def force_flush(self, timeout_millis=None): # pylint: disable=no-self-use + """Force flush implementation for testing.""" + return True + + def shutdown(self, timeout_millis=None): # pylint: disable=no-self-use + """Shutdown implementation for testing.""" + return True + + +class TestMetricRecord(unittest.TestCase): + """Test MetricRecord class.""" + + def test_metric_record_initialization(self): + """Test MetricRecord initialization with basic values.""" + record = MetricRecord("test_metric", "Count", "Test description") + + self.assertEqual(record.name, "test_metric") + self.assertEqual(record.unit, "Count") + self.assertEqual(record.description, "Test description") + self.assertIsNone(record.timestamp) + self.assertEqual(record.attributes, {}) + self.assertIsNone(record.value) + self.assertIsNone(record.sum_data) + self.assertIsNone(record.histogram_data) + self.assertIsNone(record.exp_histogram_data) + + +class TestBaseEmfExporter(unittest.TestCase): + """Test BaseEmfExporter class.""" + + def setUp(self): + """Set up test fixtures.""" + self.exporter = ConcreteEmfExporter(namespace="TestNamespace") + + def test_initialization(self): + """Test exporter initialization.""" + exporter = ConcreteEmfExporter() + self.assertEqual(exporter.namespace, "default") + + exporter_custom = ConcreteEmfExporter(namespace="CustomNamespace") + self.assertEqual(exporter_custom.namespace, "CustomNamespace") + + def test_get_metric_name(self): + """Test metric name extraction.""" + # Test with valid name + record = Mock() + record.name = "test_metric" + result = self.exporter._get_metric_name(record) + self.assertEqual(result, "test_metric") + + # Test with empty name + record.name = "" + result = self.exporter._get_metric_name(record) + self.assertIsNone(result) + + # Test with None name + record.name = None + result = self.exporter._get_metric_name(record) + self.assertIsNone(result) + + def test_get_unit(self): + """Test unit mapping functionality.""" + # Test EMF supported units (should return as-is) + record = MetricRecord("test", "Count", "desc") + self.assertEqual(self.exporter._get_unit(record), "Count") + + record = MetricRecord("test", "Percent", "desc") + self.assertEqual(self.exporter._get_unit(record), "Percent") + + # Test OTel unit mapping + record = MetricRecord("test", "ms", "desc") + self.assertEqual(self.exporter._get_unit(record), "Milliseconds") + + record = MetricRecord("test", "s", "desc") + self.assertEqual(self.exporter._get_unit(record), "Seconds") + + record = MetricRecord("test", "By", "desc") + self.assertEqual(self.exporter._get_unit(record), "Bytes") + + # Test units that map to empty string + record = MetricRecord("test", "1", "desc") + self.assertEqual(self.exporter._get_unit(record), "") + + # Test unknown unit + record = MetricRecord("test", "unknown", "desc") + self.assertIsNone(self.exporter._get_unit(record)) 
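+
+        # Test "ns" unit, which UNIT_MAPPING also maps to the empty string
+        record = MetricRecord("test", "ns", "desc")
+        self.assertEqual(self.exporter._get_unit(record), "")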
+ + # Test None unit + record = MetricRecord("test", None, "desc") + self.assertIsNone(self.exporter._get_unit(record)) + + def test_get_dimension_names(self): + """Test dimension names extraction.""" + attributes = {"service": "test", "env": "prod"} + result = self.exporter._get_dimension_names(attributes) + self.assertEqual(set(result), {"service", "env"}) + + # Test empty attributes + result = self.exporter._get_dimension_names({}) + self.assertEqual(result, []) + + def test_get_attributes_key(self): + """Test attributes key generation.""" + attrs1 = {"b": "2", "a": "1"} + attrs2 = {"a": "1", "b": "2"} + + key1 = self.exporter._get_attributes_key(attrs1) + key2 = self.exporter._get_attributes_key(attrs2) + + # Keys should be consistent regardless of order + self.assertEqual(key1, key2) + self.assertIsInstance(key1, str) + + def test_normalize_timestamp(self): + """Test timestamp normalization.""" + timestamp_ns = 1609459200000000000 # nanoseconds + expected_ms = 1609459200000 # milliseconds + + result = self.exporter._normalize_timestamp(timestamp_ns) + self.assertEqual(result, expected_ms) + + def test_create_metric_record(self): + """Test metric record creation.""" + record = self.exporter._create_metric_record("test_metric", "Count", "Description") + + self.assertIsInstance(record, MetricRecord) + self.assertEqual(record.name, "test_metric") + self.assertEqual(record.unit, "Count") + self.assertEqual(record.description, "Description") + + def test_convert_gauge_and_sum(self): + """Test gauge and sum conversion.""" + metric = Mock() + metric.name = "test_gauge" + metric.unit = "Count" + metric.description = "Test gauge" + + data_point = Mock() + data_point.value = 42.0 + data_point.attributes = {"key": "value"} + data_point.time_unix_nano = 1609459200000000000 + + record = self.exporter._convert_gauge_and_sum(metric, data_point) + + self.assertEqual(record.name, "test_gauge") + self.assertEqual(record.value, 42.0) + self.assertEqual(record.attributes, {"key": "value"}) + self.assertEqual(record.timestamp, 1609459200000) + + def test_convert_histogram(self): + """Test histogram conversion.""" + metric = Mock() + metric.name = "test_histogram" + metric.unit = "ms" + metric.description = "Test histogram" + + data_point = Mock() + data_point.count = 5 + data_point.sum = 25.0 + data_point.min = 1.0 + data_point.max = 10.0 + data_point.attributes = {"service": "test"} + data_point.time_unix_nano = 1609459200000000000 + + record = self.exporter._convert_histogram(metric, data_point) + + self.assertEqual(record.name, "test_histogram") + expected_data = {"Count": 5, "Sum": 25.0, "Min": 1.0, "Max": 10.0} + self.assertEqual(record.histogram_data, expected_data) + self.assertEqual(record.attributes, {"service": "test"}) + + def test_convert_exp_histogram(self): + """Test exponential histogram conversion.""" + metric = Mock() + metric.name = "test_exp_histogram" + metric.unit = "s" + metric.description = "Test exponential histogram" + + data_point = Mock() + data_point.count = 10 + data_point.sum = 50.0 + data_point.min = 1.0 + data_point.max = 20.0 + data_point.scale = 1 + data_point.zero_count = 0 + data_point.attributes = {"env": "test"} + data_point.time_unix_nano = 1609459200000000000 + + # Mock buckets + data_point.positive = Mock() + data_point.positive.offset = 0 + data_point.positive.bucket_counts = [1, 2, 1] + + data_point.negative = Mock() + data_point.negative.offset = 0 + data_point.negative.bucket_counts = [] + + record = self.exporter._convert_exp_histogram(metric, data_point) + + 
self.assertEqual(record.name, "test_exp_histogram") + self.assertIsNotNone(record.exp_histogram_data) + self.assertIn("Values", record.exp_histogram_data) + self.assertIn("Counts", record.exp_histogram_data) + self.assertEqual(record.exp_histogram_data["Count"], 10) + self.assertEqual(record.exp_histogram_data["Sum"], 50.0) + + def test_group_by_attributes_and_timestamp(self): + """Test grouping by attributes and timestamp.""" + record = Mock() + record.attributes = {"env": "test"} + record.timestamp = 1234567890 + + result = self.exporter._group_by_attributes_and_timestamp(record) + + self.assertIsInstance(result, tuple) + self.assertEqual(len(result), 2) + self.assertEqual(result[1], 1234567890) + + def test_create_emf_log(self): + """Test EMF log creation.""" + # Create a simple metric record + record = self.exporter._create_metric_record("test_metric", "Count", "Test") + record.value = 50.0 + record.timestamp = 1234567890 + record.attributes = {"env": "test"} + + records = [record] + resource = Resource.create({"service.name": "test-service"}) + + result = self.exporter._create_emf_log(records, resource, 1234567890) + + # Check basic EMF structure + self.assertIn("_aws", result) + self.assertIn("CloudWatchMetrics", result["_aws"]) + self.assertEqual(result["_aws"]["Timestamp"], 1234567890) + self.assertEqual(result["Version"], "1") + + # Check metric value + self.assertEqual(result["test_metric"], 50.0) + + # Check resource attributes + self.assertEqual(result["otel.resource.service.name"], "test-service") + + # Check CloudWatch metrics + cw_metrics = result["_aws"]["CloudWatchMetrics"][0] + self.assertEqual(cw_metrics["Namespace"], "TestNamespace") + self.assertEqual(cw_metrics["Metrics"][0]["Name"], "test_metric") + + def test_export_empty_metrics(self): + """Test export with empty metrics data.""" + metrics_data = Mock() + metrics_data.resource_metrics = [] + + result = self.exporter.export(metrics_data) + self.assertEqual(result, MetricExportResult.SUCCESS) + + def test_export_failure_handling(self): + """Test export failure handling.""" + metrics_data = Mock() + # Make iteration fail + metrics_data.resource_metrics = Mock() + metrics_data.resource_metrics.__iter__ = Mock(side_effect=Exception("Test exception")) + + result = self.exporter.export(metrics_data) + self.assertEqual(result, MetricExportResult.FAILURE) + + +if __name__ == "__main__": + unittest.main() diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_console_emf_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_console_emf_exporter.py new file mode 100644 index 000000000..8783dfd92 --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_console_emf_exporter.py @@ -0,0 +1,288 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +import unittest +from io import StringIO +from unittest.mock import MagicMock, patch + +from amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter import ConsoleEmfExporter +from opentelemetry.sdk.metrics import Counter +from opentelemetry.sdk.metrics.export import AggregationTemporality, MetricExportResult, MetricsData + + +class TestConsoleEmfExporter(unittest.TestCase): + """Test ConsoleEmfExporter class.""" + + def setUp(self): + """Set up test fixtures.""" + self.exporter = ConsoleEmfExporter() + + def test_namespace_initialization(self): + """Test exporter initialization with different namespace scenarios.""" + # Test default namespace + exporter = ConsoleEmfExporter() + self.assertEqual(exporter.namespace, "default") + + # Test custom namespace + exporter = ConsoleEmfExporter(namespace="CustomNamespace") + self.assertEqual(exporter.namespace, "CustomNamespace") + + # Test None namespace (should default to 'default') + exporter = ConsoleEmfExporter(namespace=None) + self.assertEqual(exporter.namespace, "default") + + # Test empty string namespace (should remain empty) + exporter = ConsoleEmfExporter(namespace="") + self.assertEqual(exporter.namespace, "") + + def test_initialization_with_parameters(self): + """Test exporter initialization with optional parameters.""" + # Test with preferred_temporality + preferred_temporality = {Counter: AggregationTemporality.CUMULATIVE} + exporter = ConsoleEmfExporter(namespace="TestNamespace", preferred_temporality=preferred_temporality) + self.assertEqual(exporter.namespace, "TestNamespace") + self.assertEqual(exporter._preferred_temporality[Counter], AggregationTemporality.CUMULATIVE) + + # Test with preferred_aggregation + preferred_aggregation = {Counter: "TestAggregation"} + exporter = ConsoleEmfExporter(preferred_aggregation=preferred_aggregation) + self.assertEqual(exporter._preferred_aggregation[Counter], "TestAggregation") + + # Test with additional kwargs + exporter = ConsoleEmfExporter(namespace="TestNamespace", extra_param="ignored") # Should be ignored + self.assertEqual(exporter.namespace, "TestNamespace") + + def test_export_log_event_success(self): + """Test that log events are properly sent to console output.""" + # Create a simple log event with EMF-formatted message + test_message = ( + '{"_aws":{"Timestamp":1640995200000,"CloudWatchMetrics":[{"Namespace":"TestNamespace",' + '"Dimensions":[["Service"]],"Metrics":[{"Name":"TestMetric","Unit":"Count"}]}]},' + '"Service":"test-service","TestMetric":42}' + ) + log_event = {"message": test_message, "timestamp": 1640995200000} + + # Capture stdout to verify the output + with patch("sys.stdout", new_callable=StringIO) as mock_stdout: + self.exporter._export(log_event) + + # Verify the message was printed to stdout with flush + captured_output = mock_stdout.getvalue().strip() + self.assertEqual(captured_output, test_message) + + def test_export_log_event_empty_message(self): + """Test handling of log events with empty messages.""" + log_event = {"message": "", "timestamp": 1640995200000} + + with patch("sys.stdout", new_callable=StringIO) as mock_stdout: + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter._export(log_event) + + # Should not print anything for empty message + captured_output = mock_stdout.getvalue().strip() + self.assertEqual(captured_output, "") + + # Should log a warning + mock_logger.warning.assert_called_once() + + def 
test_export_log_event_missing_message(self): + """Test handling of log events without message key.""" + log_event = {"timestamp": 1640995200000} + + with patch("sys.stdout", new_callable=StringIO) as mock_stdout: + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter._export(log_event) + + # Should not print anything when message is missing + captured_output = mock_stdout.getvalue().strip() + self.assertEqual(captured_output, "") + + # Should log a warning + mock_logger.warning.assert_called_once() + + def test_export_log_event_with_none_message(self): + """Test handling of log events with None message.""" + log_event = {"message": None, "timestamp": 1640995200000} + + with patch("sys.stdout", new_callable=StringIO) as mock_stdout: + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter._export(log_event) + + # Should not print anything when message is None + captured_output = mock_stdout.getvalue().strip() + self.assertEqual(captured_output, "") + + # Should log a warning + mock_logger.warning.assert_called_once() + + def test_export_log_event_print_exception(self): + """Test error handling when print() raises an exception.""" + log_event = {"message": "test message", "timestamp": 1640995200000} + + with patch("builtins.print", side_effect=Exception("Print failed")): + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter._export(log_event) + + # Should log the error + mock_logger.error.assert_called_once() + args = mock_logger.error.call_args[0] + self.assertIn("Failed to write EMF log to console", args[0]) + self.assertEqual(args[1], log_event) + self.assertIn("Print failed", str(args[2])) + + def test_export_log_event_various_message_types(self): + """Test _export with various message types.""" + # Test with JSON string + json_message = '{"key": "value"}' + log_event = {"message": json_message, "timestamp": 1640995200000} + + with patch("sys.stdout", new_callable=StringIO) as mock_stdout: + self.exporter._export(log_event) + + captured_output = mock_stdout.getvalue().strip() + self.assertEqual(captured_output, json_message) + + # Test with plain string + plain_message = "Simple log message" + log_event = {"message": plain_message, "timestamp": 1640995200000} + + with patch("sys.stdout", new_callable=StringIO) as mock_stdout: + self.exporter._export(log_event) + + captured_output = mock_stdout.getvalue().strip() + self.assertEqual(captured_output, plain_message) + + def test_force_flush(self): + """Test force_flush method.""" + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + # Test with default timeout + result = self.exporter.force_flush() + self.assertTrue(result) + mock_logger.debug.assert_called_once() + + # Reset mock for next call + mock_logger.reset_mock() + + # Test with custom timeout + result = self.exporter.force_flush(timeout_millis=5000) + self.assertTrue(result) + mock_logger.debug.assert_called_once() + + def test_shutdown(self): + """Test shutdown method.""" + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + # Test with no timeout + result = self.exporter.shutdown() + self.assertTrue(result) + mock_logger.debug.assert_called_once_with("ConsoleEmfExporter shutdown called with timeout_millis=%s", None) + + # Reset mock for next call + 
mock_logger.reset_mock() + + # Test with timeout + result = self.exporter.shutdown(timeout_millis=3000) + self.assertTrue(result) + mock_logger.debug.assert_called_once_with("ConsoleEmfExporter shutdown called with timeout_millis=%s", 3000) + + # Reset mock for next call + mock_logger.reset_mock() + + # Test with additional kwargs + result = self.exporter.shutdown(timeout_millis=3000, extra_arg="ignored") + self.assertTrue(result) + mock_logger.debug.assert_called_once() + + def test_integration_with_metrics_data(self): + """Test the full export flow with actual MetricsData.""" + # Create a mock MetricsData + mock_metrics_data = MagicMock(spec=MetricsData) + mock_resource_metrics = MagicMock() + mock_scope_metrics = MagicMock() + mock_metric = MagicMock() + + # Set up the mock hierarchy + mock_metrics_data.resource_metrics = [mock_resource_metrics] + mock_resource_metrics.scope_metrics = [mock_scope_metrics] + mock_scope_metrics.metrics = [mock_metric] + + # Mock the metric to have no data_points to avoid complex setup + mock_metric.data = None + + with patch("sys.stdout", new_callable=StringIO): + result = self.exporter.export(mock_metrics_data) + + # Should succeed even with no actual metrics + self.assertEqual(result, MetricExportResult.SUCCESS) + + def test_integration_export_success(self): + """Test export method returns success.""" + # Create empty MetricsData + mock_metrics_data = MagicMock(spec=MetricsData) + mock_metrics_data.resource_metrics = [] + + result = self.exporter.export(mock_metrics_data) + self.assertEqual(result, MetricExportResult.SUCCESS) + + def test_integration_export_with_timeout(self): + """Test export method with timeout parameter.""" + mock_metrics_data = MagicMock(spec=MetricsData) + mock_metrics_data.resource_metrics = [] + + result = self.exporter.export(mock_metrics_data, timeout_millis=5000) + self.assertEqual(result, MetricExportResult.SUCCESS) + + def test_export_failure_handling(self): + """Test export method handles exceptions and returns failure.""" + # Create a mock that raises an exception + mock_metrics_data = MagicMock(spec=MetricsData) + mock_metrics_data.resource_metrics = [MagicMock()] + + # Make the resource_metrics access raise an exception + type(mock_metrics_data).resource_metrics = property( + lambda self: (_ for _ in ()).throw(Exception("Test exception")) + ) + + # Patch the logger in the base_emf_exporter since that's where the error logging happens + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.base_emf_exporter.logger") as mock_logger: + result = self.exporter.export(mock_metrics_data) + + self.assertEqual(result, MetricExportResult.FAILURE) + mock_logger.error.assert_called_once() + self.assertIn("Failed to export metrics", mock_logger.error.call_args[0][0]) + + def test_flush_output_verification(self): + """Test that print is called with flush=True.""" + log_event = {"message": "test message", "timestamp": 1640995200000} + + with patch("builtins.print") as mock_print: + self.exporter._export(log_event) + + # Verify print was called with flush=True + mock_print.assert_called_once_with("test message", flush=True) + + def test_logger_levels(self): + """Test that appropriate log levels are used.""" + # Test debug logging in force_flush + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter.force_flush() + mock_logger.debug.assert_called_once() + + # Test debug logging in shutdown + with 
patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter.shutdown() + mock_logger.debug.assert_called_once() + + # Test warning logging for empty message + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter._export({"message": "", "timestamp": 123}) + mock_logger.warning.assert_called_once() + + # Test error logging for exception + with patch("builtins.print", side_effect=Exception("Test")): + with patch("amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.logger") as mock_logger: + self.exporter._export({"message": "test", "timestamp": 123}) + mock_logger.error.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index 9e4afc81e..31439eba3 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -25,6 +25,7 @@ AwsOpenTelemetryConfigurator, OtlpLogHeaderSetting, _check_emf_exporter_enabled, + _clear_logs_header_cache, _create_aws_otlp_exporter, _create_emf_exporter, _custom_import_sampler, @@ -37,16 +38,15 @@ _customize_span_processors, _export_unsampled_span_for_agent_observability, _export_unsampled_span_for_lambda, + _fetch_logs_header, _init_logging, _is_application_signals_enabled, _is_application_signals_runtime_enabled, _is_defer_to_workers_enabled, _is_wsgi_master_process, - _validate_and_fetch_logs_header, ) from amazon.opentelemetry.distro.aws_opentelemetry_distro import AwsOpenTelemetryDistro from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor -from amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter import AwsCloudWatchEmfExporter from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession # pylint: disable=line-too-long @@ -508,6 +508,7 @@ def test_customize_span_exporter_sigv4(self): bad_configs.append(config) for config in good_configs: + _clear_logs_header_cache() self.customize_exporter_test( config, _customize_span_exporter, @@ -519,6 +520,7 @@ def test_customize_span_exporter_sigv4(self): ) for config in bad_configs: + _clear_logs_header_cache() self.customize_exporter_test( config, _customize_span_exporter, @@ -616,6 +618,7 @@ def test_customize_logs_exporter_sigv4(self): bad_configs.append(config) for config in good_configs: + _clear_logs_header_cache() self.customize_exporter_test( config, _customize_logs_exporter, @@ -626,6 +629,7 @@ def test_customize_logs_exporter_sigv4(self): ) for config in bad_configs: + _clear_logs_header_cache() self.customize_exporter_test( config, _customize_logs_exporter, OTLPLogExporter(), OTLPLogExporter, Session, Compression.NoCompression ) @@ -964,64 +968,72 @@ def test_check_emf_exporter_enabled(self): # Clean up os.environ.pop("OTEL_METRICS_EXPORTER", None) - def test_validate_and_fetch_logs_header(self): + def test_fetch_logs_header(self): + _clear_logs_header_cache() + # Test when headers are not set os.environ.pop(OTEL_EXPORTER_OTLP_LOGS_HEADERS, None) - result = _validate_and_fetch_logs_header() + result = _fetch_logs_header() self.assertIsInstance(result, OtlpLogHeaderSetting) 
self.assertIsNone(result.log_group) self.assertIsNone(result.log_stream) self.assertIsNone(result.namespace) - self.assertFalse(result.is_valid) + self.assertFalse(result.is_valid()) + + # Test singleton behavior - should return the same cached instance + result2 = _fetch_logs_header() + self.assertIs(result, result2) # Same object reference - # Test with valid headers + _clear_logs_header_cache() os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=test-group,x-aws-log-stream=test-stream" - result = _validate_and_fetch_logs_header() + result = _fetch_logs_header() self.assertEqual(result.log_group, "test-group") self.assertEqual(result.log_stream, "test-stream") self.assertIsNone(result.namespace) - self.assertTrue(result.is_valid) + self.assertTrue(result.is_valid()) - # Test with valid headers including namespace + # Test singleton behavior again + result2 = _fetch_logs_header() + self.assertIs(result, result2) + + _clear_logs_header_cache() os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = ( "x-aws-log-group=test-group,x-aws-log-stream=test-stream,x-aws-metric-namespace=test-namespace" ) - result = _validate_and_fetch_logs_header() - self.assertEqual(result.log_group, "test-group") - self.assertEqual(result.log_stream, "test-stream") + result = _fetch_logs_header() self.assertEqual(result.namespace, "test-namespace") - self.assertTrue(result.is_valid) + self.assertTrue(result.is_valid()) - # Test with missing log group + _clear_logs_header_cache() os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-stream=test-stream" - result = _validate_and_fetch_logs_header() - self.assertIsNone(result.log_group) + result = _fetch_logs_header() self.assertEqual(result.log_stream, "test-stream") - self.assertFalse(result.is_valid) + self.assertFalse(result.is_valid()) - # Test with missing log stream + _clear_logs_header_cache() os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=test-group" - result = _validate_and_fetch_logs_header() + result = _fetch_logs_header() self.assertEqual(result.log_group, "test-group") self.assertIsNone(result.log_stream) - self.assertFalse(result.is_valid) + self.assertFalse(result.is_valid()) - # Test with empty value in log group + _clear_logs_header_cache() os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=,x-aws-log-stream=test-stream" - result = _validate_and_fetch_logs_header() + result = _fetch_logs_header() self.assertIsNone(result.log_group) self.assertEqual(result.log_stream, "test-stream") - self.assertFalse(result.is_valid) + self.assertFalse(result.is_valid()) - # Test with empty value in log stream + _clear_logs_header_cache() os.environ[OTEL_EXPORTER_OTLP_LOGS_HEADERS] = "x-aws-log-group=test-group,x-aws-log-stream=" - result = _validate_and_fetch_logs_header() + result = _fetch_logs_header() self.assertEqual(result.log_group, "test-group") self.assertIsNone(result.log_stream) - self.assertFalse(result.is_valid) + self.assertFalse(result.is_valid()) # Clean up os.environ.pop(OTEL_EXPORTER_OTLP_LOGS_HEADERS, None) + _clear_logs_header_cache() @patch( "amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled", return_value=False @@ -1051,63 +1063,6 @@ def test_customize_log_record_processor_with_agent_observability(self, _): added_processor = mock_logger_provider.add_log_record_processor.call_args[0][0] self.assertIsInstance(added_processor, AwsCloudWatchOtlpBatchLogRecordProcessor) - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._validate_and_fetch_logs_header") - 
@patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") - def test_create_emf_exporter(self, mock_get_session, mock_validate): - # Test when botocore is not installed - mock_get_session.return_value = None - result = _create_emf_exporter() - self.assertIsNone(result) - - # Reset mock for subsequent tests - mock_get_session.reset_mock() - mock_get_session.return_value = MagicMock() - - # Mock the EMF exporter class import by patching the module import - with patch( - "amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter.AwsCloudWatchEmfExporter" - ) as mock_emf_exporter_class: - mock_exporter_instance = MagicMock() - mock_exporter_instance.namespace = "default" - mock_exporter_instance.log_group_name = "test-group" - mock_emf_exporter_class.return_value = mock_exporter_instance - - # Test when headers are invalid - mock_validate.return_value = OtlpLogHeaderSetting(None, None, None, False) - result = _create_emf_exporter() - self.assertIsNone(result) - - # Test when namespace is missing (should still create exporter with default namespace) - mock_validate.return_value = OtlpLogHeaderSetting("test-group", "test-stream", None, True) - result = _create_emf_exporter() - self.assertIsNotNone(result) - self.assertEqual(result, mock_exporter_instance) - # Verify that the EMF exporter was called with correct parameters - mock_emf_exporter_class.assert_called_with( - session=mock_get_session.return_value, - namespace=None, - log_group_name="test-group", - log_stream_name="test-stream", - ) - - # Test with valid configuration - mock_validate.return_value = OtlpLogHeaderSetting("test-group", "test-stream", "test-namespace", True) - result = _create_emf_exporter() - self.assertIsNotNone(result) - self.assertEqual(result, mock_exporter_instance) - # Verify that the EMF exporter was called with correct parameters - mock_emf_exporter_class.assert_called_with( - session=mock_get_session.return_value, - namespace="test-namespace", - log_group_name="test-group", - log_stream_name="test-stream", - ) - - # Test exception handling - mock_validate.side_effect = Exception("Test exception") - result = _create_emf_exporter() - self.assertIsNone(result) - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_logger_provider") @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled") @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") @@ -1174,35 +1129,6 @@ def test_create_aws_otlp_exporter(self, mock_get_session, mock_is_agent_enabled, result = _create_aws_otlp_exporter("https://xray.us-east-1.amazonaws.com/v1/traces", "xray", "us-east-1") self.assertIsNone(result) - def test_customize_metric_exporters_with_emf(self): - metric_readers = [] - views = [] - - # Test with EMF disabled - _customize_metric_exporters(metric_readers, views, is_emf_enabled=False) - self.assertEqual(len(metric_readers), 0) - - # Test with EMF enabled but create_emf_exporter returns None - with patch( - "amazon.opentelemetry.distro.aws_opentelemetry_configurator._create_emf_exporter", return_value=None - ): - _customize_metric_exporters(metric_readers, views, is_emf_enabled=True) - self.assertEqual(len(metric_readers), 0) - - # Test with EMF enabled and valid exporter - mock_emf_exporter = MagicMock(spec=AwsCloudWatchEmfExporter) - # Add the required attributes that PeriodicExportingMetricReader expects - mock_emf_exporter._preferred_temporality = {} - mock_emf_exporter._preferred_aggregation = {} - - 
with patch( - "amazon.opentelemetry.distro.aws_opentelemetry_configurator._create_emf_exporter", - return_value=mock_emf_exporter, - ): - _customize_metric_exporters(metric_readers, views, is_emf_enabled=True) - self.assertEqual(len(metric_readers), 1) - self.assertIsInstance(metric_readers[0], PeriodicExportingMetricReader) - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled") @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_service_attribute") def test_customize_resource_without_agent_observability(self, mock_get_service_attribute, mock_is_agent_enabled): @@ -1250,6 +1176,202 @@ def test_customize_resource_with_existing_agent_type(self, mock_get_service_attr self.assertEqual(result.attributes[AWS_LOCAL_SERVICE], "test-service") self.assertEqual(result.attributes[AWS_SERVICE_TYPE], "existing-agent") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_emf_exporter_lambda_without_valid_headers( + self, mock_get_session, mock_is_lambda, mock_fetch_headers + ): + """Test _create_emf_exporter returns ConsoleEmfExporter for Lambda without valid log headers""" + # Setup mocks + mock_is_lambda.return_value = True + mock_header_setting = MagicMock() + mock_header_setting.is_valid.return_value = False + mock_header_setting.namespace = "test-namespace" + mock_fetch_headers.return_value = mock_header_setting + + with patch( + "amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.ConsoleEmfExporter" + ) as mock_console_exporter: + mock_exporter_instance = MagicMock() + mock_console_exporter.return_value = mock_exporter_instance + + result = _create_emf_exporter() + + self.assertEqual(result, mock_exporter_instance) + mock_console_exporter.assert_called_once_with(namespace="test-namespace") + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_emf_exporter_lambda_with_valid_headers(self, mock_get_session, mock_is_lambda, mock_fetch_headers): + """Test _create_emf_exporter returns AwsCloudWatchEmfExporter for Lambda with valid headers""" + # Setup mocks + mock_is_lambda.return_value = True + mock_session = MagicMock() + mock_get_session.return_value = mock_session + + mock_header_setting = MagicMock() + mock_header_setting.is_valid.return_value = True + mock_header_setting.namespace = "test-namespace" + mock_header_setting.log_group = "test-group" + mock_header_setting.log_stream = "test-stream" + mock_fetch_headers.return_value = mock_header_setting + + with patch( + "amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter.AwsCloudWatchEmfExporter" + ) as mock_cloudwatch_exporter: + mock_exporter_instance = MagicMock() + mock_cloudwatch_exporter.return_value = mock_exporter_instance + + result = _create_emf_exporter() + + self.assertEqual(result, mock_exporter_instance) + mock_cloudwatch_exporter.assert_called_once_with( + session=mock_session, + namespace="test-namespace", + log_group_name="test-group", + log_stream_name="test-stream", + ) + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") 
+ @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_emf_exporter_non_lambda_with_valid_headers( + self, mock_get_session, mock_is_lambda, mock_fetch_headers + ): + """Test _create_emf_exporter returns AwsCloudWatchEmfExporter for non-Lambda with valid headers""" + # Setup mocks + mock_is_lambda.return_value = False + mock_session = MagicMock() + mock_get_session.return_value = mock_session + + mock_header_setting = MagicMock() + mock_header_setting.is_valid.return_value = True + mock_header_setting.namespace = "test-namespace" + mock_header_setting.log_group = "test-group" + mock_header_setting.log_stream = "test-stream" + mock_fetch_headers.return_value = mock_header_setting + + with patch( + "amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter.AwsCloudWatchEmfExporter" + ) as mock_cloudwatch_exporter: + mock_exporter_instance = MagicMock() + mock_cloudwatch_exporter.return_value = mock_exporter_instance + + result = _create_emf_exporter() + + self.assertEqual(result, mock_exporter_instance) + mock_cloudwatch_exporter.assert_called_once_with( + session=mock_session, + namespace="test-namespace", + log_group_name="test-group", + log_stream_name="test-stream", + ) + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_emf_exporter_non_lambda_without_valid_headers( + self, mock_get_session, mock_is_lambda, mock_fetch_headers + ): + """Test _create_emf_exporter returns None for non-Lambda without valid headers""" + # Setup mocks + mock_is_lambda.return_value = False + mock_session = MagicMock() + mock_get_session.return_value = mock_session + + mock_header_setting = MagicMock() + mock_header_setting.is_valid.return_value = False + mock_fetch_headers.return_value = mock_header_setting + + result = _create_emf_exporter() + + self.assertIsNone(result) + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") + def test_create_emf_exporter_no_botocore_session( + self, mock_logger, mock_get_session, mock_is_lambda, mock_fetch_headers + ): + """Test _create_emf_exporter returns None when botocore session is not available""" + # Setup mocks + mock_is_lambda.return_value = False + mock_get_session.return_value = None # Simulate missing botocore + + mock_header_setting = MagicMock() + mock_header_setting.is_valid.return_value = True + mock_fetch_headers.return_value = mock_header_setting + + result = _create_emf_exporter() + + self.assertIsNone(result) + mock_logger.warning.assert_called_once_with("botocore is not installed. 
EMF exporter requires botocore") + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") + def test_create_emf_exporter_exception_handling(self, mock_logger, mock_fetch_headers): + """Test _create_emf_exporter handles exceptions gracefully""" + # Setup mocks to raise exception + test_exception = Exception("Test exception") + mock_fetch_headers.side_effect = test_exception + + result = _create_emf_exporter() + + self.assertIsNone(result) + mock_logger.error.assert_called_once_with("Failed to create EMF exporter: %s", test_exception) + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_emf_exporter_lambda_without_valid_headers_none_namespace( + self, mock_get_session, mock_is_lambda, mock_fetch_headers + ): + """Test _create_emf_exporter with Lambda environment and None namespace""" + # Setup mocks + mock_is_lambda.return_value = True + mock_header_setting = MagicMock() + mock_header_setting.is_valid.return_value = False + mock_header_setting.namespace = None + mock_fetch_headers.return_value = mock_header_setting + + with patch( + "amazon.opentelemetry.distro.exporter.aws.metrics.console_emf_exporter.ConsoleEmfExporter" + ) as mock_console_exporter: + mock_exporter_instance = MagicMock() + mock_console_exporter.return_value = mock_exporter_instance + + result = _create_emf_exporter() + + self.assertEqual(result, mock_exporter_instance) + mock_console_exporter.assert_called_once_with(namespace=None) + + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._fetch_logs_header") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_lambda_environment") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_emf_exporter_cloudwatch_exporter_import_error( + self, mock_get_session, mock_is_lambda, mock_fetch_headers + ): + """Test _create_emf_exporter handles import errors for CloudWatch exporter""" + # Setup mocks + mock_is_lambda.return_value = False + mock_session = MagicMock() + mock_get_session.return_value = mock_session + + mock_header_setting = MagicMock() + mock_header_setting.is_valid.return_value = True + mock_fetch_headers.return_value = mock_header_setting + + # Mock import to raise ImportError + with patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._logger") as mock_logger: + with patch("builtins.__import__", side_effect=ImportError("Cannot import CloudWatch exporter")): + result = _create_emf_exporter() + + self.assertIsNone(result) + mock_logger.error.assert_called_once() + def validate_distro_environ(): tc: TestCase = TestCase() diff --git a/lambda-layer/src/otel-instrument b/lambda-layer/src/otel-instrument index 6965ba12d..0fb750f34 100755 --- a/lambda-layer/src/otel-instrument +++ b/lambda-layer/src/otel-instrument @@ -98,6 +98,19 @@ if [ -z ${OTEL_PROPAGATORS} ]; then export OTEL_PROPAGATORS="baggage,xray,tracecontext" fi +# disable application signals runtime metrics by default +export OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED="false"; + +# enable emf exporter by default +if [ -z "${OTEL_METRICS_EXPORTER}" ]; then + export OTEL_METRICS_EXPORTER="awsemf"; +fi + +# disable OTel logs exporter by default +if [ -z 
"${OTEL_LOGS_EXPORTER}" ]; then + export OTEL_LOGS_EXPORTER="none"; +fi + if [ -z "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" ]; then export OTEL_AWS_APPLICATION_SIGNALS_ENABLED="true"; fi @@ -107,17 +120,6 @@ fi if [ "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" = "true" ]; then export OTEL_PYTHON_DISTRO="aws_distro"; export OTEL_PYTHON_CONFIGURATOR="aws_configurator"; - if [ -z "${OTEL_METRICS_EXPORTER}" ]; then - export OTEL_METRICS_EXPORTER="none"; - fi -fi - -# - If Application Signals is disabled - -if [ "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" = "false" ]; then - if [ -z "${OTEL_METRICS_EXPORTER}" ]; then - export OTEL_METRICS_EXPORTER="none"; - fi fi if [ -z "${OTEL_RESOURCE_ATTRIBUTES}" ]; then